You must be signed in to change notification settings - Fork 3
参照目前各大公有云对 iothub 的成熟的核心抽象和设计,完成一个最小化的 iothub,主要内容包括
- 设备连接:当前只需支持 MQTT
- 影子模型:tags、state(desired、reported)
- 设备直接方法(direct method)
- 方便私有化部署
- 统一解决一类问题(设备和服务端的交互),不重复造轮子,提高效率,有所积累
影子模型 主要参考 aws iot 和 azure
- api : 对外提供 RESTful API ,包括设备管理、设备影子操作、设备直接方法调用等
- core: iothub 核心层,包括设备管理、设备影子模型、设备认证、任务管理
- thing: 设备增删改查、启用禁用,认证方式,定位为最基础必要的管理,更多需求用 shadow 实现
- shadow: 设备影子模型,设备配置、控制、状态
- auth: 设备认证和授权(authn, authz)。目前只支持密钥认证,后续可扩展多种认证,比如设备证书;授权方面限定设备对指定 topic 的读写
- job: 任务管理,可以创建一些自动执行的任务,完成设备的管理、批量处理,如 OTA、配置下发
- connector: 设备连接,可适配多类协议。目前只支持 MQTT(包括 MQTT over Websocket)
为了避免和各种语境下的 "device" 产生混淆,在 iothub 中统一用 thing 的概念, “device" 可能是一个独立物理设备,一整套业务意义的软硬件整体,或是实际上分开但业务逻辑上一体的东西 等等 对于 iothub ,thing 则是被管理的基础单位,至于开发意义上或者业务意义上是不是 device 并不在其范畴内
tio 与设备、业务系统交互关系 —— 具体到实现的范畴
- shadow 重在声明和异步 —— 记录 state ,且能通过 state 的 desired 和 reported 的比对,异步地实现对设备的配置、控制等功能
- direct method 重在及时 —— 服务端使用 HTTP 调用随即得到响应
- job 批量、定时、异步,且有历史记录 —— 适合于类似 OTA 这样的任务
- 只关心最终 state 用 shadow
- 想立即得到结果用 direct method
- 关心各个执行状态和记录,希望异步批量执行,用 job
"thingId": "xxxx" , // 设备唯一 ID
"enabled": true, // 默认都是启用
"authType": "password", // 或 ca 证书--当前未实现 ca 证书
"authValue": "xxxx" // 根据 authType 类型用于设备认证的数据:密码、证书
主要参考 aws
"thingId": "myThingId",
"connected": true,
"connectedAt": "2022-07-04T03:28:34.515Z",
"disconnectedAt": "2022-07-04T03:28:34.515Z",
"version": 1, // 用于控制并发更新
"state": {
"desired": {
"config": {
"period": 300
"reported": {
"connectivity": {
"type": "cellular"
"config": {
"period": 300,
"status": "Success"
"metadata": {
"desired": {
"config": {
"period": {
"timstamp": 1662514061000
"reported": {
"connectivity": {
"type": : {
"timstamp": 1662514061000
"config": {
"period": : {
"timstamp": 1662514061000
"status": ": {
"timstamp": 1662514061000
d2s 代表是消息流向是设备到服务端,同理理解 s2d 和 s2s
设备消息前缀:devprefix = $iothub/things/{thingId}
- {devprefix}/messages/property 设备指标属性上报 d2s,数据格式待定义
- {devprefix}/messages/will/# 设备遗言消息 d2s ,格式由设备自定义
- {devprefix}/presence 设备状态变化(上下线) s2s 数据格式
- $iothub/user/things/{thingId}/# 自定义设备 topic , d2s s2s,数据格式用户自定义
prefix = $iothub/things/{thingId}/shadows/name/default
shadow 目前默认只有一个 default ,此处参照了 aws 定义,设计上预留了后续可以一个设备多个 shadow 的可能性
设备获取 shadow
- {prefix}/get 设备获取 shadow,d2s, payload 空对象
- {prefix}/get/accepted 成功获取 shadow,s2d , shadow 格式
- {prefix}/get/rejected 拒绝获取 shadow,s2d, shadow 请求响应
设备更新 shadow(只能对 reported
- {prefix}/update 更新请求 ,d2s, shadow更新请求
- {prefix}/update/accepted 接受更新结果的回复 , s2d, shadow /accepted 请求响应
- {prefix}/update/rejected 拒绝更新的回复,s2d, shadow /rejected 请求响应
设备对 shadow reported
更新或者服务端对shadow desired
更新成功后,产生以下两个 topic 的数据通知
- {prefix}/update/documents 设备 shadow 属性上报变化,s2s, shadow更新格式
- {prefix}/update/delta shadow delta 通知, s2d, shadow更新格式
对于没有能力集成常规 ntp 服务的设备来说, tio 提供了 ntp 相关的 topic 和消息,供设备得到尽量准确的时间校准。参考了阿里云物联网平台ntp服务的设计。
- {devprefix}/methods/{name}/req s2d
"clientToken": "xxx", // iothub 生成
"data": { // 所有合法 json 值
"input1": "someInput",
"input2": "anotherInput"
- {devprefix}/methods/{name}/resp d2s
"clientToken": "xxx", // 和 req 收到的对应
"code": 200, // 类似 http,400 请求参数错误, 404 设备不存在, 504 设备超时未响应
"messge": "OK",
"data" : {...}
设备调用云端方法?? 必要性待讨论
- {devprefix}/rpc/{methodName}/req d2s
- {devprefix}/rpc/{methodName}/resp s2d
未在 tio 中实现,可在 tio 外部参照该定义实现
- {devprefix}/ota/task 下发OTA升级任务, s2d
- {devprefix}/ota/task/result OTA 升级任务状态上报 d2s
"thingId": "xxx",
"op": "create", // delete, enable, disable
"timestamp": 1662514061000 // ms 时间戳,消息时间
"thingId": "xxx",
"timestamp": 1573002230757,
"eventType": "connected", // disconnected
"disconnectReason": "xxx", // 当 eventType 为 disconnected 时
"remoteAddr": "" // 客户端来源地址
"state": {
"desired": { // 上层业务系统设置该字段(暂未实现,而是使用 HTTP 接口)
"attribute1": integer2,
"attribute2": "string2",
"attributeN": boolean2
"reported": { // 设备设置该字段
"attribute1": integer1,
"attribute2": "string1",
"attributeN": boolean1
"clientToken": "token",
"version": version // 可选,填写大于 0 的值则会验证更新的 shadow 当前版本是否是该值,否则拒绝更新
"state": {
"desired": {
"attribute1": integer2,
"attribute2": "string2",
"attributeN": boolean2
"reported": {
"attribute1": integer3,
"attribute2": "string3",
"attributeN": boolean2
"delta": {
"attribute1": integer2,
"attribute2": "string2"
"metadata": {
"desired": {
"attribute1": {
"timestamp": timestamp
"attribute2": {
"timestamp": timestamp
"attributeN": {
"timestamp": timestamp
"reported": {
"attribute1": {
"timestamp": timestamp
"attribute2": {
"timestamp": timestamp
"attributeN": {
"timestamp": timestamp
"timestamp": timestamp,
"clientToken": "token",
"version": version
"clientToken": "xxx", // 原请求 clientToken
"code": 200, // 类 http status code
"message": "OK", // 错误信息
"timestamp": 1662514061000 // 响应产生时间,精度到毫秒
"previous" : {
"state": {
"desired": {
"attribute1": integer2,
"attribute2": "string2",
"attributeN": boolean2
"reported": {
"attribute1": integer1,
"attribute2": "string1",
"attributeN": boolean1
"metadata": {
"desired": {
"attribute1": {
"timestamp": timestamp
"attribute2": {
"timestamp": timestamp
"attributeN": {
"timestamp": timestamp
"reported": {
"attribute1": {
"timestamp": timestamp
"attribute2": {
"timestamp": timestamp
"attributeN": {
"timestamp": timestamp
"version": version-1
"current": {
"state": {
"desired": {
"attribute1": integer2,
"attribute2": "string2",
"attributeN": boolean2
"reported": {
"attribute1": integer2,
"attribute2": "string2",
"attributeN": boolean2
"metadata": {
"desired": {
"attribute1": {
"timestamp": timestamp
"attribute2": {
"timestamp": timestamp
"attributeN": {
"timestamp": timestamp
"reported": {
"attribute1": {
"timestamp": timestamp
"attribute2": {
"timestamp": timestamp
"attributeN": {
"timestamp": timestamp
"version": version
"timestamp": timestamp,
"clientToken": "token"
"state": {
"attribute1": integer2,
"attribute2": "string2",
"attributeN": boolean2
"metadata": {
"attribute1": {
"timestamp": timestamp
"attribute2": {
"timestamp": timestamp
"attributeN": {
"timestamp": timestamp
"timestamp": timestamp,
"clientToken": "token",
"version": version
规范 OTA 的消息和 topic ,但 tio 本身暂不实现 OTA 功能
"clientToken": "xxx", // 任务生成方指定
"taskId": "xxx", // ota 任务 id
"type": "app", // ota 类型,app/appconfig/os 等
"meta": { // meta 内字段信息可选、可扩展
"version": "1.0.3",
"md5": "5d41402abc4b2a76b971017c592", // ota 文件 md5
"content-type": "application/octet-stream", // 文件格式
"content-length": 1570000 // 文件大小
"fileUrl" "https://a/fw-v1.0.3.bin" // 更新包地址,一般是 http/https 文件地址
, ongoing
, done
, failed
, canceled
, done
, failed
"clientToken": "xxx", // 原请求 clientToken
"taskId": "xxx", // ota 任务 id
"status": "ongoing", // 状态
"progress": 80, // 0-100 百分比 , 在 status 为 ongoing 时有
"errCode": 1400, // 错误码,在 status 为 failed 时有
"errMsg": "invalid url", // 错误 message,在 status 为 failed 时有
"timestamp": 1662514061000, // ms
"clientToken": "xxx", // 原请求 clientToken
"status": "ongoing",
"progress": 80, // 0-100 百分比
"timestamp": 1662514061000, // ms
"clientToken": "xxx", // 原请求 clientToken
"status": "done",
"timestamp": 1662514061000, // ms
"clientToken": "xxx", // 原请求 clientToken
"status": "failed",
"errCode": 1400, // 错误码
"errMsg": "invalid url", // 错误 message
"timestamp": 1662514061000, // ms
错误码 | 错误内容 |
400 | PARAM_ERROR, 参数错误 |
500 | INTERNAL_ERROR,未知错误 |
1400 | FILE_URL_ERROR, url 无效 |
1403 | FILE_CONTENT_ERROR,文件内容错误 |
1404 | FILE_NO_SPACE,设备空间不足 |
"clientSendTime": 1685428658000 // 当前时间戳 ms
默认 resp 数据结构 RespData
code: int;
message: string;
data: T;
以下在说明接口返回值时仅说明 data 部分。
详细接口定义见 tio swagger 文档
对设备状态、配置等更多的功能建议用 shadow, thing 接口只提供最基础必要的功能
- POST /api/v1/things 创建设备
- DELETE /api/v1/things/{id} 删除设备
- GET /api/v1/things 查询设备
- GET /api/v1/things/{id} 获取设备
- PUT / api/v1/things/{thingId}/shadows/default/state/desired 设置
- GET / api/v1/thigns/shadows/query 通过 SQL 语句查询 shadow 信息列表
- GET / api/v1/things/{thingId}/shadows/default 获取 shadow 信息
- POST /api/v1/things/{thingId}/methods/{name} 调用设备方法
golang + mysql/sqlite + 内嵌mqtt服务/emqx