Skip to content
约 0 字 · 预计阅读 0 分钟

MQTT 协议

什么是 MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,专为物联网设计。它具有以下特点:

  • 轻量级:协议头部最小仅 2 字节
  • 低带宽:适合网络不稳定的环境
  • 可靠传输:支持三种 QoS 级别
  • 双向通信:支持发布和订阅

MQTT 架构

┌─────────────────────────────────────────────────────────────┐
│                    MQTT 架构模型                             │
│                                                             │
│  ┌─────────┐     ┌─────────┐     ┌─────────┐              │
│  │ 发布者  │     │ 订阅者  │     │ 订阅者  │              │
│  │Publisher│     │Subscriber│    │Subscriber│             │
│  └────┬────┘     └────┬────┘     └────┬────┘              │
│       │               │               │                    │
│       │    ┌──────────┴──────────┐    │                    │
│       │    │                     │    │                    │
│       └───►│      Broker         │◄───┘                    │
│            │    (消息代理)        │                         │
│            │                     │                         │
│            └─────────────────────┘                         │
│                      │                                      │
│                      ▼                                      │
│            ┌─────────────────────┐                         │
│            │      Topic 树       │                         │
│            │  home/living/temp   │                         │
│            │  home/bedroom/light │                         │
│            └─────────────────────┘                         │
└─────────────────────────────────────────────────────────────┘

上述图示展示了 MQTT 的发布/订阅架构。

核心概念:

概念说明
Broker消息代理,负责转发消息
Publisher发布者,发送消息的客户端
Subscriber订阅者,接收消息的客户端
Topic主题,消息路由的地址
QoS服务质量等级

MQTT 消息格式

固定头部

MQTT 固定头部格式:
┌─────────────────────────────────────────────────────────────┐
│   Byte 1    │              Byte 2...N                       │
├──────┬──────┼───────────────────────────────────────────────┤
│ 类型 │ 标志 │              剩余长度                          │
│ 4位  │ 4位  │           (1-4 字节)                          │
└──────┴──────┴───────────────────────────────────────────────┘

消息类型:
┌─────────────────────────────────────────────────────────────┐
│ 1  - CONNECT     : 客户端请求连接                           │
│ 2  - CONNACK     : 连接确认                                 │
│ 3  - PUBLISH     : 发布消息                                 │
│ 4  - PUBACK      : 发布确认 (QoS 1)                         │
│ 5  - PUBREC      : 发布收到 (QoS 2 第一步)                  │
│ 6  - PUBREL      : 发布释放 (QoS 2 第二步)                  │
│ 7  - PUBCOMP     : 发布完成 (QoS 2 第三步)                  │
│ 8  - SUBSCRIBE   : 订阅请求                                 │
│ 9  - SUBACK      : 订阅确认                                 │
│ 10 - UNSUBSCRIBE : 取消订阅                                 │
│ 11 - UNSUBACK    : 取消订阅确认                             │
│ 12 - PINGREQ     : 心跳请求                                 │
│ 13 - PINGRESP    : 心跳响应                                 │
│ 14 - DISCONNECT  : 断开连接                                 │
└─────────────────────────────────────────────────────────────┘

上述图示展示了 MQTT 固定头部格式。

剩余长度编码

c
/*
 * 剩余长度使用变长编码
 * 每个字节的最高位表示是否有后续字节
 * 低 7 位为实际数据
 */
int encode_remaining_length(uint8_t *buf, int length)
{
    int bytes = 0;
    
    do {
        uint8_t byte = length % 128;
        length /= 128;
        
        // 如果还有更多字节,设置最高位
        if (length > 0) {
            byte |= 0x80;
        }
        
        buf[bytes++] = byte;
    } while (length > 0);
    
    return bytes;
}

int decode_remaining_length(uint8_t *buf, int *length)
{
    int multiplier = 1;
    int value = 0;
    int bytes = 0;
    uint8_t byte;
    
    do {
        byte = buf[bytes++];
        value += (byte & 0x7F) * multiplier;
        multiplier *= 128;
        
        if (multiplier > 128 * 128 * 128) {
            return -1;  // 错误:长度过长
        }
    } while (byte & 0x80);
    
    *length = value;
    return bytes;
}

