简介: 满足云端下发指令给设备端,同时需要设备端返回响应结果的场景
基于Pub/Sub模式的同步调用实战
1.同步调用场景
1.1 背景MQTT协议是基于PUB/SUB的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。
IoT物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。应用服务器通过POP API发起Rrpc调用,IoT设备端只需要在timeout内,按照固定的格式回复Pub消息,服务端即可同步获取IoT设备端的响应结果。
具体流程如下:[img=1602,358][/img]
1.2 Topic格式约定请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}
响应:/sys/${productKey}/${deviceName}/rrpc/**response**/**${messageId}**
$表示变量,每个设备不同
messageId为IoT平台生成的消息ID,
设备端回复responseTopic里的messageId要与requestTopic一致
示例:设备端需要订阅:
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设备收到Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到消息后,在timeout时间内回复Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534
2.同步调用RRPC示例
2.1 设备端代码
- const mqtt = require('aliyun-iot-mqtt');
- const options = require("./iot-device-config.json");
- const client = mqtt.getAliyunIotMqttClient(options);
- client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
- client.on('message', function(topic, message) {
-
- if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){
- handleRrpc(topic, message)
- }
- })
- function handleRrpc(topic, message){
- topic = topic.replace('/request/','/response/');
- console.log("topic=" + topic)
-
- const payloadJson = {code:200,msg:"handle ok"};
- client.publish(topic, JSON.stringify(payloadJson));
- }
复制代码
2.2 服务端POP调用Rrpc
const co = require('co');
- const RPCClient = require('@alicloud/pop-core').RPCClient;
- const options = require("./iot-ak-config.json");
- const client = new RPCClient({
- accessKeyId: options.accessKey,
- secretAccessKey: options.accessKeySecret,
- endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
- apiVersion: '2018-01-20'
- });
- const payload = {
- "msg": "hello Rrpc"
- };
- const params = {
- ProductKey:"a1gMu82K4m2",
- DeviceName:"h5@nuwr5r9hf6l@1532088166923",
- RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"),
- Timeout:3000
- };
- co(function*() {
-
- const response = yield client.request('Rrpc', params);
- console.log(JSON.stringify(response));
- });
rrpc响应: {
- "MessageId": "1037292594536681472",
- "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",
- "PayloadBase64Byte": "eyJ***2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",
- "Success": true,
- "RrpcCode": "SUCCESS"
- }
复制代码
3.物模型-服务同步调用InvokeThingService示例注意:物模型 服务调用 接口InvokeThingService,不是Rrpc
设备订阅subTopic注意:服务同步调用API是InvokeThingService
/sys/${productKey}/${deviceName}/rrpc/request/+
IoT云端下行的payload格式
{
"id": 3536123,
"version": "1.0",
"params": {
"入参key1": "入参value1",
"入参key2": "入参value2"
},
"method": "thing.service.{tsl.service.identifier}"
}
设备响应replyTopic
/sys/${productKey}/${deviceName}/rrpc/response/request的消息Id
设备响应payload格式
{
"id": 3536123,
"code": 200,
"data": {
"出参key1": "出参value1",
"出参key2": "出参value2"
}
}
3.1 物模型-同步服务定义[img=960,458][/img]
3.2 设备端实现
- const mqtt = require('aliyun-iot-mqtt');
- const options = require("./iot-device-config.json");
- const client = mqtt.getAliyunIotMqttClient(options);
- client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
- client.on('message', function(topic, message) {
-
- if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){
- handleRrpc(topic, message)
- }
- })
- function handleRrpc(topic, message){
- topic = topic.replace('/request/','/response/');
- console.log("topic=" + topic)
-
- const payloadJson = {
- id: Date.now(),
- code:200,
- data: {
- currentMode: Math.floor((Math.random() * 20) + 10)
- }
- }
- client.publish(topic, JSON.stringify(payloadJson));
- }
复制代码
注意:设备端响应的payload要满足物模型定义的出参结构
3.3 服务端POP 接口InvokeThingService
const co = require('co');
- const RPCClient = require('@alicloud/pop-core').RPCClient;
- const options = require("./iot-ak-config.json");
- const client = new RPCClient({
- accessKeyId: options.accessKey,
- secretAccessKey: options.accessKeySecret,
- endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
- apiVersion: '2018-01-20'
- });
- const params = {
- ProductKey: "a1gMu82K4m2",
- DeviceName: "h5@nuwr5r9hf6l@1532088166923",
- Args: JSON.stringify({ "mode": "1" }),
- Identifier: "thing.service.setMode"
- };
- co(function*() {
- try {
-
- const response = yield client.request('InvokeThingService', params);
- console.log(JSON.stringify(response));
- } catch (err) {
- console.log(err);
- }
- });
调用结果: {
- "Data":{
- "Result": "{"currentMode":12}",
- "MessageId": "1536145625658"
- },
- "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",
- "Success": true
- }
复制代码
|