物联网设备实时监控预警
  • 概览
  • 需求场景
  • 方案概述
  • 配置步骤
  • 定义 mqtt source 表
  • 定义 mqtt sink 表
  • 编写数据提取dml语句
  • 相关产品

物联网设备实时监控预警-奇异果体育app竞彩官网下载

更新时间:

概览

监控、预警工厂设备的用电情况。

需求场景

用户拥有大量的大功率设备,如果没有在下班之前及时关闭,会造成用电浪费,甚至引起重大安全事故。每个设备上的传感器定时(5~30秒不等)将设备当前的情况推送到 物联网核心套件(iot core)或 物接入(iot hub) 的 mqtt 当中作为 source,在我们 bsc 中创建 flink_stream/sql 类型的作业用于设备关键信息的提取,并实时将处理结果推送到 物联网核心套件(iot core)或 物接入(iot hub) 的 mqtt 当中,方便下游 规则引擎(rule engine) 和 时序时空数据库(tsdb)消费。用户可以基于 智能小程序 开发小程序或第三方平台调用 tsdb 的数据 api,并完成数据展示、历史数据分析、故障预警等功能。可以有效发现安全隐患、及时更换老旧设备、发现异常用电情况,为工厂运转节省成本、提升安全系数。

方案概述

用户设备 → iot hub → bsc → iot hub → rule engine → tsdb → 小程序

配置步骤

一个完整的 flink sql 作业由 source 表、sink 表和 dml 语句构成。

定义 mqtt source 表

```sql label=flink
set job.stream.timetype = 'processtime'; -- 设置 proctime
create table source_mqtt_table (
        `modbus` row(
            `request` row(
                `startaddr` bigint,
                `length` bigint
            ),
            `response` string,
            `parsedresponse` array < row(
                `desc` string,
                `type` string,
                `unit` string,
                `value` string,
                `errno` bigint
            ) >
        )
        `metrics` row(
            `settingtime_m` bigint,
            `building` bigint,
            `floor` bigint,
            `company` bigint,
            `equipment` bigint,
            `c_temperature` double,
            `s_temperature` bigint,
            `cabinet` bigint,
            `runningtime_m` bigint,
            `runningtime_h` bigint,
            `settingtime_h` bigint
        )
    ) with (
        'connector.type' = 'mqtt',
        'format.encode' = 'json',
        'connector.url' = 'tcp://xxxxxxxxxx.mqtt.iot.gz.baidubce.com:1883',
        'connector.topic' = 'device1',
        'connector.username' = 'xxxxxxxxxx/yyyyyy',
        'connector.password' = 'xxxxxxxx',
        'connector.semantic' = 'at_least_once'
    );
```

定义 mqtt sink 表

```sql label=flink
create table sink_mqtt_table (
        `field` string,
        `timestamp` bigint,
        `value` double,
        `company` bigint,
        `building` bigint,
        `floor` bigint,
        `cabinet` bigint,
        `equipment` bigint
    ) with (
        'connector.type' = 'mqtt',
        'format.encode' = 'json',
        'connector.url' = 'tcp://xxxxxxx.mqtt.iot.gz.baidubce.com:1883',
        'connector.topic' = 'device1_unnested',
        'connector.username' = 'xxxxxx/yyyyy',
        'connector.password' = 'xxxxxxxx'
    );
```

编写数据提取dml语句

解析 source 表中的复杂嵌套 json, 提取出设备的关键信息,如位置编号、运行状态,并使用 proctime 来为输出结果添加一条记录

```sql label=flink
insert into
    sink_mqtt_table
select
    `desc` as field,
    to_bigint(current_timestamp) as `timestamp`,
    cast(`value` as double) as `value`,
    company,
    building,
    `floor`,
    cabinet,
    equipment
from
    source_mqtt_table,
    unnest(sink_mqtt_table.parsedresponse) as a(`desc`, `type`, `unit`, `value`, `errno`)
where
    `desc` not in (
        'company',
        'building',
        'floor',
        'cabinet',
        'equipment'
    )
```

相关产品

物联网核心套件 iot core时序时空数据库tsdb

网站地图