Python-Flink-Kafka 流式计算实例解析

简介

流式计算

流式计算,作为一种高频、增量、实时的数据处理模式,主要对计算方法和数据流进行处理。其功能主要体现在——对占用内存小、单次处理快、系统延迟低等要求较高的场景,以及需要在任务中持续计算的场景。

流式计算的世界中,数据是持续流动的,因此我们需要及时地对数据进行处理计算。流式计算需要先定义好计算逻辑,然后提交到流式计算系统中,等流数据到达后就会触发计算逻辑进行计算,并且这个计算作业的逻辑在整个运行期间是不可更改的

批量计算

就是对数据进行批量的处理。通常,我们都会先统一收集数据,并且把数据以数据表的形式存储到数据库中,再按照不同的计算逻辑,对全部的数据进行统一的批量处理,待全部数据处理完成后,才会输出最终的结果。

常见的流式计算框架有 strom、 spark、 flink

  1. Strom:Strom是一种流处理框架,一次处理一个数据流。它对数据流进行分段切分,并使用流处理操作来处理这些数据。
  2. Spark:Spark是一种小批处理的分布式计算框架,它对连续数据流进行抽象,称为DStream(Discretized Stream)。DStream是小批处理的RDD(弹性分布式数据集),可以通过任意函数和滑动数据窗口进行转换,实现并行操作。
  3. Flink:Flink是一种针对流数据和批数据的计算框架。它创造性地统一了流处理和批处理,将批数据看作流数据的一种特例。Flink的输入数据流是无界的,当把批处理看作是一种特殊的流处理时,它的输入数据流被定义为有界的。
Storm SparkStreaming Flink
模型 Native Mirco-Batching Native
API 组合式 声明式 声明式
语义 At-least-once Exactly-once Exectly-once
容错机制 Ack Checkpoint Checkpoint
状态管理 基于DStream 基于操作
延时 Low Medium Low
吞吐量 Low High High

使用广泛的Spark还是以微批的方式进行流计算。而Flink是流的方式。Apache Flink是近几年发展很快的一个用于分布式流、批处理数据处理的开源平台。它是最贴合DataFlow模型实现的分布式计算框架。基于流计算进行高性能计算,具有良好的容错、状态管理机制和高可用能力

扩展阅读

Storm,Spark和Flink简介 联系与区别_flink spark storm 对比_yann.bai的博客-CSDN博客

Spark、Storm、Flink横向对比_flink spark storm 对比_程序猿小柒的博客-CSDN博客

Flink01:快速了解Flink:什么是Flink、Flink架构图、Flink三大核心组件、Flink的流处理与批处理、Storm vs SparkStreaming vs Flink_做一个有趣

技术选型

Flink 作为流式计算引擎,并可进行计算任务托管等操作

Apache Flink 是一个框架和分布式处理引擎,用于在_无边界和有边界_数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

Flink架构,是一个基于Master-Slave风格的架构,当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager, JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。 TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。

Kafka 作为数据流通道

ApacheKafka是一个开源分布式事件流平台,被数千家公司使用,用于高性能数据管道、流分析、数据集成和关键任务应用程序。

Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。

Producer

负责发布消息到Kafka broker

Consumer

消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition

Parition是物理上的概念,每个Topic包含一个或多个Partition

Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

Python

  1. 将 Flink 能力输出到 Python 用户,进而可以让 Python 用户使用所有的 Flink 能力。
  2. 将 Python 生态现有的分析计算功能运行到 Flink 上,进而增强 Python 生态对大数据问题的解决能力。

业务场景

系统实时记录用户访问日志,通过日志采集工具将原始日志推送至kafka topic source中,通过流式计算平台对topic source原始日志进行处理,获取各用户动作类别,用户点击次数。将统计值推送至topic result中,

后续业务程序读取result中处理后的统计数据,进行业务展示,数据落库等操作。

作业流程

暂时无法在飞书文档外展示此内容

构建

环境

系统环境:macOS 、docker

软件及版本:

Apache Flink latest (1.17.1)

Apache Zookeeper latest

Apache Kafka latest (3.5.1)

安装 flink 集群

官网链接:https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/

现已支持docker 安装。本次搭建使用docker 构建flink

不使用python环境

Flink 官方的docker镜像内集成了java 环境,但并未集成python开发环境,若不使用python作业,可用官方镜像构建

