消息队列MQ目前支持客户端使用MQTT协议连接并消费数据,仅支持TLS加密
默认支持标准的 MQTT v3.1.1版本
连接协议 | 证书 | 地址 | 端口 |
---|---|---|---|
MQTT | 证书下载 | 183.230.40.96 | 8883 |
推荐使用第三方SDK,请访问https://github.com/mqtt/mqtt.github.io/wiki/libraries
消费端开发流程如下:
进入消息队列MQ服务
开通消息队列MQ服务实例,输入实例名称(实例名称平台唯一),如下图,实例名称用于客户端连接鉴权参数之一。
客户端可通过发送MQTT connnect报文与服务器建立连接,connect报文中三要素填写方法如下:
参数 | 是否必须 | 参数说明 |
---|---|---|
clientId | 否 | 用户自定义合法的UTF-8字符串,可为空 |
username | 是 | 填写MQ实例名称 |
password | 是 | 填写token,算法见安全鉴权 其中:res=mqs/$MQ_NAME |
另外,connnect报文中,keepalive、will、session字段的使用限制如下:
功能 | 是否支持 | 说明 |
---|---|---|
keepalive | 支持 | 支持范围为:30~4800s |
will | 不支持 | will、will retain 的flag必须为0,will qos必须为0 |
session | 不支持 | cleansession标记必须为1 |
订阅报文
订阅报文中topic格式如下,不支持通配符
$sys/pb/consume/$MQ_NAME/$TOPIC/$SUB
订阅时request QoS必须大于0,否则订阅失败
订阅确认报文
平台采用MQTT SubAck报文进行订阅确认,返回成功时返回码固定为0x01
Request QoS | SubAck返回码 | 说明 |
---|---|---|
0 | 0x80 | 订阅失败 |
1 | 0x01 | 订阅成功,最大QoS为1 |
2 | 0x01 | 订阅成功,最大QoS为1 |
取消订阅报文
客户端使用MQTT Unsubscribe报文进行订阅取消
取消订阅确认报文
服务端使用MQTT UnsubAck报文进行订阅取消确认
客户端接收到publish报文后,按照MQTT协议解析其payload数据段,然后按照如下步骤解析payload数据内容
onenet-mq.proto接口文件如下:
syntax = "proto3";
package mq;
message Msg{
uint64 msgid = 1; //MQ中该消息的真实id
bytes data = 2; //具体的数据
uint64 timestamp = 3; //精确到ms
}
保存onenet-mq.proto文件至本地
根据语言编译该文件,以Java为例
protoc --java_out=$DST_DIR $SRC_DIR/onenet-mq.proto
以Java为例,编译后生成OnenetMq.java文件,将该源文件添加到项目,同时需添加java grpc的依赖,这里以maven管理为例,需要在pom.xml文件中添加如下内容:
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.18.0</version>
</dependency>
Java示例如下:
OnenetMq.Msg obj;
obj = OnenetMq.Msg.parseFrom(mqttPayload);
System.out.println(obj.getMsgid());
System.out.println(new String(obj.getData().toByteArray()));
System.out.println(obj.getTimestamp());