# SQL 事件类型与字段
规则的 SQL 语句可以处理 MQTT事件 消息,或者是具有数据流入能力的数据桥接。
SQL 语句使用 FROM 来指定 MQTT事件,在 SELECT 和 WHERE 子句中可以引用相应的字段。
数据源类型不同,可以使用的字段也不同。
# 客户端事件
规则的 SQL 语句可以处理事件(发布消息、客户端上下线、客户端订阅等),FROM 子句后面跟事件主题。
事件主题以 $EVENT/ 开头,比如 $EVENT.PUBLISH,$EVENT.SUBSCRIBE 。
事件名称 | 事件主题名 | 释义 |
---|---|---|
发布事件 | $EVENT.PUBLISH | 发布消息 |
订阅事件 | $EVENT.SUBSCRIBE | 订阅成功消息 |
取消订阅事件 | $EVENT.UNSUBSCRIBE | 取消订阅成功消息 |
发布回复事件 | $EVENT.ACK | 消息接收成功并回复 |
心跳事件 | $EVENT.PING | 连接保活心跳消息 |
取消连接事件 | $EVENT.DISCONNECT | 客户端主动断开连接 |
连接断开事件 | $EVENT.CLOSE | 服务端关闭连接 |
建立连接事件 | $EVENT.CONNECT | 连接成功 |
离线消息事件 | $EVENT.OFFLINE | 离线期间接收的消息 |
# 发布事件 ("$EVENT.PUBLISH")
当消息被放入底层socket时触发规则。
字段 | 解释 |
---|---|
id | 消息唯一标识,客户端publish会生成唯一Id,当qos>0时候,Ack消息会携带此id,代表一次完成应答操作 |
payload | MQTT 消息体 |
messageId | MQTT 消息 ID |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
retain | MQTT 消息是否保持 |
timestamp | 事件触发时间 (单位:毫秒) |
clientId | 客户端 ID |
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
示例
select
*
from
"$EVENT.PUBLISH"
输出
{
"id": "8ace2bbab65b4fb1a0afc7c77419e6b5",
"payload": {},
"messageId": 1,
"topic": "test",
"qos": 1,
"retain": false,
"timestamp": "1690599987495",
"clientId": "A1212313",
"clientIp": "183.136.225.31",
"nodeIp": "192.168.1.12"
}
# 发布回复事件 ("$EVENT.ACK")
当消息发送到客户端,并收到客户端回复的ack时触发规则,仅QOS1,QOS2会触发。
字段 | 解释 |
---|---|
id | 消息唯一标识,客户端publish会生成唯一Id,当qos>0时候,Ack消息会携带此id,代表一次完成应答操作 |
payload | MQTT 消息体 |
messageId | MQTT 消息 ID |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
timestamp | 事件触发时间 (单位:毫秒) |
clientId | 客户端 ID |
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
desc | 回复描述,用于区分qos1,2消息回复 |
- desc字段枚举
字段 | 解释 |
---|---|
ack | QoS1 回复 |
rec | QoS2 发布确认 |
comp | QoS2 发布完成 |
示例
select
*
from
"$EVENT.ACK"
输出
{
"id": "8ace2bbab65b4fb1a0afc7c77419e6b5",
"messageId": 10001,
"clientId": "A1212313",
"payload": {},
"topic": "test",
"qos": 1,
"timestamp": "1690599987495",
"clientIp": "183.136.225.31",
"nodeIp": "192.168.1.1",
"desc": "ack"
}
# 建立连接事件 ("$EVENT.CONNECT")
当客户端连接成功时触发规则。
字段 | 解释 |
---|---|
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
version | 使用的协议 版本 |
keepalive | MQTT 保活间隔 |
cleanSession | 是否清空会话 |
auth.username | 客户端用户名 |
auth.password | 客户端密码 |
will.isRetain | 遗嘱消息是否保持 |
will.willTopic | 遗嘱消息主题 |
will.willQos | 遗嘱消息语义级别 |
will.willMessage | 遗嘱消息内容 |
timestamp | 事件触发时间 (单位:毫秒) |
clientId | 客户端 ID |
示例
select
*
from
"$EVENT.CONNECT"
输出
{
"clientIp": "192.168.0.100",
"nodeIp": "127.0.0.1",
"version": "MQTT_3_1",
"keepalive": 120,
"cleanSession": false,
"auth": {
"username": "fluxmq",
"password": "fluxmq"
},
"will": {
"isRetain": false,
"willTopic": "willTest",
"willQos": 1,
"willMessage": ""
},
"timestamp": "1690599987495",
"clientId": "A1212313"
}
# 取消连接事件 ("$EVENT/DISCONNECT")
当客户端连接断开时触发规则。
字段 | 解释 |
---|---|
clientId | 客户端 ID |
time | 事件触发时间 |
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
示例
select
*
from
"$EVENT.DISCONNECT"
输出
{
"clientId": "A1212313",
"time": "2022 12-22 12:00:00",
"clientIp": "183.136.225.31",
"nodeIp": "192.168.1.1"
}
# 订阅事件 ("$EVENT/SUBSCRIBE")
当客户端订阅成功时触发规则。
字段 | 解释 |
---|---|
messageId | MQTT 消息 ID |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
clientId | 客户端 ID |
timestamp | 事件触发时间 |
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
示例
select
*
from
"$EVENT.SUBSCRIBE"
输出
{
"messageId": 1,
"topic": "test",
"qos": 1,
"clientIp": "183.136.225.31:21950",
"timestamp": "1690599987495",
"clientId": "A1212313",
"nodeIp": "192.168.1.12"
}
# 客户端取消订阅成功事件 ("$EVENT/UNSUBSCRIBE")
当客户端取消订阅成功时触发规则。
字段 | 解释 |
---|---|
messageId | MQTT 消息 ID |
topic | MQTT 主题 |
clientId | 客户端 ID |
timestamp | 事件触发时间 |
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
示例
select
*
from
"$EVENT.UNSUBSCRIBE"
输出
{
"messageId": 1,
"topic": "test",
"clientIp": "183.136.225.31:21950",
"timestamp": "1690599987495",
"clientId": "A1212313",
"nodeIp": "192.168.1.1"
}
# 心跳事件 ("$EVENT.PING")
当客户端推送保活心跳时触发规则。
字段 | 解释 |
---|---|
clientId | 客户端 ID |
timestamp | 事件触发时间 |
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
示例
select
*
from
"$EVENT.PING"
输出
{
"clientId": "A1212313",
"timestamp": "1690599987495",
"clientIp": "183.136.225.31",
"nodeIp": "192.168.1.1"
}
# 连接断开事件 ("$EVENT.CLOSE")
当服务端关闭连接时触发规则。
字段 | 解释 |
---|---|
messageId | MQTT 消息 ID |
reason | 连接断开原因描述 |
clientId | 客户端 ID |
timestamp | 事件触发时间 |
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
reason
连接断开原因:
字段 | 解释 |
---|---|
normal | 客户端关闭 |
kicked | 服务端主动关闭 |
timeout | 客户端心跳超时关闭 |
not_authorized | 认证失败关闭 |
other_kicked | 集群互踢 |
disconnect | 客户端主动关闭 |
示例
select
*
from
"$EVENT.CLOSE"
输出
{
"messageId": 1,
"reason": "",
"timestamp": "1690599987495",
"clientId": "A1212313",
"clientIp": "183.136.225.31",
"nodeIp": "192.168.1.1"
}
# 离线消息事件 ("$EVENT.OFFLINE")
- 离线消息必须是Qos=1或者Qos=2的消息
- 当客户端发布的消息没有客户端订阅时,那么此消息被定义为离线消息
- ACK未确认的qos=1或者qos=2的消息,也会被定义为离线消息
配置离线消息,将离线消息持久化到Mysql,Redis,PostgreSQL数据库中。
配置订阅消息,将离线消息根据订阅主题推送给客户端。
配置发布回复,根据客户端的ACK回复,从数据库中删除离线消息。
即同时选择:离线消息、订阅消息、发布回复三种类型。
可以为SQL添加topic过滤条件,比如topic =~ 'test/#'
字段 | 解释 |
---|---|
id | 消息唯一标识 |
payload | MQTT 消息体 |
messageId | MQTT 消息 ID |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
retain | MQTT 消息是否保持 |
timestamp | 事件触发时间 (单位:毫秒) |
clientId | 客户端 ID |
clientIp | 客户端的 IPAddress |
nodeIp | 事件触发所在节点 IPAddress |
示例
select
*
from
"$EVENT.OFFLINE"
输出
{
"id": "8ace2bbab65b4fb1a0afc7c77419e6b5",
"payload": {},
"messageId": 1,
"topic": "test",
"qos": 1,
"retain": false,
"timestamp": "1690599987495",
"clientId": "A1212313",
"clientIp": "183.136.225.31",
"nodeIp": "192.168.1.1"
}