拉取镜像
# 下载flink 镜像 1.17.1-java8
docker pull flink::1.17.1-java8

# 查看镜像
docker iamges
使用docker-compose 构建flink 容器

创建目录 flink ,进入flink 目录,创建目录 job_data 映射目录 ,创建docker-compose.yml文件 并写入

version: "2.1"
services:
  jobmanager:
    image: flink::1.17.1-java8
    expose:
      - "6123"
    ports:
      - "8081:8081"
    volumes:
      - "/Users/mfw/Code/test/Flink/job_data:/home"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink::1.17.1-java8
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

进入此文件目录下,执行docker-compose up -d 创建容器。

进入docker desktop 内 可查看启动的容器。或执行命令 docker ps -a

访问 http://127.0.0.1:8081 可进入flink web管理页面

使用python环境处理

使用docker flie构建集成python环境的镜像

创建目录 pyflink,进入pyflink 目录内创建文件 Dockerfile,写入如下代码

FROM flink:1.17.1-java8
RUN apt-get update && apt-get -y install python3.10 && update-alternatives --install /usr/bin/python python /usr/bin/python3.10 1 && apt-get -y install python3-pip && pip3 install apache-flink==1.17.1 -i https://mirrors.aliyun.com/pypi/simple/

参考文档 https://blog.csdn.net/chencaw/article/details/117551298

执行 docker build -t pyflink . 创建pyflink 镜像

使用docker-compose 构建容器
version: "2.1"
services:
  jobmanager:
    image: pyflink:latest
    expose:
      - "6123"
    ports:
      - "8081:8081"
    volumes:
      - "/Users/mfw/Code/test/Flink17/job_data:/home"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: pyflink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

安装kafka

拉取 zookeeper kafka docker 镜像
docker pull bitnami/zookeeper 

docker pull bitnami/kafka
使用docker-compose 构建kafka 容器

创建目录 Kafka ,进入kafka 目录,创建kafak_data、zookeeper_data目录,创建docker-compose.yml文件

Kafka

| __ kafka_data

|__ zookeeper_data

|__ docker-compose.yml

在docker-compose.yml中写入

version: "2"
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    volumes:
      - "/Users/mfw/Code/test/Kafka/zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
      - "9094:9094"
    volumes:
      - "/Users/mfw/Code/test/Kafka/kafka_data:/bitnami"
    links:
      - "zookeeper:zookeeper"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.18.98.96:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper

进入此文件目录下,执行docker-compose up -d 创建容器。

测试是否安装成功

进入kafka容器内 docker exec -it kafka-kafka-1 bash

# 创建topic
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
# 启动消费者 & 监听
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
使用kafka-map 管理

拉取kafka-map

docker pull dushixiang/kafka-map

运行kafka-map

docker run -d --name kafka-map -p 8049:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin  dushixiang/kafka-map:latest

访问 http://127.0.0.1:8049 可进入kafka-map管理页面

编写python作业

部分jar包

访问 https://mvnrepository.com/ 网站获取部分所需 jar包

如:flink-sql-connector-kafka-1.17.1.jar

作业demo

模拟数据源: kafka_productor.py

"""
往 Kafka 里批量产生随机的用户操作数据
"""
import random
import numpy as np
from json import dumps
from time import sleep
from faker import Faker
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import kafka_errors

# ######################### 设置 #########################
seed = 2020  # 设置随机数种子,保证每次运行的结果都一样
num_users = 50  # 50 个用户
max_msg_per_second = 20  # 每秒钟的最大消息数
run_seconds = 3600  # 脚本最长运行时间,防止无限写入 kafka
topic = "test1"  # kafka topic
bootstrap_servers = ['172.18.98.96:9092'] #172.18.98.96

fake = Faker(locale='zh_CN')
Faker.seed(seed)
random.seed(seed)

class UserGroup:
    def __init__(self):
        """
        为指定数量的用户分配不同的出现概率,每次按概率分布获取用户姓名
        """
        self.users = [self.gen_male() if random.random() < 0.6 else self.gen_female() for _ in range(num_users)]
        prob = np.cumsum(np.random.uniform(1, 100, num_users))  # 用户点击概率的累加
        self.prob = prob / prob.max()  # 压缩到 0 ~ 1

    @staticmethod
    def gen_male():
        """
        生成男人
        """
        return {'name': fake.name_male(), 'sex': '男'}

    @staticmethod
    def gen_female():
        """
        生成女人
        """
        return {'name': fake.name_female(), 'sex': '女'}

    def get_user(self):
        """
        随机获得用户
        """
        r = random.random()  # 生成一个 0~1的随机数
        index = np.searchsorted(self.prob, r)
        return self.users[index]

