Pulsar 消息发送
概述
Pulsar 动作用于将规则引擎处理后的数据发送到 Apache Pulsar 消息队列中。Pulsar 是一个云原生的分布式消息和流处理平台,支持多租户、统一队列和流处理模型。
配置参数
基础配置
参数 | 类型 | 必填 | 默认值 | 说明 |
---|---|---|---|---|
dataSource | string | 是 | - | Pulsar 数据源名称 |
topic | string | 是 | - | 目标主题名称 |
messageKey | string | 否 | - | 消息键,用于分区路由 |
batchSize | number | 否 | 100 | 批量发送大小 |
batchInterval | number | 否 | 1000 | 批量间隔(毫秒) |
timeout | number | 否 | 30000 | 操作超时时间(毫秒) |
消息配置
{
"messageKey": "clientId",
"properties": {
"source": "fluxmq",
"version": "2.1.0"
}
}
使用示例
基础配置示例
{
"action": "SAVE_PULSAR",
"config": {
"dataSource": "pulsar_ds",
"topic": "device-events",
"messageKey": "clientId",
"properties": {
"source": "fluxmq",
"event_type": "device_data"
},
"batchSize": 100,
"batchInterval": 1000
}
}
规则示例
-- 收集设备数据并发送到 Pulsar
SELECT
clientId,
topic,
payload,
timestamp,
'device_event' as event_type
FROM "$EVENT.PUBLISH"
WHERE topic =~ 'device/+/data'
AND isJson(payload)
高级特性
消息路由
使用消息键进行分区路由:
{
"messageKey": "clientId",
"topic": "device-events-${clientId}"
}
消息属性
设置消息属性用于消息过滤和路由:
{
"properties": {
"device_type": "sensor",
"data_type": "environmental",
"priority": "normal"
}
}
批量发送
配置批量发送参数优化性能:
{
"batchSize": 1000,
"batchInterval": 500,
"maxPendingMessages": 10000
}
性能优化
批量处理优化
- 批量大小设置:根据消息大小调整
batchSize
,建议 100-1000 - 批量间隔设置:根据实时性要求调整
batchInterval
- 并发控制:合理设置并发数避免 Pulsar 压力过大
消息优化
- 消息压缩:启用消息压缩减少网络传输
- 消息大小:控制单条消息大小,避免过大消息
- 序列化优化:选择合适的序列化格式