上述代码展示了剩余长度的编解码方式。

编码示例:

长度 64:    [0x40]                    // 单字节
长度 321:   [0xC1, 0x02]              // 双字节
            321 = 1 + 2*128 = 0xC1 + 0x02*128
长度 100000: [0xA0, 0x8D, 0x06]       // 三字节

连接建立

CONNECT 消息

c
/*
 * CONNECT 消息格式
 * 客户端发送给 Broker 请求建立连接
 */
typedef struct {
    // 协议名(固定为 "MQTT")
    char protocol_name[4];
    uint8_t protocol_level;     // 协议版本 (4 = MQTT 3.1.1)
    
    // 连接标志
    uint8_t clean_session:1;    // 清除会话
    uint8_t will_flag:1;        // 遗嘱消息标志
    uint8_t will_qos:2;         // 遗嘱消息 QoS
    uint8_t will_retain:1;      // 遗嘱消息保留
    uint8_t password_flag:1;    // 密码标志
    uint8_t username_flag:1;    // 用户名标志
    
    // 保活时间(秒)
    uint16_t keep_alive;
    
    // 可变头部
    char *client_id;            // 客户端 ID(必需)
    char *will_topic;           // 遗嘱主题
    char *will_message;         // 遗嘱消息
    char *username;             // 用户名
    char *password;             // 密码
} mqtt_connect_t;

上述代码展示了 CONNECT 消息的结构。

连接标志详解:

连接标志字节:
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ Bit7│ Bit6│ Bit5│ Bit4│ Bit3│ Bit2│ Bit1│ Bit0│
├─────┼─────┼─────┼─────┼─────┼─────┼─────┼─────┤
│User │Pass │Retain│Will │Will │Will │Clean│  0  │
│Name │Word │      │QoS1 │QoS0 │Flag │Sess │     │
└─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┘

Clean Session:
  0 = 保持会话状态(订阅、未确认消息等)
  1 = 清除会话状态,重新开始

Will Flag:
  1 = 客户端异常断开时,Broker 发布遗嘱消息
  0 = 不发送遗嘱消息

上述图示展示了连接标志的位定义。

CONNACK 消息

c
/*
 * CONNACK 消息格式
 * Broker 响应客户端连接请求
 */
typedef struct {
    uint8_t session_present;    // 会话存在标志
    uint8_t return_code;        // 连接返回码
} mqtt_connack_t;

/*
 * 返回码说明
 */
#define CONNACK_ACCEPTED              0  // 连接成功
#define CONNACK_REFUSED_VERSION       1  // 协议版本不支持
#define CONNACK_REFUSED_ID            2  // 客户端 ID 无效
#define CONNACK_REFUSED_SERVER        3  // 服务器不可用
#define CONNACK_REFUSED_USER_PASS     4  // 用户名或密码错误
#define CONNACK_REFUSED_AUTH          5  // 未授权

上述代码展示了 CONNACK 消息的结构。

主题与订阅

主题层级

主题层级示例:
┌─────────────────────────────────────────────────────────────┐
│                    主题树结构                                │
│                                                             │
│  home                                                       │
│  ├── living                                                 │
│  │   ├── temperature    (home/living/temperature)          │
│  │   └── light          (home/living/light)                │
│  └── bedroom                                                │
│      ├── temperature    (home/bedroom/temperature)         │
│      └── light          (home/bedroom/light)               │
│                                                             │
│  通配符订阅:                                                │
│  home/+/temperature     (单层通配符 +)                      │
│  home/bedroom/#         (多层通配符 #)                      │
└─────────────────────────────────────────────────────────────┘

上述图示展示了主题层级结构。

通配符说明:

通配符说明示例
+单层通配符,匹配一个层级home/+/temp 匹配 home/living/temp
#多层通配符,匹配多个层级home/# 匹配 home/living/temp
$系统主题前缀$SYS/broker/version

SUBSCRIBE 消息

c
/*
 * SUBSCRIBE 消息格式
 */
typedef struct {
    uint16_t packet_id;         // 报文标识符
    char *topic;                // 主题过滤器
    uint8_t qos;                // 订阅的 QoS 级别
} mqtt_subscribe_t;

