RocketMQ 消息发送
概述
RocketMQ 动作用于将规则引擎处理后的数据发送到 Apache RocketMQ 消息队列中。RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高并发、高可用、高可靠等特性,特别适合企业级应用场景。
配置参数
基础配置
参数 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
dataSource | string | 是 | - | RocketMQ 数据源名称 |
topic | string | 是 | - | 目标主题名称 |
tag | string | 否 | - | 消息标签,用于消息过滤 |
messageKey | string | 否 | - | 消息键,用于消息去重和顺序消费 |
batchSize | number | 否 | 100 | 批量发送大小 |
batchInterval | number | 否 | 1000 | 批量间隔(毫秒) |
timeout | number | 否 | 30000 | 操作超时时间(毫秒) |
消息配置
{
"messageKey": "clientId",
"tag": "device_data",
"properties": {
"source": "fluxmq",
"version": "2.1.0"
}
}
使用示例
基础配置示例
{
"action": "SAVE_ROCKETMQ",
"config": {
"dataSource": "rocketmq_ds",
"topic": "device-events",
"tag": "sensor_data",
"messageKey": "clientId",
"properties": {
"source": "fluxmq",
"event_type": "device_data"
},
"batchSize": 100,
"batchInterval": 1000
}
}
规则示例
-- 收集设备数据并发送到 RocketMQ
SELECT
clientId,
topic,
payload,
timestamp,
'device_event' as event_type
FROM "$EVENT.PUBLISH"
WHERE topic =~ 'device/+/data'
AND isJson(payload)