介绍
简介
FluxMQ Rule Engine (以下简称规则引擎) 用于配置FluxMQ 消息流与设备事件的处理、响应规则。规则引擎不仅提供了清晰、灵活的 "配置式" 的业务集成方案,简化了业务开发流程,提升用户易用性,降低业务系统与 FluxMQ 的耦合度;也为 FluxMQ 的私有功能定制提供了一个更优秀的基础架构。 FluxMQ 在 消息发布或事件触发 时将触发规则引擎,满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信 息;
消息发布
规则引擎借助响应动作可将特定主题的消息处理结果存储到数据库,发送到 HTTP Server,转发到消息队列 Kafka 或 RabbitMQ,重新发布到新的主题甚至是另一个 Broker 集群中,每个规则可以配置多个响应动作。
SELECT * FROM "topic"
SELECT payload.x as x FROM "t/a"
事件触发
规则引擎使用 $EVENT. 开头的虚拟主题(事件主题)处理 FluxMQ 内置事件,内置事件提供更精细的消息控制和客户端动作处理能力,可用在 QoS 1 QoS 2 的消息抵达记录、设备上下线记录等业务中。
SELECT clientid, connected_at FROM "$EVENT.CONNECT" WHERE username = 'fluxmq'
规则引擎数据和 SQL 语句格式,事件主题 列表详细教程参见 规则引擎语法。
规则引擎组成
规则描述了 数据从哪里来、如何筛选并处理数据、处理结果到哪里去 三个配置,即一条可用的规则包含三个要素:
- 触发事件:规则通过事件触发,触发时事件给规则注入事件的上下文信息(数据源),通过 SQL 的 FROM 子句指定事件类型;
- 处理规则(SQL):使用 SELECT 子句 和 WHERE 子句以及内置处理函数, 从上下文信息中过滤和处理数据;
- 响应动作:如果有处理结果输出,规则将执行相应的动作,如持久化到数据库、重新发布处理后的消息、转发消息到消息队列等。
如图所示是一条简单的规则,该条规则用于处理 消息发布 时的数据,将全部主题消息的 msg 字段,消息 topic 、qos 筛选出来,发送到数据源
使用 FluxMQ 的规则引擎可以灵活地处理消息和事件。使用规则引擎可以方便地实现诸如将消息转换成指定格式,然后存入数据库表,或者发送到消息队列等。 与 FluxMQ 规则引擎相关的概念包括: 规则(rule)、动作(action)、资源(resource) 和 资源类型(resource-type)。