查看: 35937|回复: 13

[7月赛] 【我行我秀】各类应用MQTT接入OneNet心得

  [复制链接]

2

主题

5

帖子

23

积分

新手上路

Rank: 1

积分
23
发表于 2017-7-26 22:09:45 | 显示全部楼层 |阅读模式
本帖最后由 157******** 于 2017-7-26 22:09 编辑


各位朋友大家好,本人为重庆工商大学物联网工程大二在校学生,欢迎各位与我交流。同时,我们在学校创立了一个创意科技团队,CO团队,平时做一些物联网相关的小创意项目。希望大家了解一下我们,也欢迎各位与我联系。我的邮箱是chnhawk@foxmail.com

前言:本篇主要记录一下我个人使用mqtt协议上OneNet云时踩过的一些坑。之前使用过mosquito在自己的服务器上搭建的mqtt转发服务,但是和OneNet的使用还是有一些不一样的地方。前面记录的大都是开发的过程和一些小心得,后面会附上自己的测试代码。对于大佬来说可能这算不上什么干货,但是对于高校学生或者初学者来讲,还没有较多的开发经验,故作此篇文章,记录一下上云的心得。本文存在有错误以及不正规的地方,还恳请各位批评斧正!

一、Android设备使用MQTT协议接入OneNet平台
1. 用到的MQTT库:eclipse.paho
直接在app的gradle内dependencies添加依赖
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

注意:使用eclipse.paho一定记得要在AndroidManifest.xml中添加服务
<serviceandroid:name="org.eclipse.paho.android.service.MqttService"/>
否则没有办法正常使用

2. 连接上OneNet的服务器
需要设置
clientId为deviceId,即创建设备时的设备号
userName为productId,即创建产品的产品号
password为APIkey或者authInfo(即自己设置鉴权信息字符串)
因为APIkey太长不好记忆,所以我选择鉴权

1) 首先配置连接设置
MqttConnectOptions option = newMqttConnectOptions();
option.setUserName(productId);
option.setPassword(authInfo.toCharArray());
option.setCleanSession(false);

2) 调用mqttclient的连接方法
MqttAndroidClient mqttClient = newMqttAndroidClient(mContext, ("tcp://" + serverIP + ":" +serverPort), deviceId);
mqttClient.connet(option);
//OneNet的ip为183.230.40.39
//mqtt端口为6002

这样基本上就连接上OneNet的服务器了,ConnAck包这些是paho库帮我们完成的接收确认,我们只需要调用连接的这个函数即可,非常方便。

ps.当初看开发文档的时候一开始没有看到设置用户名这些,因为照着接入流程做,看到了服务器的ip和端口就直接跑去连接了,发现一直连接不上。直到再次看了一遍开发文档,拖到文档下面的常见问题才发现还需要设置这些。感觉这些重要的设置内容应该写在前面比较好,第一次没注意看,很尴尬。

3.发布及订阅自己的topic
这个很简单,之前用自己的服务器也是这样做的,
1) 订阅:
mqttClient.subscribe("topic",0); //参数一为topic字符串, 参数二为QoS级别

2) 发布:
MqttMessage msg = new MqttMessage();
msg.setPayload(payload.getBytes());  //发布内容
msg.setQos(1);                                 //设置发布级别
mqttClient.publish(topic, msg);          //发布

我们的项目是以topic来区别设备的
譬如,Android应用就订阅一个topic为Android/id,id为编号
底层设备就订阅一个topic为Device/id
然后发送指令直接向对应的topic发送就行

ps.一开始接入OneNet我也就是这么做的,使用的自己的topic来通信,完全把OneNet平台当成一个broker(转发器)了,到后面才发现我们简直是用高射炮打了蚊子,根本没有使用正确的姿势来使用OneNet平台。

4.使用mqtt上传数据点/获取数据
OneNet为每一个产品提供了数据流,在平台上记录并且可以生成数据的曲线,要想让我们通过mqtt协议上传的数据能够保存在数据流上,就得向$dp这个topic发送一串规定格式的数据。
我翻阅了一下OneNet的mqtt协议手册,没能搞清楚,翻车了。试了几次向$dp这个topic发送数据,在设备数据界面看都没有成功更新到数据,最让人疑惑的是,一旦发送了数据,设备就会掉线,需要重连。不太清楚OneNet平台是不是有什么发布数据就让你强制下线的机制。并且因为平台不允许订阅$打头的数据,所以也不清楚数据成功发送出去了没有。
起初我以为是OneNet服务器的问题,不过其他的topic还是可以正常的使用,并且换用平台提供的官方客户端,又可以成功发送且对数据进行更新。

