Python百万级别大批量数据写入实现方法详解

Python 大批量写入数据 百万级别

  • 背景
  • 方案
  • 代码
  • 背景

    	现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。
    

    方案

    	方案一:多线程+协程+异步MySql
    	方案二:多线程+MySql批量插入
    

    代码

    	1,先通过pandas读取所有csv数据存入列表。
    	2,设置N个线程,将一百万数据均分为N份,以start,end传递给线程以切片的方法读取区间数据(建议为16个线程)
    	3,方案二 线程内以  executemany 方法批量插入所有数据。
    	4,方案一 线程内使用异步事件循环遍历所有数据异步插入。 
    	5,方案一纯属没事找事型。
    

    方案二

    import threading
    
    import pandas as pd
    import asyncio
    import time
    
    import aiomysql
    import pymysql
    
    data=[]
    error_data=[]
    
    
    def run(start,end):
        global data
        global error_data
        print("start"+threading.current_thread().name)
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        mysdb = getDb("*", *, "*", "*", "*")
        cursor = mysdb.cursor()
        sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
        cursor.executemany(sql,data[start:end])
        mysdb.commit()
        mysdb.close()
        print("end" + threading.current_thread().name)
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    
    
    def csv_file_read_use_pd(csvFile):
        csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
        csv_result = csv_result.fillna(value="None")
        result = csv_result.values.tolist()
        return result
    
    
    class MyDataBase:
        def __init__(self,host=None,port=None,username=None,password=None,database=None):
            self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database)
        def close(self):
            self.db.close()
    
    def getDb(host,port,username,password,database):
        MyDb = MyDataBase(host, port, username, password,database)
        return MyDb.db
    
    def main(csvFile):
        global data  #获取全局对象  csv全量数据
        #读取所有的数据   将所有数据均分成   thread_lens   份 分发给  thread_lens  个线程去执行
        thread_lens=20
        csv_result=csv_file_read_use_pd(csvFile)
        day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        for item in csv_result:
            item.insert(0,day)
    
        data=csv_result
        thread_exe_count_list=[]   #线程需要执行的区间
        csv_lens=len(csv_result)
        avg = csv_lens // thread_lens
        remainder=csv_lens % thread_lens
        # 0,27517  27517,55,034
        nowIndex=0
        for i in range(thread_lens):
            temp=[nowIndex,nowIndex+avg]
            nowIndex=nowIndex+avg
            thread_exe_count_list.append(temp)
        thread_exe_count_list[-1:][0][1]+=remainder  #余数分给最后一个线程
        # print(thread_exe_count_list)
    
        #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])
    
        for i in range(thread_lens):
            sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
            sub_thread.start()
            sub_thread.join()
            time.sleep(3)
    
    
    if __name__=="__main__":
        #csv_file_read_use_pd("分公司箱型箱量.csv")
        main("分公司箱型箱量.csv")
    

    方案一

    import threading
    
    import pandas as pd
    import asyncio
    import time
    
    import aiomysql
    
    
    data=[]
    error_data=[]
    
    
    async def async_basic(loop,start,end):
        global data
        global error_data
        print("start"+threading.current_thread().name)
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        conn = await aiomysql.connect(
            host="*",
            port=*,
            user="*",
            password="*",
            db="*",
            loop=loop
        )
        day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
        async with conn.cursor() as cursor:
            for item in data[start:end]:
                params=[day]
                params.extend(item)
                try:
                    x=await cursor.execute(sql,params)
                    if x==0:
                        error_data.append(item)
                    print(threading.current_thread().name+"   result "+str(x))
                except Exception as e:
                    print(e)
                    error_data.append(item)
                    time.sleep(10)
                    pass
        await conn.close()
        #await conn.commit()
        #关闭连接池
        # pool.close()
        # await pool.wait_closed()
        print("end" + threading.current_thread().name)
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    
    
    def csv_file_read_use_pd(csvFile):
        csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
        csv_result = csv_result.fillna(value="None")
        result = csv_result.values.tolist()
        return result
    
    def th(start,end):
        loop = asyncio.new_event_loop()
        loop.run_until_complete(async_basic(loop,start,end))
    
    
    def main(csvFile):
        global data  #获取全局对象  csv全量数据
        #读取所有的数据   将所有数据均分成   thread_lens   份 分发给  thread_lens  个线程去执行
        thread_lens=20
        csv_result=csv_file_read_use_pd(csvFile)
        data=csv_result
        thread_exe_count_list=[]   #线程需要执行的区间
        csv_lens=len(csv_result)
        avg = csv_lens // thread_lens
        remainder=csv_lens % thread_lens
        # 0,27517  27517,55,034
        nowIndex=0
        for i in range(thread_lens):
            temp=[nowIndex,nowIndex+avg]
            nowIndex=nowIndex+avg
            thread_exe_count_list.append(temp)
        thread_exe_count_list[-1:][0][1]+=remainder  #余数分给最后一个线程
        print(thread_exe_count_list)
    
        #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])
    
        for i in range(thread_lens):
            sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
            sub_thread.start()
            time.sleep(3)
    
    
    if __name__=="__main__":
        #csv_file_read_use_pd("分公司箱型箱量.csv")
        main("分公司箱型箱量.csv")
    
    物联沃分享整理
    物联沃-IOTWORD物联网 » Python百万级别大批量数据写入实现方法详解

    发表评论