/*
 * 订阅示例
 */
void subscribe_example(mqtt_client_t *client)
{
    // 订阅单个主题
    mqtt_subscribe(client, "home/living/temperature", 1);
    
    // 使用通配符订阅
    mqtt_subscribe(client, "home/+/temperature", 1);
    
    // 订阅所有卧室主题
    mqtt_subscribe(client, "home/bedroom/#", 0);
}

上述代码展示了订阅消息的结构和使用。

QoS 服务质量

QoS 级别

QoS 0: 最多一次 (At most once)
┌─────────────────────────────────────────────────────────────┐
│ Publisher          Broker           Subscriber              │
│     │                │                   │                  │
│     │───PUBLISH────►│                   │                  │
│     │               │───PUBLISH────────►│                  │
│     │               │                   │                  │
│     ▼               ▼                   ▼                  │
│   发送即忘记,不保证送达                                    │
└─────────────────────────────────────────────────────────────┘

QoS 1: 至少一次 (At least once)
┌─────────────────────────────────────────────────────────────┐
│ Publisher          Broker           Subscriber              │
│     │                │                   │                  │
│     │───PUBLISH────►│                   │                  │
│     │               │───PUBLISH────────►│                  │
│     │◄──PUBACK─────│                   │                  │
│     │               │◄──PUBACK─────────│                  │
│     │               │                   │                  │
│     ▼               ▼                   ▼                  │
│   保证送达,可能重复                                        │
└─────────────────────────────────────────────────────────────┘

QoS 2: 恰好一次 (Exactly once)
┌─────────────────────────────────────────────────────────────┐
│ Publisher          Broker           Subscriber              │
│     │                │                   │                  │
│     │───PUBLISH────►│                   │                  │
│     │◄──PUBREC─────│                   │                  │
│     │───PUBREL────►│                   │                  │
│     │               │───PUBLISH────────►│                  │
│     │               │◄──PUBREC─────────│                  │
│     │               │───PUBREL────────►│                  │
│     │               │◄──PUBCOMP───────│                  │
│     │◄──PUBCOMP────│                   │                  │
│     │               │                   │                  │
│     ▼               ▼                   ▼                  │
│   保证送达且不重复,四次握手                                │
└─────────────────────────────────────────────────────────────┘

上述图示展示了三种 QoS 级别的消息流程。

QoS 选择建议:

QoS场景特点
0传感器数据、日志高吞吐、可丢失
1一般业务消息可靠、可能重复
2金融交易、计费可靠、不重复

心跳机制

c
/*
 * MQTT 心跳机制
 * 保持连接活跃,检测客户端是否在线
 */
void mqtt_keep_alive(mqtt_client_t *client)
{
    time_t now = time(NULL);
    time_t last_send = client->last_send_time;
    uint16_t keep_alive = client->keep_alive;
    
    /*
     * 检查是否需要发送心跳
     * 在 Keep Alive 时间的 1.5 倍内必须发送数据
     */
    if (now - last_send > keep_alive * 1.5) {
        // 发送 PINGREQ
        send_pingreq(client);
        client->last_send_time = now;
        
        // 等待 PINGRESP
        // 如果超时未收到,则断开连接
    }
}

上述代码展示了心跳机制的实现。

心跳流程:

实战:嵌入式 MQTT 客户端

客户端结构

c
/*
 * MQTT 客户端结构
 */
typedef struct {
    // 连接信息
    int socket;
    char *server;
    uint16_t port;
    char *client_id;
    
    // 认证信息
    char *username;
    char *password;
    
    // 会话信息
    uint16_t keep_alive;
    uint16_t packet_id;
    uint8_t clean_session;
    
    // 缓冲区
    uint8_t rx_buffer[1024];
    uint8_t tx_buffer[1024];
    
    // 回调函数
    void (*on_connect)(void);
    void (*on_message)(const char *topic, const uint8_t *payload, 
                       size_t len);
    void (*on_disconnect)(void);
} mqtt_client_t;

上述代码展示了 MQTT 客户端的结构定义。

连接函数

