一站式演示 · 内部分享版
开源 EMQX 4.3 Kafka 分流改造
目标:不再依赖 terminal-gateway 作为“汇聚型 MQTT Client”, 让 Broker 直接把消息按业务 Topic 分流到 Kafka,同时保留规则引擎的灵活性。
变更效果预览
MQTT 设备消息
→
分流规则
→
Kafka 多 Topic
mqueue 压力
显著降低
链路延迟
更短
可维护性
更高
原版 vs 改造版
快速对比核心能力差异
原版(固定 Kafka topic)
- 所有 MQTT 消息写入同一个 Kafka topic
- 无 Topic 级别分流能力
- 无法支持业务/租户隔离
改造后(Topic → Topic 分流)
- 按 MQTT topic 规则映射 Kafka topic
- 支持 +/# 通配符
- 新增动态 producer 保障稳定
| 维度 | 原版 | 改造后 |
|---|---|---|
| Kafka 目标 topic | 固定一个 | 多 topic 分流 |
| 配置能力 | 仅支持 topic | topic_mapping + 兜底策略 |
| 分流逻辑 | 忽略 MQTT topic | 规则匹配 + 顺序优先级 |
| producer 管理 | 仅默认 topic | 动态确保可用 |
整体链路
从设备到 Kafka 的改造路径
Broker 内部转发链路
改造收益
- Broker 侧直接转发,减少终端网关瓶颈
- Kafka 承担持久化与消费扩展
- 更易配置与观测
实时接口联动
从集群拉取真实配置与状态(K8s API)
集群状态
EMQX Ready
-
Kafka Ready
-
更新时间
-
插件配置(实时)
默认 Kafka Topic
-
兜底策略
-
Kafka Brokers
-
等待载入配置...
Topic Mapping 交互演示
输入 MQTT topic,实时计算目标 Kafka topic
启用
命中规则
-
输出 Kafka Topic
-
支持 + / # 通配符,按规则顺序优先匹配
测试 Demo(消息变换路径)
模拟从 EMQX → Kafka 的消息内容变化(Plan B 规则引擎)
Step 1 · 解析 device_id
-
Step 2 · 注入 JSON
Step 3 · republish 主题
-
Step 4 · Kafka 目标 Topic
-
真实链路验证提示
页面演示为可视化逻辑,实际链路可通过 mosquitto_pub + Kafka consumer 验证。命令示例见下方“验证脚本”板块。
闭环演示(真实链路)
点击按钮将消息发布到 EMQX,并从 Kafka 中实时读取回传结果
发布状态
-
Kafka 目标 Topic
-
耗时(ms)
-
Trace ID
-
等待执行...
无
差异高亮(输入 vs Kafka 消息)
输入 Payload
-
Kafka 消息(新增字段高亮)
-
代码讲解(关键改动)
从“固定 Kafka topic”到“分流 + 动态 producer”
分流入口(on_message_publish)
KafkaTopic = select_kafka_topic(Message#message.topic),
produce_kafka_payload(ClientId, Payload, KafkaTopic).
- 先计算目标 Kafka topic,再写入
- 避免所有消息落到一个固定 topic
动态 producer 保证
case ensure_producer(KafkaTopic) of
ok -> brod:produce_cb(...);
{error, Reason} -> ...
end.
- 每个 Kafka topic 都能启动 producer
- 避免“写入时发现 producer 不存在”
规则匹配(topic_mapping)
match_mapping(MqttTopic, [{Pattern, KafkaTopic} | Rest]) ->
case emqx_topic:match(MqttTopic, Pattern) of
true -> {ok, KafkaTopic};
false -> match_mapping(MqttTopic, Rest)
end.
- 按顺序匹配,第一条命中即返回
- 支持 + / # 通配符
兜底策略
not_found when UseMqttTopic =:= true -> mqtt_topic_to_kafka(...);
not_found -> DefaultTopic.
- 未命中时可直接使用 MQTT topic
- 保持 100% 可写入 Kafka
后续规则引擎改造(方案 B)
SQL 处理 + republish + Kafka 插件转发
SQL 示例
SELECT
nth(2, split(topic, '/')) AS device_id,
json_encode(
map_put('device_id', nth(2, split(topic, '/')), json_decode(payload))
) AS payload
FROM "ota/+"
将 topic 中的 device_id 注入 payload,实现数据增强。
republish 配置
- Target Topic:
bridge/ota/${device_id} - Payload Template:
${payload}
Kafka 插件仅转发 bridge/ota/+,避免重复。
验证脚本
快速验证链路:MQTT → Kafka
# 1) 发布 MQTT 消息
mosquitto_pub -h emqx.yangpangpang.com -t ota/DEVICE_001 -m '{"speed": 88, "soc": 73}'
# 2) 消费 Kafka 消息
kafka-console-consumer.sh --bootstrap-server 192.168.50.190:30092 \
--topic kafka-ota --from-beginning