Skip to content
ogofly edited this page Jul 5, 2023 · 15 revisions

目标

参照目前各大公有云对 iothub 的成熟的核心抽象和设计,完成一个最小化的 iothub,主要内容包括

  • 设备连接:当前只需支持 MQTT
  • 影子模型:tags、state(desired、reported)
  • 设备直接方法(direct method)

目的:

  • 方便私有化部署
  • 统一解决一类问题(设备和服务端的交互),不重复造轮子,提高效率,有所积累

影子模型 主要参考 aws iot 和 azure

基本架构

  • api : 对外提供 RESTful API ,包括设备管理、设备影子操作、设备直接方法调用等
  • core: iothub 核心层,包括设备管理、设备影子模型、设备认证、任务管理
    • thing: 设备增删改查、启用禁用,认证方式,定位为最基础必要的管理,更多需求用 shadow 实现
    • shadow: 设备影子模型,设备配置、控制、状态
    • auth: 设备认证和授权(authn, authz)。目前只支持密钥认证,后续可扩展多种认证,比如设备证书;授权方面限定设备对指定 topic 的读写
    • job: 任务管理,可以创建一些自动执行的任务,完成设备的管理、批量处理,如 OTA、配置下发
  • connector: 设备连接,可适配多类协议。目前只支持 MQTT(包括 MQTT over Websocket)

为了避免和各种语境下的 "device" 产生混淆,在 iothub 中统一用 thing 的概念, “device" 可能是一个独立物理设备,一整套业务意义的软硬件整体,或是实际上分开但业务逻辑上一体的东西 等等 对于 iothub ,thing 则是被管理的基础单位,至于开发意义上或者业务意义上是不是 device 并不在其范畴内

tio 与设备、业务系统交互关系 —— 具体到实现的范畴

shadow、direct-method、job的区别与使用

区别:

  • shadow 重在声明和异步 —— 记录 state ,且能通过 state 的 desired 和 reported 的比对,异步地实现对设备的配置、控制等功能
  • direct method 重在及时 —— 服务端使用 HTTP 调用随即得到响应
  • job 批量、定时、异步,且有历史记录 —— 适合于类似 OTA 这样的任务

使用选择:

  • 只关心最终 state 用 shadow
  • 想立即得到结果用 direct method
  • 关心各个执行状态和记录,希望异步批量执行,用 job

基础数据格式

thing

{
  "thingId": "xxxx" , // 设备唯一 ID
  "enabled": true,  // 默认都是启用
  "authType": "password", // 或 ca 证书--当前未实现 ca 证书
  "authValue": "xxxx" // 根据 authType 类型用于设备认证的数据:密码、证书
}

shadow

主要参考 aws

{
  "thingId": "myThingId",
  "connected": true,
  "connectedAt": "2022-07-04T03:28:34.515Z",
  "disconnectedAt": "2022-07-04T03:28:34.515Z",

  "version": 1,  // 用于控制并发更新
  
  "state": {
    "desired": {
      "config": {
        "period": 300
      }
    },
    "reported": {
      "connectivity": {
        "type": "cellular"
      },
      "config": {
        "period": 300,
        "status": "Success"
      }
    }
  },
  
  "metadata": {
    "desired": {
      "config": {
        "period": {
          "timstamp": 1662514061000
        }
      }
    },
    "reported": {
      "connectivity": {
        "type": : {
          "timstamp": 1662514061000
        }
      },
      "config": {
        "period": : {
          "timstamp": 1662514061000
        },
        "status": ": {
          "timstamp": 1662514061000
        }"
      }
    }
  }
}

消息定义

以下灰色字体部分暂不实现

消息 topic

d2s 代表是消息流向是设备到服务端,同理理解 s2d 和 s2s

设备消息前缀:devprefix = $iothub/things/{thingId}

thing

  • {devprefix}/messages/property 设备指标属性上报 d2s,数据格式待定义
  • {devprefix}/messages/will/# 设备遗言消息 d2s ,格式由设备自定义
  • {devprefix}/presence 设备状态变化(上下线) s2s 数据格式

用户消息

  • $iothub/user/things/{thingId}/# 自定义设备 topic , d2s s2s,数据格式用户自定义

shadow

prefix = $iothub/things/{thingId}/shadows/name/default

shadow 目前默认只有一个 default ,此处参照了 aws 定义,设计上预留了后续可以一个设备多个 shadow 的可能性

设备获取 shadow

  • {prefix}/get 设备获取 shadow,d2s, payload 空对象 {}
  • {prefix}/get/accepted 成功获取 shadow,s2d , shadow 格式
  • {prefix}/get/rejected 拒绝获取 shadow,s2d, shadow 请求响应