c
/*
 * 连接到 MQTT Broker
 * 
 * 参数说明:
 * @client: MQTT 客户端结构
 * @server: 服务器地址
 * @port: 端口号
 * @client_id: 客户端唯一标识
 * 
 * 返回值:
 * 0 = 成功
 * < 0 = 错误码
 */
int mqtt_connect(mqtt_client_t *client, const char *server, 
                 uint16_t port, const char *client_id)
{
    int ret;
    uint8_t *ptr = client->tx_buffer;
    size_t len = 0;
    
    /*
     * 1. 建立 TCP 连接
     */
    client->socket = socket_connect(server, port);
    if (client->socket < 0) {
        return -1;
    }
    
    /*
     * 2. 构建 CONNECT 消息
     */
    // 固定头部
    ptr[len++] = 0x10;  // CONNECT 类型
    
    // 可变头部
    // 协议名
    ptr[len++] = 0x00;  // 长度高字节
    ptr[len++] = 0x04;  // 长度低字节
    memcpy(&ptr[len], "MQTT", 4);
    len += 4;
    
    // 协议级别
    ptr[len++] = 0x04;  // MQTT 3.1.1
    
    // 连接标志
    uint8_t flags = 0x02;  // Clean Session
    if (client->username) flags |= 0x80;
    if (client->password) flags |= 0x40;
    ptr[len++] = flags;
    
    // 保活时间
    ptr[len++] = (client->keep_alive >> 8) & 0xFF;
    ptr[len++] = client->keep_alive & 0xFF;
    
    // 载荷
    // Client ID
    uint16_t id_len = strlen(client_id);
    ptr[len++] = (id_len >> 8) & 0xFF;
    ptr[len++] = id_len & 0xFF;
    memcpy(&ptr[len], client_id, id_len);
    len += id_len;
    
    // 用户名
    if (client->username) {
        uint16_t user_len = strlen(client->username);
        ptr[len++] = (user_len >> 8) & 0xFF;
        ptr[len++] = user_len & 0xFF;
        memcpy(&ptr[len], client->username, user_len);
        len += user_len;
    }
    
    // 密码
    if (client->password) {
        uint16_t pass_len = strlen(client->password);
        ptr[len++] = (pass_len >> 8) & 0xFF;
        ptr[len++] = pass_len & 0xFF;
        memcpy(&ptr[len], client->password, pass_len);
        len += pass_len;
    }
    
    /*
     * 3. 计算并填充剩余长度
     */
    int remaining_len = len - 1;  // 减去固定头部字节
    int rem_bytes = encode_remaining_length(&ptr[1], remaining_len);
    
    /*
     * 4. 发送 CONNECT 消息
     */
    ret = socket_send(client->socket, ptr, 1 + rem_bytes + remaining_len);
    if (ret < 0) {
        socket_close(client->socket);
        return -2;
    }
    
    /*
     * 5. 等待 CONNACK 响应
     */
    ret = socket_recv(client->socket, client->rx_buffer, 4);
    if (ret < 4) {
        socket_close(client->socket);
        return -3;
    }
    
    // 检查 CONNACK
    if (client->rx_buffer[0] != 0x20) {  // CONNACK 类型
        socket_close(client->socket);
        return -4;
    }
    
    if (client->rx_buffer[3] != 0x00) {  // 返回码
        socket_close(client->socket);
        return -5;
    }
    
    // 连接成功
    client->server = strdup(server);
    client->port = port;
    client->client_id = strdup(client_id);
    
    if (client->on_connect) {
        client->on_connect();
    }
    
    return 0;
}

上述代码展示了 MQTT 连接函数的实现。

发布函数

c
/*
 * 发布消息
 * 
 * 参数说明:
 * @client: MQTT 客户端
 * @topic: 主题
 * @payload: 消息内容
 * @len: 消息长度
 * @qos: 服务质量等级
 * @retain: 是否保留消息
 */
