OneNET物联网平台 平台介绍 入门手册 设备开发指南 应用开发指南
API
API使用 API列表 SDK MQTT LwM2M EDP Modbus TCP
消息队列MQ(公测) HTTP推送
服务定价 扩展服务 常见问题 发布公告

消息队列MQ消费端开发指南

消费协议

消息队列MQ目前支持客户端使用MQTT协议连接并消费数据,仅支持TLS加密

默认支持标准的 MQTT v3.1.1版本
连接协议 证书 地址 端口
MQTT 证书下载 183.230.40.96 8883

SDK 下载

推荐使用第三方SDK,请访问https://github.com/mqtt/mqtt.github.io/wiki/libraries

开发流程

消费端开发流程如下:

1 客户端连接

开通消息队列MQ服务之后,OneNET会为其分配一个唯一的Id,如下图,用于客户端连接鉴权参数之一。

MQ_ID

客户端可通过发送MQTT connnect报文与服务器建立连接,connect报文中三要素填写方法如下:

参数 是否必须 参数说明
clientId 用户自定义合法的UTF-8字符串,可为空
username 填写MQ_ID
password 填写token,算法见安全鉴权
其中:res=mqs/$MQ_ID

另外,connnect报文中,keepalive、will、session字段的使用限制如下:

功能是否支持说明
keepalive支持支持范围为:30~4800s
will不支持will、will retain 的flag必须为0,will qos必须为0
session不支持cleansession标记必须为1

2 订阅主题

  • 订阅报文

    订阅报文中topic格式如下,不支持通配符

    $sys/pb/consume/$MQ_ID/$TOPIC/$SUB

    订阅时request QoS必须大于0,否则订阅失败

  • 订阅确认报文

    平台采用MQTT SubAck报文进行订阅确认,返回成功时返回码固定为0x01

Request QoSSubAck返回码说明
00x80订阅失败
10x01订阅成功,最大QoS为1
20x01订阅成功,最大QoS为1
  • 取消订阅报文

    客户端使用MQTT Unsubscribe报文进行订阅取消

  • 取消订阅确认报文

    服务端使用MQTT UnsubAck报文进行订阅取消确认

3 消息消费

  • 订阅成功后MQ会根据生产消息的情况,通过publish报文的主动推送给客户端
  • 消费时MQ推送的消息只会是QoS1
  • 客户端消费到数据后需要按照获取到的数据先后顺序使用puback报文回复

4 数据解析

客户端接收到publish报文后,按照MQTT协议解析其payload数据段,然后按照如下步骤解析payload数据内容

Step 1 安装protobuf

Step 2 下载下述.proto文件

onenet-mq.proto接口文件如下:

syntax = "proto3";

package mq;

message Msg{
    uint64 msgid          = 1; //MQ中该消息的真实id
    bytes data            = 2; //具体的数据
}

保存onenet-mq.proto文件至本地

Step 3 编译proto文件

根据语言编译该文件,以Java为例

protoc --java_out=$DST_DIR $SRC_DIR/onenet-mq.proto

Step 4 将编译生成的源文件添加到项目

以Java为例,编译后生成OnenetMq.java文件,将该源文件添加到项目,同时需添加java grpc的依赖,这里以maven管理为例,需要在pom.xml文件中添加如下内容:

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.18.0</version>
        </dependency>

Step 5 读取数据

  • 调用parseFrom()方法,创建Msg对象obj
  • 调用 obj.getMsgid() 获取消息id
  • 调用 obj.getData() 获取消息数据(查看数据格式

Java示例如下:

OnenetMq.Msg obj;
obj = OnenetMq.Msg.parseFrom(mqttPayload);
System.out.println(obj.getMsgid());
System.out.println(new String(obj.getData().toByteArray()));

数据格式

用户可以通过配置消息队列MQ的数据源,将不同类型数据数据以统一的数据格式(json)分发至后续服务中,目前支持如下消息:

  • 数据流消息

    支持用户配置转发数据流中发生的新增数据点消息

  • 设备生命周期事件消息

    支持用户配置转发产品中设备生命周期事件消息(目前仅支持设备上下线消息

消息数据格式如下:

数据流消息

数据流消息数据格式如下:

参数属性类型说明示例
sysProperty messageTypestring消息类型:固定为deviceDatapoint
productIdstring产品ID90273
appProperty deviceIdstring设备ID102839
dataTimestampint设备数据点生产时间戳,单位毫秒,设备上传时可自定义携带15980987429000
datastreamstring数据流名称weather
body object/string/...详细的数据点消息内容见如下示例

数据json示例1,json数据

{
    "sysProperty": {
        "messageType": "deviceDatapoint",
        "productId": "90273",
    },
    "appProperty":{
        "deviceId": "102839",
        "dataTimestamp": 15980987429000,
        "datastream":"weather"
    },
    "body":{
        "temperature": 30,
        "humidity": "47%"
    }
}

数据json示例2,数值型数据

{
    "sysProperty": {
        "messageType": "deviceDatapoint",
        "productId": "90273",
    },
    "appProperty":{
        "deviceId": "102839",
        "dataTimestamp": 15980987429000,
        "datastream":"temperature"
    },
    "body": 10
}

数据json示例3,字符串型数据

{
    "sysProperty": {
        "messageType": "deviceDatapoint",
        "productId": "90273",
    },
    "appProperty":{
        "deviceId": "102839",
        "dataTimestamp": 15980987429000,
        "datastream":"weather"
    },
    "body":"sunny with wind"
}

数据json示例4,二进制数据

说明

  • 数据格式为二进制数据时,body中数据为二进制数据的索引号 index,示例如下,用户可以通过该索引号通过API获取该数据,见API详情
{
    "sysProperty": {
        "messageType": "deviceDatapoint",
        "productId": "90273",
    },
    "appProperty":{
        "deviceId": "102839",
        "dataTimestamp": 15980987429000,
        "datastream":"weather"
    },
    "body":{
        "index": "3491506_1475204886914_bin"
    }
}

设备生命周期事件消息

设备生命周期事件消息数据格式如下:

参数属性类型说明示例
sysProperty messageTypestring消息类型:固定为deviceLifeCycle
productIdstring产品ID90273
appProperty deviceIdstring设备ID102839
dataTimestampint设备消息生产时间戳,单位毫秒15980987429000
body object创建、删除、启用、应用、上线、下线 created/deleted/enabled/disabled/online/offline目前仅支持
online/offline

数据json示例

{
    "sysProperty": {
        "messageType": "deviceLifeCycle",
        "productId": "90273",
    },
    "appProperty":{
        "deviceId": "102839",
        "dataTimestamp": 15980987429000,
    },
    "body":{
        "event": "online"
    }
}

更多帮助

个搜索结果,搜索内容 “

    0 个搜索结果,搜索内容 “