OneNet提供的mqtt测试客户端

OneNet提供的mqtt测试客户端

(使用OneNet提供的mqtt测试程序上传数据点,同时可以看到平台上显示设备的数据更新了)

数据点显示更新

数据点显示更新
于是我在想是不是发送的格式还是不正确,就发现了这个

测试客户端的调试窗口显示刚刚发布的内容

测试客户端的调试窗口显示刚刚发布的内容
这个是官方软件的调试输出窗口,可以看到刚刚发布出去的payload为上图所示的一串16进制数据。于是我尝试将这段数据复制到mqtt.fx软件并进行发布。

ps.mqttfx是一款非常方便的测试mqtt的软件。不过在win7系统下经常容易出现窗口消失的bug,实质上是窗口跑到屏幕外面去了。可以通过在状态栏-右键-移动(M)-使用方向键让窗口回到屏幕内。

移动mqtt.fx窗口

移动mqtt.fx窗口
使用mqtt.fx进行测试

通过mqtt.fx发布消息

通过mqtt.fx发布消息
经过测试,很遗憾还是没有成功,这让我有些郁闷。

于是决定还是通过wireshark软件对OneNet平台提供的客户端进行抓包,看看他究竟发送的是什么。

OneNet平台提供的客户端发送的数据

OneNet平台提供的客户端发送的数据

