Python爬虫实例:福利彩票快乐8数据采集与统计指南

采集源码:

首先导入我们需要库

import os
import json
import requests
import pandas as pd
from openpyxl import load_workbook

 创建一个类,并初始化相关数据

class Cwlgovcn:
    def __init__(self, cp_name, cp_num):
        self.cp_name = cp_name
        self.cp_num = cp_num
        self.cp_total = 0
        self.file_path = f'data/{self.cp_name}.xlsx'
        self.headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'keep-alive',
            'Referer': 'http://www.cwl.gov.cn/ygkj/wqkjgg/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 \
                    (KHTML, like Gecko) Chrome/108.0.5359.125 Safari/537.36',
            'X-Requested-With': 'XMLHttpRequest',
        }
        self.cookies = {
            'HMF_CI': '2ffd86bc4dfebd2f9e3002a3c63a7db4f99b2719aa\
                    736fe8be8a417dd6d0fd31ec6f556d758683a3d1775d2a8a996702\
                    ce0d893ec468054c04684a96b41d9439d1',
            '21_vq': '20',
        }
        self.params = {
            'name': self.cp_name,
            'pageNo': '1',
            'pageSize': self.cp_num,
            'systemType': 'PC',
        }
        self.get_response()
        pass

 添加方法,获取采集需求数量的数据

    # 采集需求数量的数据
    def get_response(self, cp_num: int = 1):
        # 重新赋值 params.pageSize
        self.params['pageSize'] = cp_num
        response = requests.get(
            'http://www.cwl.gov.cn/cwl_admin/front/cwlkj/search/kjxx/findDrawNotice',
            params=self.params,
            cookies=self.cookies,
            headers=self.headers,
            verify=False,
            timeout=1
        )
        content = response.content.decode('utf-8')
        jsons = json.loads(content)
        # 获得彩票总期数
        self.cp_total = jsons['total']
        result = jsons['result'][::-1]
        print(f'{self.cp_name}_total:{self.cp_total}')
        return result

 运用将数据格式化为json格式,并重新排序:result = jsons['result'][::-1]倒序。

按照pandas.dataframe格式化数据

    def format_data(data):
        # 格式化数据(可能有更好的处理办法)
        data_len = len(data)
        keys = list(data[0].keys())
        res_dict = dict()
        for ik in keys:
            res_dict[str(ik)] = []
            for i in range(data_len):
                res_dict[str(ik)].append(data[i][ik])

        return res_dict

 定义自己需要的字段。

    # 获得需要的字段数据
    def get_data(self, cp_num: int = 0):
        if cp_num == 0:
            cp_num = self.cp_total
        result = self.get_response(cp_num)
        res = []
        for i in result:
            dic = dict()
            dic['code'] = int(i['code'])
            dic['date'] = i['date']
            reds = i['red'].split(',')
            for r in range(len(reds)):
                dic['red' + str(r + 1)] = int(reds[r])
            dic['blue'] = int(i['blue'] or 0)
            res.append(dic)

        return self.format_data(res)

计算以保存的数据与网络数据差,得到采集或者更新数量。

    def get_update_num(self):
        sheet_name = 'data'
        df = pd.read_excel(self.file_path, sheet_name=sheet_name)
        code_values = df["code"].values
        code1 = int(code_values[len(code_values) - 1])
        code2 = int(self.get_data(self.cp_total)['code'][self.cp_total - 1])
        return code2 - code1
   def get_data_frame(data):
        return pd.DataFrame(data)

 写入excel文件。

  def save_to_excel(self, sheet_name='data'):
        if os.path.isfile(self.file_path):
            print(f"{self.file_path}已存在")
            wb = load_workbook(self.file_path)
            print(wb.sheetnames)
            wb.close()
            if sheet_name in wb.sheetnames:
                print(f"{sheet_name}工作表已存在,添加数据")
                update_num = self.get_update_num()
                if update_num == 0:
                    print(f"{sheet_name}工作表已是最新数据,不用更新。")
                    return

                data = self.get_data(update_num)
                df = self.get_data_frame(data)
                o = pd.read_excel(self.file_path, sheet_name=sheet_name)
                df1 = pd.concat([o, df], axis=0)
                with pd.ExcelWriter(
                        self.file_path,
                        engine='openpyxl',
                        mode='a',
                        if_sheet_exists='overlay'
                ) as writer:
                    df1.to_excel(
                        writer,
                        index=False,
                        sheet_name=sheet_name,
                    )
                    print(f"{sheet_name}工作表本次更新{update_num}条数据。")
            else:
                print('工作表不存在,创建表写入数据')
                data = self.get_data(self.cp_total + 1)
                df = self.get_data_frame(data)
                with pd.ExcelWriter(
                        self.file_path,
                        engine='openpyxl',
                        mode='a'
                ) as writer:
                    df.to_excel(
                        writer,
                        sheet_name=sheet_name,
                        header=True,
                        index=False,
                    )
        else:
            print(f'{self.file_path}工作簿不存在,直接创建')
            data = self.get_data(self.cp_total + 1)
            df = self.get_data_frame(data)
            with pd.ExcelWriter(
                    self.file_path,
                    engine='openpyxl',
            ) as writer:
                df.to_excel(
                    writer,
                    sheet_name=sheet_name,
                    header=True,
                    index=False,
                )

 根据需要采集相应的数据。(记得新建一个data目录)

cps = ('kl8', 'ssq', '3d')
# 选填既可以采集
cp = Cwlgovcn('kl8', 5000)
cp.save_to_excel()

采集数据如下:

新建一个统计文件:

主要运用pandas的concat连接多列。

import pandas as pd

file_path = "data/kl8.xlsx"
read_sheet_name = 'data'
write_sheet_name = 'tongji'


df1 = pd.read_excel(file_path, sheet_name=read_sheet_name)
df2 = pd.DataFrame()
for i in range(20):
    res_dict = pd.Series(df1['red' + str(i + 1)]).value_counts().to_dict()
    q1 = [x for x in res_dict]
    t1 = [res_dict[k] for k in res_dict]

    df0 = pd.concat([
        df2,
        pd.DataFrame({'q' + str(i + 1): q1, 't' + str(i + 1): t1})],
        axis=1
    )
    df2 = df0
    res_dict.clear()

with pd.ExcelWriter(
        file_path,
        engine='openpyxl',
        mode='a',
        if_sheet_exists='overlay'
) as writer:
    df2.to_excel(
        writer,
        sheet_name=write_sheet_name,
        header=True,
        index=False,
    )
    print(f"{write_sheet_name}工作表本次更新。")

统计数据如下:

物联沃分享整理
物联沃-IOTWORD物联网 » Python爬虫实例:福利彩票快乐8数据采集与统计指南

发表评论