Python实现上海新闻爬虫:上观新闻与腾讯新闻的抓取之道

1. 起因, 目的:

  • 继续爬上海新闻, 增加新闻来源。
  • 昨天写了: 东方网 + 澎湃新闻
  • 今天增加2个来源: 上观新闻 + 腾讯新闻
  • 此时有4个来源,我觉得已经差不多了。
  • 2. 先看效果

    3. 过程:

    代码 1, 上观新闻

    这里也有一个有趣的地方。

    图片链接是:
    https://images.shobserver.com/news/900_507/2025/05/09/l_cb20250509100210436048.jpg
    注意url 中包含一段数字是, 900_507, 然而,图片的实际尺寸是 899 * 506
    是巧合吗? 我觉得是细腻。

    import os
    import csv
    import time
    import requests
    from datetime import datetime, timedelta
    
    """
    # 上观新闻 shobserver.com       与解放日报关联,报道上海本地案件。
    # home: https://www.shobserver.com/staticsg/home
    # 1. 标题, url, 来源,时间
    
    # api 类似这样:
    post: https://www.shobserver.com/news/homeMoreNews?ver=1746801768088
    
    请求荷载是:
    - 查询字符串参数是 ver=1746801768088
    - 表单数据是: page=4&lastpublishtime=1746797627000
    
    # 每个 api 相应,包含 12 条新闻。
    """
    
    # 请求头
    headers = {
        "Accept": "application/json, text/javascript, */*; q=0.01",
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',  # 表单数据
        'Referer': 'https://www.shobserver.com/staticsg/home',  # 引荐来源
        'Origin': 'https://www.shobserver.com'  # 跨域请求
    }
    
    def get_shobserver_data(file_name='shang_guan_400.csv', max_pages=100):
        """
        爬取上观新闻数据,保存到 CSV 文件
        参数:
            file_name: 输出 CSV 文件名
            max_pages: 最大爬取页数
        """
        # 检查文件是否存在
        has_file = os.path.exists(file_name)
    
        # 打开 CSV 文件,追加模式
        with open(file_name, 'a', newline='', encoding='utf-8') as file:
            columns = ['title', 'url', 'time', 'source']
            writer = csv.DictWriter(file, fieldnames=columns)
            if not has_file:
                writer.writeheader()
    
            # 计算 lastpublishtime(当前时间戳)
            lastpublishtime = int(time.time() * 1000)  # 当前毫秒时间戳
    
            # 爬取数据
            for page in range(1, max_pages + 1):
                time.sleep(0.5)  # 请求间隔
                # 动态生成 ver(当前毫秒时间戳)
                ver = int(time.time() * 1000)
                # 表单数据
                form_data = {
                    'page': str(page),
                    'lastpublishtime': str(lastpublishtime)
                }
    
                url = 'https://www.shobserver.com/news/homeMoreNews'
                resp = requests.post(url, headers=headers, params={'ver': ver}, data=form_data, timeout=10)
                print(f"请求 {url}, 页码: {page}")
                print(f"请求参数:{form_data}")
                print()
    
                if resp.status_code != 200:
                    print(f"请求失败:{url}, 状态码: {resp.status_code}, 页码: {page}")
                    break
    
                ret = resp.json()
                # print(f"页面 {page} 响应:{ret}")
                news_list = ret['object']
    
                for item in news_list:
                    # print(item)
    
                    news = {}
                    news['title'] = item.get('title', '')
    
                    # https://www.shobserver.com/staticsg/res/html/web/newsDetail.html?id=907694&sid=11
                    # 养老智能体能当数字养老师?沪上“校企医”共建智慧康养与教育学院补人才缺口
                    news['url'] = f"https://www.shobserver.com/staticsg/res/html/web/newsDetail.html?id={item.get('id', '')}"
                    news['time'] = item.get('addtime', 0)
                    news['source'] = item.get('author', '上观新闻')
    
                    news['time'] = datetime.fromtimestamp(news['time'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
    
                    # 直接写入,不去重
                    writer.writerow(news)
                    print(f"保存新闻:{news}")
    
    
    if __name__ == "__main__":
        # 20 * 12 = 240 条新闻
        get_shobserver_data(file_name='shang_guan_400.csv', max_pages=20)
    
    # 899 * 506
    # https://images.shobserver.com/news/900_507/2025/05/09/l_cb20250509100210436048.jpg
    
    
    代码 2, 腾讯新闻

    举个例子:

    上海一男程序员被刑拘!为了打赏主播,差点把公司搬空了
    https://news.qq.com/rain/a/20250508A03GCG00

    这个url 中的 rain, 我估计指的是瀑布流。。。
    起名字真有趣。

    import os
    import csv
    import time
    import requests
    from datetime import datetime
    
    """
    每个 api 返回 12个新闻!
    
    请求参数是:
    {"base_req":{"from":"pc"},"forward":"1","qimei36":"0_NhDQ1xCnBNZ70","device_id":"0_NhDQ1xCnBNZ70","flush_num":7,"channel_id":"news_news_sh","item_count":12,"is_local_chlid":"1"}
    
    """
    
    # 请求头
    headers = {
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        'Content-Type': 'application/json',
        'Referer': 'https://news.qq.com/',
        'Origin': 'https://news.qq.com'
    }
    
    def get_tencent_data(file_name='qq_news_400.csv', max_pages=100, channel_id='news_news_sh'):
        """
        爬取腾讯新闻数据,保存到 CSV 文件
        参数:
            file_name: 输出 CSV 文件名
            max_pages: 最大爬取页数
            channel_id: 新闻频道 ID
        """
        # 检查文件是否存在
        has_file = os.path.exists(file_name)
    
        # 打开 CSV 文件,追加模式
        with open(file_name, 'a', newline='', encoding='utf-8') as file:
            columns = ['title', 'url', 'time', 'source']
            writer = csv.DictWriter(file, fieldnames=columns)
            if not has_file:
                writer.writeheader()
    
            # 爬取数据
            for page in range(1, max_pages + 1):
                time.sleep(0.5)  # 请求间隔
                payload = {
                    "base_req": {
                        "from": "pc"
                    },
                    "forward": "1",
                    "qimei36": "0_NhDQ1xCnBNZ70",
                    "device_id": "0_NhDQ1xCnBNZ70",
                    "flush_num": page,  # 使用 page 模拟分页
                    "channel_id": channel_id,
                    "item_count": 12,
                    "is_local_chlid": "1"
                }
    
                url = 'https://i.news.qq.com/web_feed/getPCList'
                resp = requests.post(url, headers=headers, json=payload, timeout=10)
                if resp.status_code != 200:
                    print(f"请求失败:{url}, 状态码: {resp.status_code}, 页码: {page}")
                    break
    
                ret = resp.json()
                news_list = ret['data']
    
                for item in news_list:
                    # print(f"新闻:{item}")
                    news = {}
                    news['title'] = item.get('title', '')
    
                    # 20250509A07HJ500
                    news['url'] = f"https://news.qq.com/rain/a/" + item.get('id', '')
                    news['time'] = item.get('update_time', '')
                    news['source'] = item.get('media_info', '腾讯新闻').get('chl_name', '')
    
    
                    # 直接写入,不去重
                    writer.writerow(news)
                    print(f"保存新闻:{news}")
    
    
    if __name__ == "__main__":
        get_tencent_data(file_name='tencent_400.csv', max_pages=2, channel_id='news_news_sh')
    
    

    4. 结论 + todo

  • 下一步, 过滤出某一个类别的新闻。

  • 希望对大家有帮助。

    作者:waterHBO

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python实现上海新闻爬虫:上观新闻与腾讯新闻的抓取之道

    发表回复