可以看到确实是向$dp这个topic发送了消息,7b是”{”转为16进制,那么前面的01 00 5e就是协议文档里面提到的发送类型、长度高位、长度低位了。
我之前发送的也确实是按照这个协议走的啊,为什么不成功呢?

于是再来抓一下我之前程序发送的内容(这里丢人了,捂脸)

我自己发送的数据

我自己发送的数据
附上我之前的代码:

错误的代码示范

错误的代码示范

这样问题就显而易见了。
协议中的格式讲的是发送16进制的01 00 length,而通过我之前编写的这段代码发送出去的是字符串”01 00length”,同理,之前测试通过mqttfx发送出去的就更不对了。

故赶紧修改代码为:

修改之后的代码

修改之后的代码
这样就能成功地将数据库上传到OneNet平台了。

总结一下,在这一步翻车的原因还是对mqtt协议理解的不够充分,错把16进制字符串当做16进制在发送。做物联网项目,会经常在16进制数组和16进制字符串之间来回切换,一定要弄清楚,不然像这样因为这样一个小问题而耗费大量时间去处理、测试就很不划算了。

下面附上自己编写的基于paho的mqtt使用帮助类:
  1. import android.content.Context;
  2. import android.util.Log;

  3. import org.eclipse.paho.android.service.MqttAndroidClient;
  4. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  5. import org.eclipse.paho.client.mqttv3.MqttCallback;
  6. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  7. import org.eclipse.paho.client.mqttv3.MqttException;
  8. import org.eclipse.paho.client.mqttv3.MqttMessage;

  9. import java.util.ArrayList;

  10. /**
  11. * Created by CHNhawk on 2017/3/7
  12. * 重庆工商大学 物联网工程 熊廷宇
  13. * update on 2017/5/16
  14. * 添加了消息队列转发以及MsgHandler接口
  15. * 支持更好地用于多Activity
  16. *
  17. */

  18. public class MqttUtil {

  19.     //region 网络设定
  20.     private static String serverIP = "183.230.40.39";               // 服务器ip
  21.     private static int serverPort = 6002;                           // 服务器端口
  22.     private static String deviceId = "填设备id";                            // 终端id
  23.     private static String userId = "填应用id";                           // 服务器登陆名
  24.     private static String password = "填鉴权或apiKey";                         // 服务器登陆密码
  25.     //endregion

  26.     //region 消息类型
  27.     public static final int MQTT_CONNECTED = 1000;                 // mqtt连接成功
  28.     public static final int MQTT_CONNECTFAIL = 1001;               // mqtt连接失败
  29.     public static final int MQTT_DISCONNECT = 1002;                // mqtt断开连接

  30.     public static final int MQTT_SUBSCRIBED = 1010;                // 订阅成功
  31.     public static final int MQTT_SUBSCRIBEFAIL = 1011;             // 订阅失败

  32.     public static final int MQTT_MSG_TEST = 2001;                  // 接收到TEST消息

  33.     public static final int MQTT_PUBLISHED = 2010;                 // 发布成功
  34.     public static final int MQTT_PUBLISHFAIL = 2011;               // 发布失败
  35.     //endregion

  36.     //region MQTT客户端服务
  37.     private static MqttUtil instance;                               // 单例对象
  38.     private MqttAndroidClient mqttClient;                           // mqtt客户端
  39.     private MqttConnectOptions option;                              // mqtt设置
  40.     private MqttCallback clientCallback;                            // 客户端回调
  41.     private Context mContext;                                       // 上下文
  42.     private ArrayList<MsgHandler> listenerList = new ArrayList<>(); // 消息接收者
  43.     //endregion

  44.     //封闭构造函数
  45.     private MqttUtil(){}

  46.     //获取单例
  47.     public static MqttUtil getInstance(){
  48.         if(instance == null){
  49.             instance = new MqttUtil();
  50.             //deviceId =  android.os.Build.SERIAL + (int)(Math.random() * 10);
  51.         }
  52.         return instance;
  53.     }

  54.     //初始化连接
  55.     public void initMqtt(Context context) {
  56.         mContext = context;
  57.         connect();
  58.     }

  59.     //重连
  60.     public void reConnect() {
  61.         //尝试重连
  62.         connect();

  63.     }

  64.     //发布消息
  65.     public void publish(String topic, String payload) {
  66.         MqttMessage msg = new MqttMessage();
  67.         msg.setPayload(payload.getBytes());
  68.         msg.setQos(1);
  69.         if (mqttClient != null){
  70.             if(mqttClient.isConnected()) {
  71.                 try {
  72.                     mqttClient.publish(topic, msg);
  73.                     DispachEvent(MQTT_PUBLISHED);
  74.                 } catch (MqttException e) {
  75.                     //发布失败
  76.                     Log.e("mqtt", "发布失败" + e);
  77.                     DispachEvent(MQTT_PUBLISHFAIL);
  78.                 }
  79.             } else {
  80.                 Log.e("mqtt", "网络未连接 - 尝试重新连接" );
  81.                 connect();
  82.                 DispachEvent(MQTT_PUBLISHFAIL);
  83.             }
  84.         } else {
  85.             Log.e("mqtt", "客户端初始化失败" );
  86.             DispachEvent(MQTT_PUBLISHFAIL);
  87.         }
  88.     }

  89.     //订阅主题
  90.     public void subscribe(String topic){
  91.         try {
  92.             //订阅消息
  93.             mqttClient.subscribe(topic, 0);
  94.             DispachEvent(MQTT_SUBSCRIBED);
  95.         } catch (MqttException e) {
  96.             DispachEvent(MQTT_SUBSCRIBEFAIL);
  97.             Log.e("mqtt", "订阅错误:" + e);
  98.         }
  99.     }

  100.     //返回是否连接
  101.     public boolean isConnected(){
  102.         return mqttClient.isConnected();
  103.     }

  104.     //获取本机deviceId
  105.     public String getDeviceId(){
  106.         return deviceId;
  107.     }

  108.     //连接
  109.     private void connect() {
  110.         if (mqttClient == null) {
  111.             option = new MqttConnectOptions();
  112.             option.setUserName(userId);
  113.             option.setPassword(password.toCharArray());
  114.             option.setCleanSession(false);
  115.             //设置回调
  116.             clientCallback = new MqttCallback() {
  117.                 @Override
  118.                 public void connectionLost(Throwable cause) {
  119.                     //断开连接
  120.                     DispachEvent(MQTT_DISCONNECT);
  121.                 }

  122.                 @Override
  123.                 public void messageArrived(String topic, MqttMessage message) throws Exception {
  124.                     //接收到消息
  125.                     Log.v("mqtt", "接收到信息:" + topic);
  126.                     DispachMessage(topic, message);
  127.                 }
  128.                 @Override
  129.                 public void deliveryComplete(IMqttDeliveryToken token) {
  130.                     //publish成功后调用
  131.                     Log.v("mqtt","发送成功");
  132.                 }
  133.             };
  134.             try {
  135.                 mqttClient = new MqttAndroidClient(mContext, ("tcp://" + serverIP + ":" + serverPort), deviceId);
  136.                 mqttClient.setCallback(clientCallback);
  137.             } catch (Exception e) {
  138.                 Log.e("mqtt", "启动服务错误:" + e);
  139.             }
  140.         }
  141.         if (!mqttClient.isConnected()) {
  142.             //匿名连接线程
  143.             new Thread(new Runnable() {
  144.                 @Override
  145.                 public void run() {
  146.                     try {
  147.                         int count = 0;  //重试次数
  148.                         while(count < 5 && !mqttClient.isConnected()) {
  149.                             mqttClient.connect(option);
  150.                             Thread.sleep(1000);
  151.                             count ++;
  152.                         }
  153.                         //连接成功
  154.                         if(mqttClient.isConnected()) {
  155.                             DispachEvent(MQTT_CONNECTED);
  156.                             Log.v("mqtt", "连接成功");
  157.                         } else {
  158.                             Log.e("mqtt", "连接网络错误");
  159.                             DispachEvent(MQTT_CONNECTFAIL);
  160.                         }
  161.                     } catch (Exception e) {
  162.                         Log.e("mqtt", "连接错误:" + e);
  163.                         DispachEvent(MQTT_CONNECTFAIL);
  164.                     }
  165.                 }
  166.             }).start();
  167.         }
  168.     }

  169.     //region 消息转发部分
  170.     //添加接收者
  171.     public void addListener(MsgHandler msgHandler){
  172.         if(!listenerList.contains(msgHandler)) {
  173.             listenerList.add(msgHandler);
  174.         }
  175.     }

  176.     //移除接收者
  177.     public void removeListener(MsgHandler msgHandler){
  178.         listenerList.remove(msgHandler);
  179.     }

  180.     //移除所有接收者
  181.     public void removeAll(){
  182.         listenerList.clear();
  183.     }

  184.     //发送消息
  185.     public void DispachMessage(String type, Object data){
  186.         if(listenerList.isEmpty()) {
  187.             Log.v("mqtt", "没有消息接收者:" + type);
  188.             return;
  189.         }
  190.         Log.v("mqtt", "发送消息:" + type);
  191.         for (MsgHandler msgHandler : listenerList)
  192.         {
  193.             msgHandler.onMessage(type, data);
  194.         }
  195.     }

  196.     //发送事件
  197.     public void DispachEvent(int event){
  198.         if(listenerList.isEmpty()) {
  199.             Log.v("mqtt", "没有消息接收者:");
  200.             return;
  201.         }
  202.         Log.v("mqtt", "派发事件:" + event);
  203.         for (MsgHandler msgHandler : listenerList)
  204.         {
  205.             msgHandler.onEvent(event);
  206.         }
  207.     }
  208.     //endregion

  209. }
复制代码
消息接口如下
  1. /**
  2. * Created by CHNhawk on 2017/5/16.
  3. * 重庆工商大学 物联网工程 熊廷宇
  4. * 自定义的消息接口
  5. */
  6. public interface MsgHandler
  7. {
  8.     /**
  9.      * 消息
  10.      * @param type 消息类型
  11.      * @param data 数据
  12.      */
  13.     void onMessage(String type, Object data);

  14.     /**
  15.      * 事件
  16.      * @param event x
  17.      */
  18.     void onEvent(int event);
  19. }
复制代码
这样就可以很方便的使用mqtt通过Android设备连接上OneNet服务器了。

二、C#使用MQTT协议接入OneNet平台
讲完了Android,再来简单介绍一下其他平台或者应用接入OneNet平台的方式吧。
网上虽然有大量关于mqtt的文章,不过我再复述一遍,增加一下自己的印象也是极好的。
这里说一句,C#用来编写.NET,同时也可以用在Unity引擎中,再导出到各个平台,C#也可以用来写Android程序,哈哈,也是非常方便的!

这里附上一张我使用Unity+mqtt实现的具有房间聊天功能的安卓对战小游戏,游戏可以同步玩家数据,实现登陆、同一房间聊天的功能。为了练习,网络通信部分我完全使用的mqtt协议,测试联机的效果还不错。

自制的mqtt小游戏mcio

自制的mqtt小游戏mcio

我们知道untiy用于AR以及VR项目也是很火的,而物联网+AR/VR这个组合也是很火爆,下面我就介绍一下unity怎么使用C#中的mqtt连接上OneNet平台。纯C#的.NET应用同理~

首先需要的mqtt库,M2Mqtt
这个网上可以下载到源码,在unity中我们需要把它导出为dll文件来调用。
注意,导出的时候记得选择3.5版的框架,否则Unity不兼容。
然后将导出的dll文件移动到项目的\Assets\Plugins文件夹下,注意这里只能放在这个文件夹下,不然没有用。
然后我们就可以愉快地使用m2mqtt了。
1. 导入包
using uPLibrary.Networking.M2Mqtt;
usinguPLibrary.Networking.M2Mqtt.Messages;
2. 创建mqtt客户端对象并且进行连接
MqttClient mqttClient;
//设置ip、端口
mqttClient = new MqttClient(“183.230.40.39”,6002, false, null);      
mqttClient. Connect(clientId, username,password);
//订阅主题
mqttClient.Subscribe(new string[] {topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
//取消订阅
mqttClient.Unsubscribe(new string[] {topic });
//发布信息
mqttClient.Publish(topic,Encoding.UTF8.GetBytes(payload));


下面附上自己编写的辅助类:
  1. using UnityEngine;
  2. using System.Collections;
  3. using System.Net;
  4. using System.Text;
  5. using uPLibrary.Networking.M2Mqtt;
  6. using uPLibrary.Networking.M2Mqtt.Messages;
  7. using System;

  8. // mqtt网络通讯模块
  9. public class MqttNetwork
  10. {

  11.     private static MqttNetwork instance; //单例的网络连接对象
  12.     private MqttClient mqttClient; //mqtt客户端
  13.     private EventManager manager; //事件管理

  14.     private MqttNetwork()
  15.     {
  16.         //构造函数
  17.         manager = EventManager.GetInstance();
  18.         //initNetWork();
  19.     }

  20.     public static MqttNetwork GetInstance()
  21.     {
  22.         //获取实例
  23.         if (instance == null)
  24.         {
  25.             instance = new MqttNetwork();
  26.         }
  27.         return instance;
  28.     }

  29.     public void initNetWork()
  30.     {
  31.         mqttClient = new MqttClient(“183.230.40.39”, 6002, false, null);
  32.         //注册服务器返回信息接受函数
  33.         mqttClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
  34.         //注册服务器断开函数
  35.         mqttClient.ConnectionClosed += client_ConnectionClosed;
  36.     }

  37.     public void subsribeTopics(string topic)
  38.     {  //订阅主题
  39.         mqttClient.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
  40.     }

  41.     public void unSubsribeTopics(string topic)
  42.     {  //取消订阅主题
  43.         if (mqttClient != null)
  44.         {
  45.             mqttClient.Unsubscribe(new string[] { topic });
  46.         }
  47.     }

  48.     public void publish(string topic, string payload)
  49.     {//发布消息
  50.         if (mqttClient != null)
  51.         {
  52.             mqttClient.Publish(topic, Encoding.UTF8.GetBytes(payload));
  53.         }
  54.     }

  55.     public void connect()
  56.     {
  57.         if (mqttClient == null)
  58.         {
  59.             //初始化mqtt客户端
  60.             initNetWork();
  61.         }
  62.         if (!mqttClient.IsConnected)
  63.         {
  64.             //若未连接则进行连接
  65.             try
  66.             {
  67.                 mqttClient.Connect(“clientId”);//客户端ID
  68.                 if (mqttClient.IsConnected)
  69.                 {
  70.                     //订阅需要的主题
  71.                     subsribeTopics("test")
  72.                 }
  73.             }
  74.             catch (System.Exception e)
  75.             {
  76.                 Debug.Log("连接出错 : " + e);
  77.                 manager.DispachEvent("mqtt-disconnected", null);
  78.               
  79.             }
  80.         }
  81.     }

  82.     public void disConnect()
  83.     {
  84.         //关闭连接
  85.         if (mqttClient != null && mqttClient.IsConnected)
  86.         {
  87.             mqttClient.Disconnect();
  88.         }
  89.     }

  90.     public bool isNetConnected()
  91.     {
  92.         if (mqttClient == null)
  93.         {
  94.             return false;
  95.         }
  96.         return mqttClient.IsConnected;        
  97.     }

  98.     void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs package)
  99.     {
  100.         //过滤并处理接收到的消息
  101.         string topic = package.Topic;
  102.         string msg = System.Text.Encoding.UTF8.GetString(package.Message);
  103. //To do sth.
  104.     }

  105.     private void client_ConnectionClosed(object sender, EventArgs e)
  106.     {
  107.         Debug.Log("[系统] : 服务器已断开");
  108.         manager.DispachEvent("mqtt-disconnected", null);
  109.     }
  110. }
复制代码
用到的一个事件转发接口
  1. public interface EventHandler
  2. {
  3.     /// <summary>
  4.     ///
  5.     /// </summary>
  6.     /// <param name="type"></param>
  7.     /// <param name="data"></param>
  8.     void OnEvent(string type, object data);
  9. }
复制代码
事件转发的实现
  1. using UnityEngine;
  2. using System.Collections.Generic;

  3. //事件管理类
  4. public class EventManager
  5. {
  6.     private static EventManager instance; // 同样使用单例模式

  7.     private Dictionary<string, List<EventHandler>> dicHandler; //事件句柄集合

  8.     private EventManager()
  9.     {
  10.         //内部的构造函数,不对外开放
  11.         dicHandler = new Dictionary<string, List<EventHandler>>();
  12.     }

  13.     public static EventManager GetInstance()
  14.     {
  15.         if (instance == null)
  16.         {
  17.             instance = new EventManager();
  18.         }
  19.         return instance;
  20.     }

  21.     /// <summary>
  22.     /// 注册事件监听
  23.     /// </summary>
  24.     /// <param name="type">监听类型</param>
  25.     /// <param name="listher">监听对象</param>
  26.     public void AddEventListener(string type, EventHandler listher)
  27.     {
  28.         if (!dicHandler.ContainsKey(type))
  29.         {
  30.             dicHandler.Add(type, new List<EventHandler>());
  31.         }
  32.         dicHandler[type].Add(listher);
  33.     }

  34.     /// <summary>
  35.     /// 移除对type的所有监听
  36.     /// </summary>
  37.     /// <param name="type"></param>
  38.     public void RemoveEventListener(string type)
  39.     {
  40.         if (dicHandler.ContainsKey(type))
  41.         {
  42.             dicHandler.Remove(type);
  43.         }
  44.     }

  45.     /// <summary>
  46.     /// 移除监听者的所有监听
  47.     /// </summary>
  48.     /// <param name="listener">监听者</param>
  49.     public void RemoveEventListener(EventHandler listener)
  50.     {
  51.         foreach (var item in dicHandler)
  52.         {
  53.             if (item.Value.Contains(listener))
  54.             {
  55.                 item.Value.Remove(listener);
  56.             }
  57.         }
  58.     }

  59.     /// <summary>
  60.     /// 清空所有监听事件
  61.     /// </summary>
  62.     public void ClearEventListener()
  63.     {
  64.         Debug.Log("清空对所有所有所有事件的监听");
  65.         if (dicHandler != null)
  66.         {
  67.             dicHandler.Clear();
  68.         }
  69.     }

  70.     /// <summary>
  71.     /// 派发事件
  72.     /// </summary>
  73.     /// <param name="type">事件类型</param>
  74.     /// <param name="data">事件传达的数据</param>
  75.     public void DispachEvent(string type, object data)
  76.     {
  77.         if (!dicHandler.ContainsKey(type))
  78.         {
  79.             Debug.Log("未添加任何" + type + " 类型的监听器");
  80.             return;
  81.         }

  82.         //Debug.Log("派发事件:" + type);
  83.         List<EventHandler> list = dicHandler[type];
  84.         for (int i = 0; i < list.Count; i++)
  85.         {
  86.             list[i].OnEvent(type, data);
  87.         }
  88.     }
  89. }
复制代码
这样,在自己的接收类实现OnEvent方法就行

三、NodeJS使用MQTT协议接入OneNet平台
NodeJs是近年比较火热的一个服务器js环境,可以在Linux系统上运行,配合mqtt实现一些功能,效率挺不错的。这里提供两个mqtt+nodejs的思路:
其一,用于树莓派等基于Linux系统的物联网设备连接至OneNet平台。
其二,用于自己的服务器配置mqtt转发服务以及一些自动处理服务,这个和OneNet平台就没有关系了,不过可以用在物联网相关项目上,让服务器自动完成一些数据的处理,我在这里也简单提一下。

首先讲其一:
树莓派一直是硬件爱好者比较喜欢的玩具设备(雾),也可以用树莓派来做一些物联网相关的东西,那么运行Linux系统的树莓派使用nodejs来连接上OneNet平台也是很不错的。不久前我尝试在树莓派上通过Python获取传感器数据并保存在本地文件中,然后再通过NodeJs上传到服务器,这样就可以在手机app上同步获取到传感器数据了。
废话不多说,首先下载配置nodejs+npm环境,这一块我不过多叙述,网上有太多太多的教程和步骤详细截图,本部分重点讲mqtt模块。
一般来说使用
apt-get install nodejs
apt-get install npm
这两个命令就可以安装,安装完毕可以输入node -v查看版本。
无法安装的可以先对apt update/upgrade一下,实在不行就或者直接wget下载nodejs解压。
要运行mqtt模块,nodejs版本不能太低,版本低的话可以安装n模块来升级nodejs,下面的内容假定你已经配置好了合适的nodejs环境。

nodejs中提供mqtt服务的有很多模块,我们就用mqtt这个模块,简单粗暴。
使用npm install mqtt命令安装mqtt模块
安装完成后,便可以实现一个mqtt的客户端连接至服务器的脚本了。


附上我自己编写的一个测试脚本:
  1. /// nodejs上传数据测试脚本
  2. /// 使用MQTT协议
  3. /// author CHNhawk (重庆工商大学 15物联网工程 熊廷宇)

  4. var mqtt = require('mqtt');                         // 导入mqttCilent模块
  5. var ip = '183.230.40.39';                           // oneNet服务器ip
  6. var port = '6002';
  7. var deviceId = '4734262';                           // 设备号
  8. var userName = '81695';                             // 用户名-产品id
  9. var passWord = '12345678';                          // 密码-鉴权

  10. // 创建连接
  11. var client = mqtt.connect('mqtt://' + ip + ':' + port, {
  12.     username: userName,
  13.     password: passWord,
  14.     clientId: deviceId
  15. });

  16. var isConnected = false;        // 是否连上服务器的标识

  17. console.log('Connecting to ' ,ip + ':' + port + '...');

  18. client.on('connect', function() {
  19.     // 连接成功回调函数
  20.     console.log('Server is connected to', ip + ':' + port);   
  21.     isConnected = true;
  22.     // 订阅主题 - 指令接收topic
  23.     client.subscribe('ctbu_cmd/' + deviceId);
  24.     console.log('Subscribe', 'ctbu_cmd/' + deviceId);
  25. });

  26. client.on('message', function(topic, message) {
  27.     // 接收信息回调函数
  28.     console.log(topic.toString() + ' - ' + message.toString());
  29.     var payloadMsg = message.toString();
  30.     switch (topic) {        
  31.         case 'ctbu_cmd/' + deviceId:
  32.             console.log('Received command: ', payloadMsg);
  33.             /////////////
  34.             //以下处理命令
  35.             /////////////
  36.             break;
  37.     }
  38. });

  39. client.on('offline', function(error){
  40.     // 掉线回调函数
  41.     isConnected = false;
  42.     console.log('Connecting failed cz:', error);
  43. });

  44. client.on('error', function(){
  45.     // 连接错误回调函数
  46.     isConnected = false;
  47.     console.log('Lost connecting :', ip + ':' + port);
  48. });

  49. setInterval(function() {
  50.     // 设置定时上传数据
  51.     if(isConnected){
  52.         var time = getTime();
  53.         var msg = "value" + time;   
  54.         var payload = buf.join("");
  55.         // 向topic发布数据
  56.         client.publish('ctbu_data', payload);
  57.     } else {
  58.         console.log('Offline state!');        
  59.     }
  60. }, 5000);

  61. //获取当前时间
  62. function getTime() {
  63.     return new Date().toLocaleString();
  64. }
复制代码
其二,在Linux服务器上搭建一个mqtt转发器
那么假定你已经配置好了nodejs。
使用npm install mosca 命令安装mosca模块
mosca是一个能够创建mqtt服务器的一个模块,使用起来也很方便。

我在之前提到的小游戏就是用这个模块加上mongodb数据库搭建的服务器,比较简单,效果也还好,700多行代码就搞定了。对于我那种高并发状态的游戏十几个人一起玩玩还是妥妥的,没做过压力测试,我的服务器的买的阿里云的学生专用版,9.9一个月那种,不敢奢求太多。不过用于物联网项目,还是够用。


也附上我自己编写的一个简单的测试脚本
  1. /// nodejs服务器测试脚本
  2. /// 使用MQTT协议
  3. /// author CHNhawk (重庆工商大学 15物联网工程 熊廷宇)

  4. var mosca = require('mosca');          // 导入mosca模块

  5. // 创建mqtt服务器对象并设置开放端口
  6. var MqttServer = new mosca.Server({  
  7.     port: 1883  
  8. });  
  9.   
  10. MqttServer.on('clientConnected', function(client){  
  11.         // 设备连接回调函数
  12.     console.log('client connected', client.id);  
  13. });

  14. MqttServer.on('published', function(packet, client) {  
  15.         // 发布topic回调函数
  16.     var topic = packet.topic;  
  17.     console.log('message-arrived--->','topic :'+topic+',message :'+ packet.payload.toString());  
  18. });  
  19.   
  20. MqttServer.on('ready', function(){  
  21.         // 成功运行服务
  22.     console.log('mqtt is running...');  
  23. });
复制代码
后记:这篇文章是参加实训需要完成的任务之一,之前很少在网上发布类似的帖子,虽然平时倒是经常写点文档做些记录,不过都是写给自己看的。由于本人技术水平及文字功底有限,如果大家有什么没有看明白或者质疑的地方,非常欢迎各位给我拍砖。希望与大家共同进步,最后祝OneNet平台越办越好!
在网上编辑帖子感觉比写文档还要麻烦一点,还不是很习惯....
回复

举报

0

主题

4

帖子

9

积分

新手上路

Rank: 1

积分
9
发表于 2020-8-11 16:50:09 | 显示全部楼层
157****6039 发表于 2017-7-26 22:41
为什么我的昵称显示是手机号啊.....尴尬,不过也不知道这个贴子有没有人看 ...

node.js用mosca模块转发,没有mqtt数据,请问你是怎么解决的

0

主题

4

帖子

9

积分

新手上路

Rank: 1

积分
9
发表于 2019-3-14 14:41:30 | 显示全部楼层
Javascript   能连接OneNet的MQTT服务器不,有代码没

0

主题

2

帖子

3

积分

新手上路

Rank: 1

积分
3
发表于 2018-5-13 11:58:24 | 显示全部楼层
你好,感谢你的分享,我是在做毕业设计的学生,项目和你的有类似之处,能加一下你QQ吗?

1

主题

8

帖子

21

积分

新手上路

Rank: 1

积分
21
发表于 2017-10-15 21:52:27 | 显示全部楼层
求助楼主
Mq**roidClient mqttClient = new Mq**roidClient(mContext, ("tcp://" + serverIP + ":" +serverPort), deviceId);
这里的mContext应该填什么参数

1

主题

8

帖子

21

积分

新手上路

Rank: 1

积分
21
发表于 2017-10-15 21:33:04 | 显示全部楼层
求助楼主
Mq**roidClient mqttClient = new Mq**roidClient(mContext, ("tcp://" + serverIP + ":" +serverPort), deviceId);
这句的mContext应该填什么参数啊?

0

主题

1

帖子

3

积分

新手上路

Rank: 1

积分
3
发表于 2017-9-3 19:13:34 | 显示全部楼层
你会的语言和平台真多!!

2

主题

3

帖子

8

积分

新手上路

Rank: 1

积分
8
发表于 2017-8-26 14:26:47 | 显示全部楼层
你好,写的不错,我这里有个小项目适合你们练手,有兴趣加我微信 18952630369

2

主题

5

帖子

23

积分

新手上路

Rank: 1

积分
23
 楼主| 发表于 2017-8-9 18:48:34 | 显示全部楼层
152******** 发表于 2017-8-2 10:31
有人看啊,,,就是太长了

哈哈,接触的平台比较多,就都写了一点...第一次写帖子,可能是有些啰嗦了,见谅

23

主题

1078

帖子

2219

积分

版主

Rank: 7Rank: 7Rank: 7

积分
2219
发表于 2017-8-2 10:31:18 | 显示全部楼层
有人看啊,,,就是太长了

2

主题

5

帖子

23

积分

新手上路

Rank: 1

积分
23
 楼主| 发表于 2017-7-27 10:08:33 | 显示全部楼层
cdd 发表于 2017-7-27 09:18
内容很详细的喔
昵称那个,针对其他用户是显示不全的,所以你不用担心。 ...

哦哦,好的,谢谢管理姐姐

2

主题

5

帖子

23

积分

新手上路

Rank: 1

积分
23
 楼主| 发表于 2017-7-26 22:41:14 | 显示全部楼层
为什么我的昵称显示是手机号啊.....尴尬,不过也不知道这个贴子有没有人看

95

主题

575

帖子

2201

积分

金牌会员

Rank: 5Rank: 5

积分
2201
发表于 2017-7-27 09:18:46 | 显示全部楼层
内容很详细的喔
昵称那个,针对其他用户是显示不全的,所以你不用担心。
1、OneNET交流群6:887624121
该群目前非常活跃,欢迎大家参与进来,交流,讨论,答疑,解惑~~
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表