设备更新 shadow(只能对 reported更新)

设备对 shadow reported 更新或者服务端对shadow desired 更新成功后,产生以下两个 topic 的数据通知

ntp

对于没有能力集成常规 ntp 服务的设备来说, tio 提供了 ntp 相关的 topic 和消息,供设备得到尽量准确的时间校准。参考了阿里云物联网平台ntp服务的设计。

method

设备直接方法

  • {devprefix}/methods/{name}/req s2d
{		
  "clientToken": "xxx",  			 // iothub 生成
  "data": {                    // 所有合法 json 值
    "input1": "someInput",
    "input2": "anotherInput"
  }
}
  • {devprefix}/methods/{name}/resp d2s
{
  "clientToken": "xxx",    // 和 req 收到的对应
  "code": 200, 					 	 // 类似 http,400 请求参数错误, 404 设备不存在, 504 设备超时未响应
  "messge": "OK",
  "data" : {...}
}

rpc

设备调用云端方法?? 必要性待讨论

  • {devprefix}/rpc/{methodName}/req d2s
  • {devprefix}/rpc/{methodName}/resp s2d

OTA

未在 tio 中实现,可在 tio 外部参照该定义实现

  • {devprefix}/ota/task 下发OTA升级任务, s2d
  • {devprefix}/ota/task/result OTA 升级任务状态上报 d2s

数据格式

thing lifecycle 通知

暂未实现

{
  "thingId": "xxx",
  "op": "create", 			// delete, enable, disable
  "timestamp": 1662514061000 // ms 时间戳,消息时间
}

presence 上下线通知

{
  "thingId": "xxx",
  "timestamp": 1573002230757,
  "eventType": "connected", // disconnected
  "disconnectReason": "xxx", // 当 eventType 为 disconnected 时
  "remoteAddr": "192.168.35.23:48542" // 客户端来源地址
}

shadow 更新请求

{
    "state": {
        "desired": { // 上层业务系统设置该字段(暂未实现,而是使用 HTTP 接口)
            "attribute1": integer2,
            "attribute2": "string2",
            ...
            "attributeN": boolean2
        },
        "reported": { // 设备设置该字段
            "attribute1": integer1,
            "attribute2": "string1",
            ...
            "attributeN": boolean1
        }
    },
    "clientToken": "token",
    "version": version // 可选,填写大于 0 的值则会验证更新的 shadow 当前版本是否是该值,否则拒绝更新
}

shadow /accepted 响应

{
    "state": {
        "desired": {
            "attribute1": integer2,
            "attribute2": "string2",
            ...
            "attributeN": boolean2
        },
        "reported": {
            "attribute1": integer3,
            "attribute2": "string3",
            ...
            "attributeN": boolean2
        },
        "delta": {
            "attribute1": integer2,
            "attribute2": "string2"
        }
    },
    "metadata": {
        "desired": {
            "attribute1": {
                "timestamp": timestamp
            },
            "attribute2": {
                "timestamp": timestamp
            },
            ...
            "attributeN": {
                "timestamp": timestamp
            }
        },
        "reported": {
            "attribute1": {
                "timestamp": timestamp
            },
            "attribute2": {
                "timestamp": timestamp
            },
            ...
            "attributeN": {
                "timestamp": timestamp
            }
        }
    },
    "timestamp": timestamp,
    "clientToken": "token",
    "version": version
}

shadow /rejected 请求响应

{
  "clientToken": "xxx",				// 原请求 clientToken
  "code": 200, 							  // 类 http status code
  "message": "OK",  					// 错误信息
  "timestamp": 1662514061000  // 响应产生时间,精度到毫秒
}

shadow 更新通知

{
  "previous" : {
    "state": {
        "desired": {
            "attribute1": integer2,
            "attribute2": "string2",
            ...
            "attributeN": boolean2
        },
        "reported": {
            "attribute1": integer1,
            "attribute2": "string1",
            ...
            "attributeN": boolean1
        }
    },
    "metadata": {
        "desired": {
            "attribute1": {
                "timestamp": timestamp
            },
            "attribute2": {
                "timestamp": timestamp
            },
            ...
            "attributeN": {
                "timestamp": timestamp
            }
        },
        "reported": {
            "attribute1": {
                "timestamp": timestamp
            },
            "attribute2": {
                "timestamp": timestamp
            },
            ...
            "attributeN": {
                "timestamp": timestamp
            }
        }
    },
    "version": version-1
  },
  "current": {
    "state": {
        "desired": {
            "attribute1": integer2,
            "attribute2": "string2",
            ...
            "attributeN": boolean2
        },
        "reported": {
            "attribute1": integer2,
            "attribute2": "string2",
            ...
            "attributeN": boolean2
        }
    },
    "metadata": {
        "desired": {
            "attribute1": {
                "timestamp": timestamp
            },
            "attribute2": {
                "timestamp": timestamp
            },
            ...
            "attributeN": {
                "timestamp": timestamp
            }
        },
        "reported": {
            "attribute1": {
                "timestamp": timestamp
            },
            "attribute2": {
                "timestamp": timestamp
            },
            ...
            "attributeN": {
                "timestamp": timestamp
            }
        }
    },
    "version": version
  },
  "timestamp": timestamp,
  "clientToken": "token"
}

