OneNET物联网平台 平台介绍 入门手册 设备开发指南 应用开发指南
API
API使用 API列表 API返回码说明 SDK MQTT LwM2M EDP Modbus TCP
消息队列MQ HTTP推送
服务定价 扩展服务 常见问题 发布公告

消息队列MQ消费端开发指南

消费协议

消息队列MQ目前支持客户端使用MQTT协议连接并消费数据,仅支持TLS加密

默认支持标准的 MQTT v3.1.1版本
连接协议 证书 地址 端口
MQTT 证书下载 183.230.40.96 8883

SDK 下载

推荐使用第三方SDK,请访问https://github.com/mqtt/mqtt.github.io/wiki/libraries

开发流程

消费端开发流程如下:

1 客户端连接

进入消息队列MQ服务 MQ

开通消息队列MQ服务实例,输入实例名称(实例名称平台唯一),如下图,实例名称用于客户端连接鉴权参数之一。

MQ 客户端可通过发送MQTT connnect报文与服务器建立连接,connect报文中三要素填写方法如下:

参数 是否必须 参数说明
clientId 用户自定义合法的UTF-8字符串,可为空
username 填写MQ实例名称
password 填写token,算法见安全鉴权
其中:res=mqs/$MQ_NAME

另外,connnect报文中,keepalive、will、session字段的使用限制如下:

功能是否支持说明
keepalive支持支持范围为:30~4800s
will不支持will、will retain 的flag必须为0,will qos必须为0
session不支持cleansession标记必须为1

2 订阅主题

  • 订阅报文

    订阅报文中topic格式如下,不支持通配符

    $sys/pb/consume/$MQ_NAME/$TOPIC/$SUB

    订阅时request QoS必须大于0,否则订阅失败

  • 订阅确认报文

    平台采用MQTT SubAck报文进行订阅确认,返回成功时返回码固定为0x01

Request QoSSubAck返回码说明
00x80订阅失败
10x01订阅成功,最大QoS为1
20x01订阅成功,最大QoS为1
  • 取消订阅报文

    客户端使用MQTT Unsubscribe报文进行订阅取消

  • 取消订阅确认报文

    服务端使用MQTT UnsubAck报文进行订阅取消确认

3 消息消费

  • 订阅成功后MQ会根据生产消息的情况,通过publish报文的主动推送给客户端
  • 消费时MQ推送的消息只会是QoS1
  • 客户端消费到数据后需要按照获取到的数据先后顺序使用puback报文回复

4 数据解析

客户端接收到publish报文后,按照MQTT协议解析其payload数据段,然后按照如下步骤解析payload数据内容

Step 1 安装protobuf

Step 2 下载下述.proto文件

onenet-mq.proto接口文件如下:

syntax = "proto3";

package mq;

message Msg{
    uint64 msgid          = 1; //MQ中该消息的真实id
    bytes data            = 2; //具体的数据
    uint64 timestamp      = 3; //精确到ms
}

保存onenet-mq.proto文件至本地

Step 3 编译proto文件

根据语言编译该文件,以Java为例

protoc --java_out=$DST_DIR $SRC_DIR/onenet-mq.proto

Step 4 将编译生成的源文件添加到项目

以Java为例,编译后生成OnenetMq.java文件,将该源文件添加到项目,同时需添加java grpc的依赖,这里以maven管理为例,需要在pom.xml文件中添加如下内容:

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.18.0</version>
        </dependency>

Step 5 读取数据

  • 调用 parseFrom() 方法,创建Msg对象obj
  • 调用 obj.getMsgid() 获取消息id
  • 调用 obj.getData() 获取消息数据(查看数据格式
  • 调用 obj.getTimestamp() 获取消息毫秒级时间戳

Java示例如下:

OnenetMq.Msg obj;
obj = OnenetMq.Msg.parseFrom(mqttPayload);
System.out.println(obj.getMsgid());
System.out.println(new String(obj.getData().toByteArray()));
System.out.println(obj.getTimestamp());

更多帮助

个搜索结果,搜索内容 “

    0 个搜索结果,搜索内容 “