int mqtt_publish(mqtt_client_t *client, const char *topic,
                 const uint8_t *payload, size_t len, 
                 uint8_t qos, bool retain)
{
    uint8_t *ptr = client->tx_buffer;
    size_t pos = 0;
    
    /*
     * 1. 固定头部
     */
    uint8_t type = 0x30;  // PUBLISH 类型
    if (retain) type |= 0x01;
    type |= (qos << 1);   // QoS 标志
    ptr[pos++] = type;
    
    /*
     * 2. 计算剩余长度
     */
    size_t topic_len = strlen(topic);
    size_t remaining = 2 + topic_len + len;
    if (qos > 0) {
        remaining += 2;  // Packet ID
    }
    
    pos += encode_remaining_length(&ptr[pos], remaining);
    
    /*
     * 3. 可变头部
     */
    // 主题
    ptr[pos++] = (topic_len >> 8) & 0xFF;
    ptr[pos++] = topic_len & 0xFF;
    memcpy(&ptr[pos], topic, topic_len);
    pos += topic_len;
    
    // Packet ID (QoS > 0)
    if (qos > 0) {
        client->packet_id++;
        ptr[pos++] = (client->packet_id >> 8) & 0xFF;
        ptr[pos++] = client->packet_id & 0xFF;
    }
    
    /*
     * 4. 载荷
     */
    memcpy(&ptr[pos], payload, len);
    pos += len;
    
    /*
     * 5. 发送消息
     */
    int ret = socket_send(client->socket, ptr, pos);
    if (ret < 0) {
        return -1;
    }
    
    /*
     * 6. 等待确认 (QoS > 0)
     */
    if (qos == 1) {
        // 等待 PUBACK
        ret = socket_recv(client->socket, client->rx_buffer, 4);
        if (ret < 4 || client->rx_buffer[0] != 0x40) {
            return -2;
        }
    } else if (qos == 2) {
        // 等待 PUBREC, 发送 PUBREL, 等待 PUBCOMP
        // ... 省略 QoS 2 完整流程
    }
    
    return 0;
}

上述代码展示了 MQTT 发布函数的实现。

订阅函数

c
/*
 * 订阅主题
 */
int mqtt_subscribe(mqtt_client_t *client, const char *topic, uint8_t qos)
{
    uint8_t *ptr = client->tx_buffer;
    size_t pos = 0;
    
    /*
     * 1. 固定头部
     */
    ptr[pos++] = 0x82;  // SUBSCRIBE 类型 + QoS 1
    
    /*
     * 2. 计算剩余长度
     */
    size_t topic_len = strlen(topic);
    size_t remaining = 2 + 2 + topic_len + 1;  // Packet ID + Topic + QoS
    pos += encode_remaining_length(&ptr[pos], remaining);
    
    /*
     * 3. 可变头部
     */
    client->packet_id++;
    ptr[pos++] = (client->packet_id >> 8) & 0xFF;
    ptr[pos++] = client->packet_id & 0xFF;
    
    /*
     * 4. 载荷
     */
    // 主题过滤器
    ptr[pos++] = (topic_len >> 8) & 0xFF;
    ptr[pos++] = topic_len & 0xFF;
    memcpy(&ptr[pos], topic, topic_len);
    pos += topic_len;
    
    // QoS
    ptr[pos++] = qos;
    
    /*
     * 5. 发送订阅请求
     */
    int ret = socket_send(client->socket, ptr, pos);
    if (ret < 0) {
        return -1;
    }
    
    /*
     * 6. 等待 SUBACK
     */
    ret = socket_recv(client->socket, client->rx_buffer, 5);
    if (ret < 5 || client->rx_buffer[0] != 0x90) {
        return -2;
    }
    
    return 0;
}

上述代码展示了 MQTT 订阅函数的实现。

常用 MQTT Broker

Broker特点适用场景
Mosquitto开源、轻量开发测试、小型部署
EMQX高性能、企业级大规模物联网
HiveMQ商业、高可用企业应用
VerneMQ分布式、可扩展大规模部署

总结

概念说明
发布/订阅解耦消息生产者和消费者
Broker消息代理,负责路由
Topic消息主题,支持通配符
QoS服务质量:0/1/2
心跳保持连接活跃

参考资料

[1] MQTT Version 3.1.1. OASIS Standard

[2] MQTT Version 5.0. OASIS Standard

[3] Mosquitto Documentation. https://mosquitto.org/

相关主题

基于 VitePress 构建