规则引擎实战案例
概述
本 文档提供 FluxMQ 规则引擎在实际业务场景中的完整应用案例,帮助用户快速上手并掌握规则引擎的实际应用。
案例一:智能环境监控系统
业务背景
某智慧园区需要监控各区域的温湿度数据,当环境异常时自动触发告警,并将历史数据存储到数据库用于分析。
数据格式
{
"deviceId": "sensor_001",
"location": "building_a/floor_2/room_201",
"temperature": 25.5,
"humidity": 65.2,
"timestamp": 1690599987495
}
规则配置
规则1:正常数据存储
-- 规则名称:环境数据存储
-- 描述:将所有环境传感器数据存储到MySQL数据库
SELECT
uuid() as id,
clientId as device_id,
split(topic, '/').get(1) as building,
split(topic, '/').get(2) as floor,
split(topic, '/').get(3) as room,
payload.temperature as temperature,
payload.humidity as humidity,
datetime(timestamp) as record_time
FROM "$EVENT.PUBLISH"
WHERE topic =~ 'sensor/+/environment'
AND isJson(payload)
动作配置 - MySQL存储:
INSERT INTO environment_data (
id, device_id, building, floor, room,
temperature, humidity, record_time
) VALUES (
'${id}', '${device_id}', '${building}',
'${floor}', '${room}', ${temperature},
${humidity}, '${record_time}'
)
规则2:高温告警
-- 规则名称:高温告警
-- 描述:当温度超过30度时发送告警
SELECT
clientId as device_id,
'HIGH_TEMPERATURE' as alert_type,
payload.temperature as current_temp,
'Temperature exceeds threshold' as message,
split(topic, '/').get(1) as location,
datetime(timestamp) as alert_time
FROM "$EVENT.PUBLISH"
WHERE topic =~ 'sensor/+/environment'
AND payload.temperature > 30
动作配置 - Webhook告警:
{
"alert_type": "${alert_type}",
"device_id": "${device_id}",
"location": "${location}",
"current_temperature": ${current_temp},
"message": "${message}",
"timestamp": "${alert_time}"
}
规则3:设备离线检测
-- 规则名称:设备离线检测
-- 描述:检测设备连接状态变化
SELECT
clientId as device_id,
'DEVICE_OFFLINE' as event_type,
clientIp as last_ip,
datetime(timestamp) as offline_time
FROM "$EVENT.DISCONNECT"
WHERE clientId =~ 'sensor_.*'
案例二:车联网数据处理
业务背景
车联网平台需要实时处理车辆上报的GPS、速度、燃油等数据,实现轨迹记录、超速告警、油耗统计等功能。
数据格式
{
"vehicleId": "CAR001",
"gps": {
"latitude": 39.9042,
"longitude": 116.4074
},
"speed": 80,
"fuel": 45.5,
"engineStatus": "running",
"timestamp": 1690599987495
}
规则配置
规则1:轨迹数据存储
-- 规则名称:车辆轨迹记录
SELECT
clientId as vehicle_id,
payload.gps.latitude as latitude,
payload.gps.longitude as longitude,
payload.speed as speed,
payload.fuel as fuel_level,
datetime(timestamp) as record_time
FROM "$EVENT.PUBLISH"
WHERE topic =~ 'vehicle/+/location'
AND payload.gps.latitude > 0
AND payload.gps.longitude > 0
动作配置 - ClickHouse存储(适合大数据分析):
INSERT INTO vehicle_trajectory (
vehicle_id, latitude, longitude, speed,
fuel_level, record_time
) VALUES (
'${vehicle_id}', ${latitude}, ${longitude},
${speed}, ${fuel_level}, '${record_time}'
)
规则2:超速告警
-- 规则名称:超速监控
SELECT
clientId as vehicle_id,
payload.speed as current_speed,
'SPEED_VIOLATION' as violation_type,
payload.gps.latitude as latitude,
payload.gps.longitude as longitude,
datetime(timestamp) as violation_time
FROM "$EVENT.PUBLISH"
WHERE topic =~ 'vehicle/+/location'
AND payload.speed > 120 -- 超过120km/h