使用 Amazon 管理服务为 Apache Flink 构建动态规则引擎 大数据博客
使用 Amazon 管理服务构建动态规则引擎
关键要点
本文介绍了如何使用 Amazon 管理服务和 Apache Flink 实现动态规则引擎。动态规则可以实时创建和更新,无需更改底层代码。通过 Kinesis 数据流输入和输出规则与传感器数据,能够实时监控和执行操作。本文提供了架构图、代码示例和详细解析,适合希望快速实施动态规则引擎的开发者。想象一下你有一些流数据。这些数据可能来自物联网 (IoT) 传感器、日志数据摄取,或是甚至是 购物者印象数据。无论数据来源如何,你需要采取行动在某些事件发生时发送警报或触发操作。正如 Martin Fowler 所说:“你可以自己构建一个简单的规则引擎。你只需要创建一些具有条件和动作的对象,将它们存储在集合中,然后遍历这些对象以评估条件并执行动作。”
一个 业务规则引擎或简称规则引擎是一个执行多条规则的软件系统,根据某些输入确定输出。简单来说,它就是一系列“如果则”,“和”,“或”语句,基于某些数据进行评估。市面上有许多不同的业务规则引擎系统,例如 Drools、OpenL Tablets 或 RuleBook,它们有一个共同点:定义规则具有条件的对象集合,然后执行这些规则评估条件以产生输出执行动作。以下是一个简单的示例:
if (officetemperature) lt 50 degrees =gt send an alert
if (officetemperature) lt 50 degrees AND (occupancysensor) == TRUE =gt lt Trigger action to turn on heatgt
当单个条件或一组合条件评估为真时,旨在发送警报,以便可能对该事件采取行动例如触发加热器以温暖 50 华氏度的房间。
本文演示如何使用 Amazon Managed Service for Apache Flink 实现动态规则引擎。我们的实现提供了创建和更新动态规则的能力,无需更改或重新部署规则引擎的底层代码或实现。我们将讨论架构、实现的关键服务、一些实现细节,帮助你构建自己的规则引擎,以及一个 AWS Cloud Development KitAWS CDK项目以便在你的帐户中进行部署。
解决方案概述
我们解决方案的工作流程始于数据的摄取。我们假设有一些源数据。它可能来自各种地方,但为了演示,我们使用流数据IoT 传感器数据作为输入数据。假设我们关注的是来自 AnyCompany 家用恒温器的数据。我们将看到一些属性,比如温度、占用率、湿度等等。该恒温器每分钟发布一次相应的值,因此我们将围绕这个想法构建规则。由于我们正以接近实时的方式摄取数据,我们需要一个专门为此用例设计的服务。为此解决方案,我们使用 Amazon Kinesis Data Streams。
在传统的规则引擎中,规则的列表可能是有限的。新规则的创建可能涉及代码库的修订和重新部署、替换某些规则文件或某种覆盖过程。然而,动态规则引擎则有所不同。正如我们的流输入数据一样,我们的规则也可以流式传输。在这里,我们可以使用 Kinesis 数据流实时流式传输创建的规则。
在这个时候,我们有两条数据流:
来自恒温器的原始数据通过用户界面创建的业务规则以下图示展示我们如何将这些流连接在一起。
连接数据流
对于 Amazon Managed Service for Apache Flink,典型的用例是交互式查询和实时分析数据,并持续产生对时间敏感的用例的洞见。有鉴于此,如果有一条规则对应于温度低于某个值尤其在冬季,则及时评估和生成结果可能至关重要。
Apache Flink 连接器是将数据传入和传出 Managed Service for Apache Flink 应用的组件。连接器是灵活的集成方式,让你可以读取文件和目录。它们包含与 AWS 服务及第三方系统交互的完整模块。有关连接器的更多详细信息,请查看 如何使用 Apache Flink 连接器与 Managed Service for Apache Flink。
我们为此解决方案使用两种类型的连接器操作符:
源头 从 Kinesis 数据流、文件或其他数据源提供输入到你的应用接收器 将输出从应用发送到 Kinesis 数据流、Amazon Data Firehose 流或其他数据目标Flink 应用是流数据流,可以由用户定义的操作符进行转换。这些数据流形成有向图,从一个或多个源开始,结束于一个或多个接收器。下面的图示展示了一个示例数据流来源。如前所述,我们有两个 Kinesis 数据流可以用作我们的 Flink 程序的源。
以下代码片段展示我们如何在 Flink 代码中设置 Kinesis 源:
java/ 从 Kinesis 流中消费规则数据,创建 Rule 对象的 DataStream。 @param env Flink 作业的 StreamExecutionEnvironment @return Rule 对象的 DataStream @throws IOException 如果读取 Kinesis 属性时发生错误 /private DataStream createRuleStream(StreamExecutionEnvironment env Properties sourceProperties) throws IOException { String RULESSOURCE = KinesisUtilsgetKinesisRuntimeProperty(kinesis rulesTopicName) FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumerltgt(RULESSOURCE new SimpleStringSchema() sourceProperties) DataStream rulesStrings = envaddSource(kinesisConsumer)name(RulesStream)uid(rulesstream) return rulesStringsflatMap(new RuleDeserializer())name(Rule Deserialization)}
/ 从 Kinesis 流中消费传感器事件数据,创建 SensorEvent 对象的 DataStream。 @param env Flink 作业的 StreamExecutionEnvironment @return SensorEvent 对象的 DataStream @throws IOException 如果读取 Kinesis 属性时发生错误 /private DataStream createSensorEventStream(StreamExecutionEnvironment env Properties sourceProperties) throws IOException { String DATASOURCE = KinesisUtilsgetKinesisRuntimeProperty(kinesis dataTopicName) FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumerltgt(DATASOURCE new SimpleStringSchema() sourceProperties) DataStream transactionsStringsStream = envaddSource(kinesisConsumer)name(EventStream)uid(sensoreventsstream)
return transactionsStringsStreamflatMap(new JsonDeserializerltgt(SensorEventclass))returns(SensorEventclass)flatMap(new TimeStamperltgt())returns(SensorEventclass)name(Transactions Deserialization)
}
我们使用 广播状态,可以将两条事件流结合并共同处理。广播状态适合需要联接低吞吐量流和高吞吐量流的应用,或需要动态更新处理逻辑。以下图示展示了广播状态如何连接。有关详细信息,请参见 Apache Flink 中广播状态的实践指南。
这符合我们动态规则引擎的构想,其中我们有一条低吞吐量的规则流根据需要添加和一条高吞吐量的事务流以每分钟一次的规律流入。这个广播流允许我们将事务流或恒温器数据与规则流连接起来,如下代码片段所示:
龙猫梯子java// 处理管道设置DataStreamltAlertgt alerts = sensorEvents connect(rulesStream) process(new DynamicKeyFunction()) uid(partitionsensordata) name(按设备和 RuleId 拆分传感器数据) keyBy((equipmentSensorHash) gt equipmentSensorHashgetKey()) connect(rulesStream) process(new DynamicAlertFunction()) uid(ruleevaluator) name(规则评估器)
要了解有关广播状态的更多信息,请参见 广播状态模式。当广播流连接到数据流时如上例所示,它成为一个 BroadcastConnectedStream。应用于此流的函数允许我们处理事务和规则,并实现 processBroadcastElement 方法。KeyedBroadcastProcessFunction 接口提供三个方法来处理记录并发出结果:
processBroadcastElement() 为每条广播流记录调用我们的规则流。processElement() 为每条键控流记录调用。它提供对广播状态的只读访问,以防止在函数的并行实例中造成不同的广播状态。processElement 方法从广播状态中检索规则和键控状态中的前一个传感器事件。如果表达式评估为 TRUE将在下一节中讨论,则会发出一个警报。onTimer() 当先前注册的定时器触发时调用。可以在 processElement 方法中注册定时器,用于在未来执行计算或清理状态。我们的代码中使用此功能以确保必要时驱逐任何旧数据由我们的规则定义。我们可以按如下方式在广播状态实例中处理规则:
java@Overridepublic void processBroadcastElement(Rule rule Context ctx Collector out) throws Exception { BroadcastState broadcastState = ctxgetBroadcastState(RulesEvaluatorDescriptorsrulesDescriptor) Long currentProcessTime = SystemcurrentTimeMillis() // 如果我们得到一个新规则,将其标记为不足数据操作状态 if (!broadcastStatecontains(rulegetId())) { outputRuleOpData(rule OperationStatusINSUFFICIENTDATA currentProcessTime ctx) } ProcessingUtilshandleRuleBroadcast(rule broadcastState)}
static void handleRuleBroadcast(FDDRule rule BroadcastState broadcastState) throws Exception { switch (rulegetStatus()) { case ACTIVE broadcastStateput(rulegetId() rule) break case INACTIVE broadcastStateremove(rulegetId()) break }}
注意在代码中,当规则状态为 INACTIVE 时会发生什么。这将从广播状态中移除该规则,之后不再将该规则视为有效。类似地,处理状态为 ACTIVE 的规则将会添加或替换该规则。通过这种方式,我们可以动态调整,随时添加和移除规则。
评估规则
规则可以通过多种方式进行评估。尽管这不是强制性的,我们的规则是使用 Java 表达式语言JEXL兼容格式创建的。这使我们能够通过提供 JEXL 表达式及相关上下文必要的事务以重新评估规则或键值对,简单地调用评估方法:
javaJexlExpression expression = jexlcreateExpression(rulegetRuleExpression())Boolean isAlertTriggered = (Boolean) expressionevaluate(context)
JEXL 的一项强大功能是它不仅支持简单表达式例如包括比较和算术运算,它也支持 用户定义的函数。JEXL 允许你使用相同的语法调用 Java 对象上的任何方法。如果存在一个名为 SENSORcebb1baf2df04267b48928be562fccea 的 POJO 具有方法 hasNotChanged,你可以使用表达式调用该方法。有关我们在 SensorMapState 类中使用的更多用户定义函数,你可以查看相关源码。
让我们看一个示例,假设存在如下规则表达式:
SENSORcebb1baf2df04267b48928be562fcceahasNotChanged(5)
这个规则评估后的结果相当于传感器在过去 5 分钟内没有变化。
被暴露给 JEXL 的对应用户定义函数作为上下文的一部分如下:
javapublic Boolean hasNotChanged(Integer time) { Long minutesSinceChange = getMinutesSinceChange() logdebug(Time time Minutes since change minutesSinceChange) return minutesSinceChange gt time}
相关数据如下,会进入上下文窗口,随后用于规则的评估。
json{ id SENSORcebb1baf2df04267b48928be562fccea measureValue 10 eventTimestamp 1721666423000}
在这种情况下,结果或 isAlertTriggered 的值为 TRUE。
创建接收器
与之前创建源头类似,我们也可以创建接收器。这些接收器将作为数据流处理的终点,我们分析和评估的结果将被发出以供后续使用。与源相同,我们的接收器也是一个 Kinesis 数据流,下游的 Lambda 消费者将迭代记录并处理以采取适当的行动。下游处理有很多应用;例如,我们可以持久化该评估结果,创建推送通知,或更新规则仪表板。
基于之前的评估,我们在 process 函数内部有如下逻辑:
javaif (isAlertTriggered) { alert = new Alert(rulegetEquipmentName() rulegetName() rulegetId() AlertStatusSTART triggeringEvents currentEvalTime) loginfo(Pushing {} alert for {} AlertStatusSTART rulegetName())}outcollect(alert)
当 process 函数发出警报时,警报响应被发送到接收器,可在架构中向下游接收并使用。
javaalertsflatMap(new JsonSerializerltgt(Alertclass)) name(警报反序列化)sinkTo(createAlertSink(sinkProperties)) uid(alertsjsonsink) name(警报 JSON 接收器)
此时,我们可以进行处理。我们有一个 Lambda 函数记录这些记录,可见如下数据:
json{ equipmentNameTHERMOSTAT1 ruleNameRuleTest2 ruleIdcda160c0c79047dabd654abae838af3b statusSTART triggeringEvents[ { equipment{ idTHERMOSTAT1 } idSENSORcebb1baf2df04267b48928be562fccea measureValue200 eventTimestamp1721672715000 ingestionTimestamp1721741792958 } ] timestamp1721741792790}
虽然在这个示例中简化了,但这些代码片段构成了获取评估结果并将其发送到其他地方的基础。

结论
在本文中,我们演示了如何使用 Amazon Managed Service for Apache Flink 实现动态规则引擎,其中规则和输入数据都通过 Kinesis 数据流进行流式传输。你可以通过查看我们的 在线学习资料 来深入了解它。
随着公司寻求实施近实时的规则引擎,这种架构提供了一个引人注目的解决方案。Amazon Managed Service for Apache Flink 提供强大的能力,能够实时转化和分析流数据,同时简化 Flink 工作负载的管理,并与其他 AWS 服务无缝集成。
为了帮助你入手这个架构,我们很高兴地宣布将把我们的完整规则引擎代码作为示例发布在 GitHub 上。这个全面示例将超出我们帖子中提供的代码片段,提供更深入的视角,帮助你了解使用 Flink 构建动态规则引擎的复杂性。
我们鼓励你探索这个示例代码,针对你的