def write_data():
    group = UserGroup()
    start_time = datetime.now()

    # 初始化生产者
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda x: dumps(x).encode('utf-8')
    )

    while True:
        now = datetime.now()

        # 生产数据,并发送到 kafka
        user = group.get_user()
        cur_data = {
            "ts": now.strftime("%Y-%m-%d %H:%M:%S"),
            "name": user['name'],
            "sex": user['sex'],
            "age": random.randint(3, 90),
            "action": 'click' if random.random() < 0.9 else 'scroll',  # 用户的操作
            "is_delete": 0 if random.random() < 0.9 else 1  # 10% 的概率丢弃这条数据
        }
        result=producer.send(topic, value=cur_data)
        try:
            record_metadata = result.get(timeout=10)
         # print(record_metadata.topic)
         # print(record_metadata.partition)
         # print(record_metadata.offset)
        except kafka_errors as e:
            print(str(e))
        print(cur_data)
        #print(result)

        # 终止条件
        if (now - start_time).seconds > run_seconds:
            break

        # 停止时间
        sleep(1 / max_msg_per_second)

write_data()

Flink sql算子demo demo_sql.py

import os
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import TableEnvironment,StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.common import Configuration

#请根据创建的Kafka集群,输入以下信息。
def log_processing():
    kafka_servers = '172.18.98.96:9092'
    kafka_zookeeper_servers = '172.18.98.96:2181'
    source_topic = "test1"
    sink_topic = "test2"
    kafka_consumer_group_id = "test"

    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(env_settings)

    dir_json_connect = 'file://' + os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                     'jar/flink-json-1.17.1.jar')
    dir_kafka_sql_connect = 'file://' + os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                     'jar/flink-sql-connector-kafka-1.17.1.jar')

    jars=dir_json_connect+";"+dir_kafka_sql_connect
    t_env.get_config().set("pipeline.jars", jars)
    source_ddl = f"""
            CREATE TABLE source_table(
                name VARCHAR,
                sex VARCHAR,
                action VARCHAR,
                ts VARCHAR,
                rt as TO_TIMESTAMP(ts),
                age INT,
                is_delete INT,
                WATERMARK FOR rt as rt - INTERVAL '2' SECOND
            ) WITH (
                'connector' = 'kafka',
                'topic' = 'test1',
                'properties.bootstrap.servers' = '172.18.98.96:9092',
                'scan.startup.mode' = 'earliest-offset',
                'format' = 'json'
            )
            """

    sink_ddl = f"""
            CREATE TABLE result_sink (
            name VARCHAR,
            age BIGINT NOT NULL
            ) with (
                'connector' = 'kafka',
                'topic' = 'test2',
                'properties.bootstrap.servers' = '172.18.98.96:9092',
                'properties.zookeeper.connect' = '172.18.98.96:2181',
                'format' = 'json'
            )
    """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    query = """
    select name, age
    from source_table
    WHERE
        action = 'click'
        AND is_delete = 0
    """
    t_env.sql_query(query) \
        .execute_insert("result_sink").wait()
    # t_env.execute_sql(query).wait()

    # t_env.execute("payment_demo")

if __name__ == '__main__':
    log_processing()

本地运行 python demo_sql.py

查看结果写入数据

打开Kafka-map http://localhost:8049/ 可查看到计算后的数据。

Python 作业提交至集群

flink run -m 172.19.98.96:8081 -py stream.py

扩展

参考

Apache Flink 官网

Apache Zoookeeper 官网

FLINK-1-用DOCKER-COMPOSE安装FLINK

阿里云实时计算Flink版

如何从 0 到 1 开发 PyFlink API 作业

python ubuntu20.04安装python3.9并设置为默认的python3_ubuntu设置python3为默认_chencaw的博客-CSDN博客

GitHub – huanglaoxie0503/pyflink-basic-introduction: PyFlink从入门到精通

连接器

问题记录

https://blog.csdn.net/hutonm/article/details/78624409

作者:风落叶微

物联沃分享整理
物联沃-IOTWORD物联网 » Python-Flink-Kafka 流式计算实例解析

发表回复