Write Large Pandas DataFrames to SQL Server database

Problem Description

I have 74 relatively large Pandas DataFrames (about 34,600 rows and 8 columns each) that I am trying to insert into a SQL Server database as quickly as possible. After doing some research, I learned that the good ole pandas.to_sql function is not good for such large inserts into a SQL Server database. This was the initial approach that I took (very slow - almost an hour for the application to complete, versus about 4 minutes when using a MySQL database).
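
For reference, the initial pandas.to_sql approach looked roughly like this (a minimal sketch, not the original code; the connection string and table name are hypothetical):

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical DSN-style connection string, for illustration only.
engine = create_engine('mssql+pyodbc://user:password@my_dsn')
# 'df' stands for one of the 74 DataFrames being written.
df.to_sql('time_series_result_values', engine, if_exists='append', index=False)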

This article, and many other StackOverflow posts, have been helpful in pointing me in the right direction; however, I've hit a roadblock:

I am trying to use SQLAlchemy's Core rather than the ORM, for reasons explained in the link above. So, I am converting the dataframe to a list of dictionaries using pandas.to_dict and then doing an execute() and insert():

self._session_factory.engine.execute(
    TimeSeriesResultValues.__table__.insert(),
    data)
# 'data' is a list of dictionaries.
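
For context, the list of dictionaries is typically produced with DataFrame.to_dict using the 'records' orientation (a minimal sketch; 'df' stands for the DataFrame being inserted):

# Each row becomes one {column name: value} dict; the keys must match
# the column names of TimeSeriesResultValues.__table__ for the insert
# to bind them.
data = df.to_dict(orient='records')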

The problem is that the insert is not getting any values -- they appear as a bunch of empty parentheses and I get this error:

(pyodbc.IntegrityError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot
insert the value NULL into the column...

There are values in the list of dictionaries that I passed in, so I can't figure out why the values aren't showing up.
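
One possible cause of this symptom (an assumption, not something confirmed in the original post): when execute() is given a list of dictionaries, SQLAlchemy compiles the statement from the keys of the first dictionary, so keys that don't match the table's column names are silently dropped and those columns arrive as NULL. A quick check:

# Compare the table's column names against the keys of the first record.
table_cols = {c.name for c in TimeSeriesResultValues.__table__.columns}
print(table_cols - set(data[0]))  # columns with no matching key -> NULL
print(set(data[0]) - table_cols)  # keys the table doesn't recognize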

Here is the example I am going off of:

import time

# init_sqlalchemy(), engine, and Customer come from the benchmark
# script in the article linked above.
def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in range(n)]
    )
    print("SQLAlchemy Core: Total time for " + str(n) +
          " records " + str(time.time() - t0) + " secs")

Recommended Answer

I've got some sad news for you: SQLAlchemy doesn't implement a bulk import for SQL Server; it just issues the same slow individual INSERT statements that to_sql does. I would say that your best bet is to try and script something up using the bcp command line tool. Here is a script that I've used in the past, but no guarantees:

from subprocess import call
import pandas as pd
import numpy as np

pad = 0.1                      # extra width padding for varchar columns
tablename = 'sandbox.max.pybcp_test'
overwrite = True
raise_exception = True
server = 'P01'
trusted_connection = True
username = None
password = None
delimiter = '|'
df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False)

def get_column_def_sql(col):
    # Map a pandas dtype to a T-SQL column definition.
    if col.dtype == object:
        width = col.str.len().max() * (1 + pad)
        return '[{}] varchar({})'.format(col.name, int(width))
    elif np.issubdtype(col.dtype, np.floating):
        return '[{}] float'.format(col.name)
    elif np.issubdtype(col.dtype, np.integer):
        return '[{}] int'.format(col.name)
    else:
        if raise_exception:
            raise NotImplementedError('data type {} not implemented'.format(col.dtype))
        else:
            print('Warning: cast column {} as varchar; data type {} not implemented'.format(col.name, col.dtype))
            width = col.str.len().max() * (1 + pad)
            return '[{}] varchar({})'.format(col.name, int(width))

def create_table(df, tablename, server, trusted_connection, username, password, pad):
    if trusted_connection:
        login_string = '-E'
    else:
        login_string = '-U {} -P {}'.format(username, password)

    col_defs = []
    for col in df:
        col_defs += [get_column_def_sql(df[col])]

    # Build a CREATE TABLE script for sqlcmd; GO/QUIT terminate the batch.
    query_string = 'CREATE TABLE {}\n({})\nGO\nQUIT'.format(tablename, ',\n'.join(col_defs))
    if overwrite:
        query_string = "IF OBJECT_ID('{}', 'U') IS NOT NULL DROP TABLE {};".format(tablename, tablename) + query_string

    query_file = 'c:\\pybcp_tempqueryfile.sql'
    with open(query_file, 'w') as f:
        f.write(query_string)

    o = call('sqlcmd -S {} {} -i {}'.format(server, login_string, query_file), shell=True)
    if o != 0:
        raise BaseException("Failed to create table")
    # o = call('del {}'.format(query_file), shell=True)


def call_bcp(df, tablename):
    if trusted_connection:
        login_string = '-T'
    else:
        login_string = '-U {} -P {}'.format(username, password)
    temp_file = 'c:\\pybcp_tempqueryfile.csv'

    # Strip the delimiter out of string columns so it can't corrupt the
    # file, and write the CSV in latin-1 so SQL Server can read it.
    df.loc[:, df.dtypes == object] = df.loc[:, df.dtypes == object].apply(
        lambda col: col.str.replace(delimiter, ''))
    df.to_csv(temp_file, index=False, sep=delimiter, encoding='latin-1')
    # bcp interprets \n itself, so pass it literally; ^| escapes the pipe
    # delimiter for the Windows shell.
    o = call('bcp {} in {} -S {} {} -t^| -r\\n -c'.format(tablename, temp_file, server, login_string), shell=True)
    if o != 0:
        raise BaseException("bcp failed to load the data")
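
Putting it together, a run of the two helpers above might look like this (a sketch under the script's own module-level assumptions about df, tablename, server, and the login settings):

# Create the target table from the DataFrame's dtypes, then bulk-load it.
create_table(df, tablename, server, trusted_connection, username, password, pad)
call_bcp(df, tablename)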
