一站式演示 · 内部分享版

开源 EMQX 4.3 Kafka 分流改造

目标:不再依赖 terminal-gateway 作为“汇聚型 MQTT Client”, 让 Broker 直接把消息按业务 Topic 分流到 Kafka,同时保留规则引擎的灵活性。

EMQX 4.3 Kafka Bridge Topic Mapping Rule Engine (Plan B)
变更效果预览
MQTT 设备消息
分流规则
Kafka 多 Topic
mqueue 压力 显著降低
链路延迟 更短
可维护性 更高

原版 vs 改造版

快速对比核心能力差异

原版(固定 Kafka topic)

  • 所有 MQTT 消息写入同一个 Kafka topic
  • 无 Topic 级别分流能力
  • 无法支持业务/租户隔离

改造后(Topic → Topic 分流)

  • 按 MQTT topic 规则映射 Kafka topic
  • 支持 +/# 通配符
  • 新增动态 producer 保障稳定
维度 原版 改造后
Kafka 目标 topic 固定一个 多 topic 分流
配置能力 仅支持 topic topic_mapping + 兜底策略
分流逻辑 忽略 MQTT topic 规则匹配 + 顺序优先级
producer 管理 仅默认 topic 动态确保可用

整体链路

从设备到 Kafka 的改造路径

Broker 内部转发链路
设备 MQTT EMQX Hook + 分流逻辑 Kafka 生产者 Kafka Topic topic_mapping + 兜底

改造收益

  • Broker 侧直接转发,减少终端网关瓶颈
  • Kafka 承担持久化与消费扩展
  • 更易配置与观测

实时接口联动

从集群拉取真实配置与状态(K8s API)

集群状态

EMQX Ready -
Kafka Ready -
更新时间 -

插件配置(实时)

默认 Kafka Topic -
兜底策略 -
Kafka Brokers -
等待载入配置...

Topic Mapping 交互演示

输入 MQTT topic,实时计算目标 Kafka topic

启用
命中规则 -
输出 Kafka Topic -
支持 + / # 通配符,按规则顺序优先匹配

测试 Demo(消息变换路径)

模拟从 EMQX → Kafka 的消息内容变化(Plan B 规则引擎)

Step 1 · 解析 device_id -
Step 2 · 注入 JSON

          
Step 3 · republish 主题 -
Step 4 · Kafka 目标 Topic -
真实链路验证提示

页面演示为可视化逻辑,实际链路可通过 mosquitto_pub + Kafka consumer 验证。命令示例见下方“验证脚本”板块。

闭环演示(真实链路)

点击按钮将消息发布到 EMQX,并从 Kafka 中实时读取回传结果

发布状态 -
Kafka 目标 Topic -
耗时(ms) -
Trace ID -
等待执行...
差异高亮(输入 vs Kafka 消息)
输入 Payload
-
Kafka 消息(新增字段高亮)
-
新增字段: -

代码讲解(关键改动)

从“固定 Kafka topic”到“分流 + 动态 producer”

分流入口(on_message_publish)

KafkaTopic = select_kafka_topic(Message#message.topic),
produce_kafka_payload(ClientId, Payload, KafkaTopic).
  • 先计算目标 Kafka topic,再写入
  • 避免所有消息落到一个固定 topic

动态 producer 保证

case ensure_producer(KafkaTopic) of
  ok -> brod:produce_cb(...);
  {error, Reason} -> ...
end.
  • 每个 Kafka topic 都能启动 producer
  • 避免“写入时发现 producer 不存在”

规则匹配(topic_mapping)

match_mapping(MqttTopic, [{Pattern, KafkaTopic} | Rest]) ->
  case emqx_topic:match(MqttTopic, Pattern) of
    true -> {ok, KafkaTopic};
    false -> match_mapping(MqttTopic, Rest)
  end.
  • 按顺序匹配,第一条命中即返回
  • 支持 + / # 通配符

兜底策略

not_found when UseMqttTopic =:= true -> mqtt_topic_to_kafka(...);
not_found -> DefaultTopic.
  • 未命中时可直接使用 MQTT topic
  • 保持 100% 可写入 Kafka

后续规则引擎改造(方案 B)

SQL 处理 + republish + Kafka 插件转发

SQL 示例

SELECT
  nth(2, split(topic, '/')) AS device_id,
  json_encode(
    map_put('device_id', nth(2, split(topic, '/')), json_decode(payload))
  ) AS payload
FROM "ota/+"

将 topic 中的 device_id 注入 payload,实现数据增强。

republish 配置

  • Target Topic:bridge/ota/${device_id}
  • Payload Template:${payload}

Kafka 插件仅转发 bridge/ota/+,避免重复。

验证脚本

快速验证链路:MQTT → Kafka

# 1) 发布 MQTT 消息
mosquitto_pub -h emqx.yangpangpang.com -t ota/DEVICE_001 -m '{"speed": 88, "soc": 73}'

# 2) 消费 Kafka 消息
kafka-console-consumer.sh --bootstrap-server 192.168.50.190:30092 \
  --topic kafka-ota --from-beginning