shadow /delta 变化通知

{
    "state": {
        "attribute1": integer2,
        "attribute2": "string2",
        ...
        "attributeN": boolean2
        }
    },
    "metadata": {
        "attribute1": {
            "timestamp": timestamp
        },
        "attribute2": {
            "timestamp": timestamp
        },
        ...
        "attributeN": {
            "timestamp": timestamp
        }
    },
    "timestamp": timestamp,
    "clientToken": "token",
    "version": version
}

OTA 任务

规范 OTA 的消息和 topic ,但 tio 本身暂不实现 OTA 功能

{
  "clientToken": "xxx",										// 任务生成方指定
  "taskId": "xxx", 												// ota 任务 id
  "type": "app", 													// ota 类型,app/appconfig/os 等
  "meta": {                               // meta 内字段信息可选、可扩展
    "version": "1.0.3",				
    "md5": "5d41402abc4b2a76b971017c592", // ota 文件 md5 
		"content-type": "application/octet-stream", // 文件格式
		"content-length": 1570000             // 文件大小
  },
  "fileUrl" "https://a/fw-v1.0.3.bin"  		// 更新包地址,一般是 http/https 文件地址
}

OTA 任务响应

任务一共有这几种状态:pending, ongoing, done, failed, canceled 设备上报的状态只有:ongoing, done, failed

{
  "clientToken": "xxx",				 // 原请求 clientToken
  "taskId": "xxx", 						 // ota 任务 id
  "status": "ongoing",         // 状态
  "progress": 80,              // 0-100 百分比 , 在 status 为 ongoing 时有
  "errCode": 1400,             // 错误码,在 status 为 failed 时有
  "errMsg": "invalid url",     // 错误 message,在 status 为 failed 时有
  "timestamp": 1662514061000,  // ms
}

示例:

{
  "clientToken": "xxx",				// 原请求 clientToken
  "status": "ongoing",
  "progress": 80,              // 0-100 百分比
  "timestamp": 1662514061000,  // ms
}
{
  "clientToken": "xxx",				// 原请求 clientToken
  "status": "done",
  "timestamp": 1662514061000,  // ms
}
{
  "clientToken": "xxx",				// 原请求 clientToken
  "status": "failed",
  "errCode": 1400,             // 错误码
  "errMsg": "invalid url",     // 错误 message
  "timestamp": 1662514061000,  // ms
}
错误码 错误内容
400 PARAM_ERROR, 参数错误
500 INTERNAL_ERROR,未知错误
1400 FILE_URL_ERROR, url 无效
1401 FILE_DOWNLOAD_FAILED,下载失败
1402 FILE_CHECKSUM_ERROR,校验错误
1403 FILE_CONTENT_ERROR,文件内容错误
1404 FILE_NO_SPACE,设备空间不足

ntp 客户端请求消息

{
  "clientSendTime": 1685428658000 // 当前时间戳 ms
}

ntp 服务的响应消息

{
  "clientSendTime":1685428658000,
  "serverRecvTime":1685428881508,
  "serverSendTime":1685428881508
}

对外接口定义

默认 resp 数据结构 RespData

{
  code: int;
  message: string;
  data: T;
}

以下在说明接口返回值时仅说明 data 部分。

详细接口定义见 tio swagger 文档

thing

对设备状态、配置等更多的功能建议用 shadow, thing 接口只提供最基础必要的功能

  • POST /api/v1/things 创建设备
  • DELETE /api/v1/things/{id} 删除设备
  • GET /api/v1/things 查询设备
  • GET /api/v1/things/{id} 获取设备

shadow

  • PUT / api/v1/things/{thingId}/shadows/default/state/desired 设置
  • GET / api/v1/thigns/shadows/query 通过 SQL 语句查询 shadow 信息列表
  • GET / api/v1/things/{thingId}/shadows/default 获取 shadow 信息

direct-mothod

  • POST /api/v1/things/{thingId}/methods/{name} 调用设备方法

技术选型

golang + mysql/sqlite + 内嵌mqtt服务/emqx

Clone this wiki locally