基于flink与groovy实现全实时动态规则智能营销与风控系统
前言:本文是对视频课程《基于flink与groovy实现亿用户级实时动态规则智能运营系统》的介绍说明;
本项目极具行业实用价值,可为各企业开发人员提供系统设计思路与灵感,而且,它不光可用于智能运营,也可以应用在实时规则推荐,实时广告推送,实时规则封控,实时交通监控等纪委广泛的场景中;
当然,各类培训机构学员也可以通过学习此项目来丰富自己的就业简历,绝对杀手锏级别!在面试中运用得当,可以起到一锤定音立竿见影的绝杀效果!
如果需要学习本项目,可联系博主
1 项目背景
传统的“精准营销平台”由营销人员基于画像标签数据去圈选人群,进行营销推送,存在不足;
这种传统手段,往往无法抓住那些 “转瞬即逝的营销机会”;
这些场景,显然无法通过营销人员通过人工手段去发现,需要用软件系统自动、实时监控用户的行为,并实时做出判断,并进而驱动营销推送系统推送相关营销内容;
总之,在应对这一类的需求时,以前的传统的“精准营销平台”显得力不从心;
因而,擅长用键鼠改变世界的攻城狮们,决定为公司的推广、运营部门开发一个应对上述场景的自动化、智能的、实时、规则动态可变的营销利器 —— 《实时智能运营系统》
2 需求分析
2.1 营销规则举例
2.2 规则营销需求的宏观抽象
2.3 营销规则受众条件的逻辑要素抽象分析
各类营销规则,都支持对目标人群的圈选
圈选的手段之一是:是对用户的画像标签数据进行定义、判断
圈选的手段之二是:对用户的行为数据进行自定义即时计算(广义上就是实时用户画像)
而行为数据即时计算的时间跨度,通常是有时间窗口约束的,而所涉及的时间跨度可能有:
只包含规则上线之前
只包含规则上线之后
横跨规则上线前到上线后
受众圈选行为画像即时条件中,抽象出来的判断要素主要有
最后,就是上述各类条件的逻辑组合: 与 、或 、 非、 大于、 小于、 等于……
最终规则: 条件1 与 (条件2 或 条件3)
思考:
A事件中pageid=“p05” 的次数 > A事件中pageid="p10"的次数 ,是否包含在上述要素中?
A事件中,MAX(属性amt) > 2* MIN(属性amt) , 是否包含在上述要素模型中?
各次A事件 中 amt 属性的 最大的前3个值 ,的平均值 >10 ?
3 系统概要设计
3.1 各类受众条件运算的实现逻辑初步设计(实时画像计算条件)
3.1.1 画像条件的逻辑分析
这个好说,就是一个查询画像标签库进行判断的动作
3.1.2 行为判断的逻辑分析
1)行为判断:行为事件的类型及属性的判断
判断用户的某个行为事件,是否是规则条件中要求的事件
event_id = "X"
AND
properties["p1"] =/>/!=/< "v1"
event_id = "X"
AND
properties["p1"] 包含 "kw1"
event_id = "X"
AND
properties["p1"] + properties["p2"] > 100
AND
properties["p3"] 满足 正则表达式 [1,10]*?a
2)行为序列的判断
判断用户的一系列行为事件中,是否满足规则条件所要求的依次发生的序列,比如A B E
行为序列判断,是上述 “行为判断” 的衍生
本质上,就是在 判断事件 的基础上,加上时间先后的序列关系
这个逻辑,可以用 正则匹配进行计算(全窗口运算); 也可以用滚动聚合的方式计算;
3)事件(序列)发生次数的判断
判断要求的事件或事件序列,发生了多少次
是上述 “行为判断”、“行为序列判断” 的衍生
本质上,就是在 判断事件(或序列)的基础上,进行次数累计
4)事件属性值统计条件运算
也好说,主要是对满足条件的事件,为目标属性值进行累加等运算
比如: 要求 用户所发生的的一系列A事件中,属性amt最大值 – 最小值 >100
暂时无法在飞书文档外展示此内容
比如: 要求 用户所发生的的一系列A事件中,属性amt最大的前3个,的平均值 > 200
5)事件与事件对比关系的判断
事件对比关系的判断,也是上述“行为判断”的衍生
如事件间隔时长的判断,本质上,也是在 判断事件的基础上,进行时间差的运算
比如,要求, A事件 后面发生 B事件 ,且 时间间隔 要 > 15分钟
结论:不管用户的规则如何变化,如何灵活多样,只要设计好合理的运算逻辑+合理的中间聚合结果数据结构,我们都可以用持续、滚动运算的方式来实现!
3.2 受众条件运算实现逻辑深入设计
3.2.1 静态画像条件的逻辑实现
1)画像条件判断的实现难点
初略考虑,应该就是查询用户画像标签库即可;
站在系统的角度来考虑时: 什么时机去查询? 去哪里查?
比如: 查询时机: 当一个人的某个行为触发了某个规则,此时去查询该用户的画像标签是否满足规则的人群画像条件 查询的目标:
用户画像标签数据库
如果画像数据库在hbase中,可以用如下伪代码来快速判断
## 如果条件是: 活跃等级 = C AND 首单时间< "2022-06-01"
get = new Get(guid)
get.addColumn("活跃等级")
get.addColumn("首单时间")
result = table.get(Get)
activeLevel = result.getValue("活跃等级")
firstOrderDate = result.getValue("首单时间")
if(activeLevel = 'C' && firstOrderDate< "2022-06-01")
return true;
## 如果条件是: 兴趣词 :包含 "速溶咖啡"
## 这种条件,用hbase的支撑就不太好处理了
// favroitWords : [海底捞, 咖啡伴侣, 小罐速溶咖啡,乔丹 ,篮球 ]
favroitWords = result.getValue("兴趣词")
但,要考虑到系统性能的话,就没那么简单:
2)方案概要设计
根据上文的分析,最好避免使用上述的每次规则触发都去查询外部存储系统的方案
画像条件判断的环境特点
方案要点设计
针对上面的特点分析,我们可以采用如下方案来解决规则画像条件判断的需求
- 在发布规则的时候,根据规则的画像条件,先从画像库中条件查询,圈选满足画像条件的人群
- 将圈选出的人群的guid,转换成bitmap
- 将(规则id,对应的画像人群bitmap),注入到我们的实时流式规则计算引擎(flink程序)
- 实时流式规则计算引擎,则可以在内部持有每个规则的人群bitmap,进而在需要判断的时候,直接在内部持有的规则人群bitmap中判断即可
- 另外,当外部画像库进行了离线批量更新时,将人群圈选及接入引擎的流程,也进行一次,以更新规则引擎中持有的人群bitmap
3.2.2 即时画像条件分析判断的难点
1)难点概述
即时画像条件的计算在什么时候进行,在哪里进行
只涉及规则上线之前
只涉及规则上线之后
横跨规则上线的前后
综上,我们的规则引擎在流式计算的语境下,该去哪里查询用户的明细数据,就需要深入研究设计;
用户的行为明细数据存放位置有如下可选方案
2)明细数据存储的两难选择
选择1 – 全部存储在doris中
在规则引擎中高并发高频度地去请求doris进行查询
优势
doris有着强大的查询功能,这样一来,不论规则中的判断条件有多复杂,我们在进行计算实现时,对不同的行为判断,无非是换一个sql而已,而这个sql还可以由规则发布平台在发布规则的时候直接生成并跟随规则内容一起注入到规则引擎系统;
弊端
如计算A的次数是否满足,当A到达flink时,如果这个A尚未存在于doris,则计算将有遗漏
暂时无法在飞书文档外展示此内容
这要求doris有强大的实时数据高频写入能力(而从doris的底层架构来看,它并不擅长);
选择2 – 全部存储内部state中
在规则引擎中,直接基于内部state的明细数据进行行为条件计算判断
优势
弊端
复杂判断条件的编码问题
从上文可以看出,方案中存在着对上线后数据进行编程计算的需求
而对数据的计算逻辑,有简单的,也有复杂的,而且各种条件的计算逻辑各不相同,这对我们编程带来的最大难度是,无法用同一套代码来应付灵活多变的规则条件计算逻辑;
虽然,我们可以根据当前的需求进行抽象总结,将各类需求的计算逻辑进行分门别类,从而设计出N套枚举式的硬编码,但这样一来,灵活性肯定不够,如果要新上线一个超出此前枚举逻辑的新规则,那就不得不关闭系统,重新编码,打包上线;
这显然不符合一开始的构想: 可以在线注入新规则(即动态规则)
==========================================================================
3.2.3 方案概要设计1(即时画像条件计算的数据存储问题)
既然把长时间跨度的明细数据存放在flink state中是完全不可行的,那么,我们首先排除这种方案;
既然无法用一种简单粗暴的方案来解决全场景需求,那么,我们可以针对具体场景做具体设计;
1)只涉及上线前的行为分析判断(历史)
既然只涉及历史数据,一个用户是否满足条件不会随时间而改变,只需要查询判断一次即可;
这样,我们完全可以仿照前面的 “画像条件判断实现方案” ,在发布规则的时候,从doris中用sql进行统计,并将统计结果形成固化的结果数据,接入规则引擎或从外部提供给规则引擎取用;
2)只涉及上线后的行为分析判断(未来)
可以基于flink state来计算;
请注意: 上面说的是 基于flink state来计算 , 并不是说用flink state来存储所有明细
一个规则上线后的生命周期可能也是很长的,如果从上线时起,state中就不断保存用户的行为明细,这体量也将不可接受;
如果不存储所有明细,那将如何进行行为分析判断呢?
其实,我们在计算一个行为规则条件时,并不一定需要完整的明细,因为:
综上,从理论上来说,对于只涉及上线后的行为分析计算,即使不利用state保存所有明细数据,依然可以实现,而是利用滚动运算方式,只要存储滚动运算的聚合值即可
3) 横跨上线前后时间段的行为分析判断(历史+未来)
这是最麻烦的一种,最省事的办法是从需求层面去规避,不提供此类需求的支撑即可
由于时间窗口跨上线前后,就导致既无法仅查询历史数据,也不能仅滚动聚合上线后的数据
唯一的办法是:
- 将规则条件时间窗口,分解成上线前和上线后两段;
- 上线前的历史数据的查询结果,传递给规则引擎,然后规则引擎将这个结果和后续的数据分析进行接续整合(把历史查询结果,作为未来查询运算时的初始值)
这里的数据结构设计,就比较复杂,需要具体设计
比如,条件是:要求发生过事件A次数>=5
有可能在历史数据中查到了5+次,则已经符合要求,不需要再判断上线后了
有可能在历史数据中查到了3次,就要求在上线后继续判断是否再发生2+次
对于历史查询已满足条件的人,可以直接整合到满足画像的人群固化数据中去;
那么,历史查询结果需要传递给规则引擎的数据该怎么组织呢?
用户:规则:条件:值
用户_1:规则_x:条件_1:3
比如,条件是:要求发生过事件A的sum(某属性)>=5
有可能在历史数据中查到已经符合要求,不需要再判断上线后了
有可能在历史数据中查到的结果为4,就要求在上线后继续统计
对于历史查询已满足条件的人,直接整合到满足画像的人群固化数据中去;
那么,历史查询结果需要传递给规则引擎的数据该怎么组织呢?
用户:规则:条件:值
比如: 要求发生过:事件A和时间B间隔>5(+ 这个模式的发生次数>2),如何分解呢?
有可能在历史数据中查询中,已经发生过,则已经符合要求,上线后不需要判断了
有可能在历史数据中,发生过了A,也甚至发生过B,只是间隔不满足,那么,可以记下没有后续B事件的最晚A,等上线后再看是否有B
对于历史查询已满足条件的人,直接整合到满足画像的人群固化数据中去;
那么,历史查询结果需要传递给规则引擎的数据该怎么组织呢?
用户:规则:条件
满足次数:0
待满足序列[A:时间, ]
比如: 要求依次发生过 A – C – E 行为序列(+发生次数>2),又该如何分解呢?
有可能在历史数据中查询到了 A E C ,那可认为已满足了 A – C (步骤2),待上线后等待 E 即可
有可能在历史数据中1个步骤都没有,则全待上线后进行分析判断
对于历史查询已满足条件的人,直接整合到满足画像的人群固化数据中去;
那么,历史查询结果需要传递给规则引擎的数据该怎么组织呢?
用户:规则:条件
满足次数:0
待满足序列[A , C , ]
-- 也可以这样: 传递用户所做过的行为序列字符串
满足次数:2
待满足序列 "AEC"
暂时无法在飞书文档外展示此内容
比如:要求事件A的 sum(某属性值)> 事件B的sum(某属性值)
对于历史查询已满足条件的人,直接整合到满足画像的人群固化数据中去;
那么,历史查询结果需要传递给规则引擎的数据该怎么组织呢?
用户:规则:条件
A:sum(某属性值)
B:sum(某属性值)
4)条件运算过程中的滚动聚合中间结果存储最终方案
综上所述,各种条件的滚动增量运算逻辑不同,所需要记录的中间聚合结果数据结构也不同,还需要将历史时段的条件查询结果和未来的在线运算状态初始值进行整合,最终选择的“滚动运算中间结果存储方案”为:
将中间结果存储在redis中
3.2.4 方案概要设计2(即时画像条件计算逻辑的硬编码问题)
综上所述,规则条件的查询计算,需要有sql和代码这两种具体实现
1)两种计算需求
- 在引擎外,在规则上线前,用sql在doris的历史明细数据中进行统计(这个好实现)
- 在引擎内,在规则上线后,对流入的行为事件进行统计、判断(这个需要api编程),甚至还要接续历史查询结果进行 “现时态” 数据的整合计算
对于第2种需求,显然不是简单的逻辑代码
而且,这个编程还不能简单使用硬编码(或者多套枚举式硬编码)
那该怎么办呢?
2)规则条件计算核心设计
- 合理设计好历史数据查询结果的数据结构及存储方式、及传递机制
- 引入动态脚本语言groovy,来实现具体的计算逻辑
上文中分析的结论:
而这些 灵活多变的规则条件的 “ 结果数据结构和运算逻辑 ” ,还要尽可能与我们flink规则引擎的通用逻辑进行剥离;以便于在不对规则引擎系统作任何改变的情况下,依然可以支撑 各类规则条件的 “ 结果数据结构和计算逻辑 ” ;
也就是说,规则条件的具体计算逻辑,对于flink规则引擎的通用处理流程来说,需要是透明的!
首先,借鉴面向对象的思想
1)高内聚
把 规则条件计算过程中的状态数据(历史查询结果数据、 滚动聚合的中间结果) 和 计算逻辑(包含滚动聚合逻辑和历史数据接续整合逻辑) ,进行高度绑定内聚!形成一个高内聚的“状态机”
每一类规则条件的计算,都有一个自己的 规则运算状态机 ;
每一种“状态机”,都封装了自己的计算逻辑,也负责维护、理解自己的数据结构;
2)低耦合
当新增规则时,可以由外部系统为规则去选用一个 “运算机”,然后注入到我们的规则引擎,并由规则引擎去驱动这些运算机进行运算即可,这样,我们的规则引擎就跟规则条件运算状态机实现了解耦
其次,将 状态机的 运算逻辑 动态化
1)状态机 的运算逻辑,用动态脚本语言来实现,如groovy
2)规则条件状态机的逻辑代码,将在新规则发布上线时注入到规则引擎中
最后,将状态机的 状态数据 存储外部化
状态机的状态数据,本可以直接存储在规则引擎内部(flink的state中);
但考虑到如下因素:
我们考虑将状态机的状态数据存储在外部的redis集群中
4 项目完整架构
4.1 技术架构图
通过上面的设计,我们形成了如下的最终架构设计图
4.2 架构概要说明
- 用户画像标签库,由离线数仓组提供,并将标签数据存储在elastic search中,方便画像人群圈选
- 用户行为事件明细数据(最近3个月),实时落地存入doris数据库中,方便快速统计历史时间窗口中的规则条件
- 开发规则管理web系统,功能主要有:
- 为营销人员提供规则定义、管理等操作的可视化平台,并能根据用户定义的规则条件,选择运算逻辑groovy脚本模板
- 可以根据规则条件的定义,查询doris历史明细,形成规则条件计算的初始化状态数据,并发布到状态数据redis集群
- 可以根据规则条件发布或人为触发,进行人群圈选,并形成bitmap,并存入mysql元数据库
- 可以将规则定义的相关参数和计算代码groovy脚本,存入mysql元数据库
- 开发人员,可以往规则管理平台中维护、添加groovy脚本模板,并开发新的规则定义页面,为营销人员提供的新的规则模板;
- 开发flink流式处理系统(规则运算引擎),消费kafka中的用户行为明细数据,并通过cdc-connector接受规则管理系统发布的规则信息,对不断流入的用户行为数据,驱动各规则的运算状态机,进行各类规则的条件运算、判断,并输出最终触达结果