奇异果体育app竞彩官网下载的解决方案实践 > 物联网设备实时报警统计(流表join)
物联网设备实时报警统计(流表join)
  • 概览
  • 需求场景
  • 方案概述
  • 配置步骤
  • 定义 mqtt source表
  • 定义 rds source表
  • 定义rds sink表
  • 编写数据统计dml语句
  • 相关产品

物联网设备实时报警统计(流表join)-奇异果体育app竞彩官网下载

更新时间:

概览

统计每个设备每分钟报警次数。

需求场景

用户拥有1千多台设备,分布在不同城市的多个厂区,每个设备上的传感器大概每5秒采集并上传数据到 物联网核心套件(iot core)或 物接入(iot hub) 的 mqtt 当中作为第一个 source,一些维度信息存放在 云数据库(rds)作为第二个 source,在我们 bsc 中创建 spark_stream/sql 类型的作业用于每分钟报警次数的统计,并实时将处理结果推送到 云数据库(rds),最终在 数据可视化(sugar bi)的可视化报表中展示。

方案概述

用户设备 → iot hub → |

                                 |→ bsc → rds → sugar bi

                      rds → |

配置步骤

一个完整的 spark sql 作业由 source 表、sink 表和 dml 语句构成。

定义 mqtt source表

```sql label=spark
create table source_mqtt_table(
    sensorid string,
    time string,
    status integer
) with(
    type = 'mqtt',
    brokerurl = 'tcp://duig1nr.mqtt.iot.bj.baidubce.com:1883', --必填
    topic = 'sensor', --必填
    username = 'iotdemo', --必填
    password = 'iotdemo', --必填
    encode = 'json',
    connectiontimeout = '30', --非必填,访问超时设置,单位:s
    keepaliveinterval = '60', --非必填,规定时间段内不活动时连接会被断开,单位:s
    maxbatchmessagenum = 'int.max', --非必填,每个batch最大数据条数
    maxbatchmessagesize = 'int.max' --非必填,每个batch最大消息字节数
);
```

定义 rds source表

```sql label=spark
create table source_rds_table(
sensorid string,
sensortype string,
deviceid string,
usetime integer
) with(
type = 'rds',
user = 'rdsdemo', --必填,数据库用户名
password = 'rdsdemo', --必填,数据库访问密码
url = 'jdbc:mysql://mysql55.rdsmwi1zrjn5ww8.rds.bd.baidubce.com:3306/bsc_rds_test?useunicode=true&characterencoding=utf8', --必填,jdbc访问rds的url
dbtable = 'test' --必填,数据表名称
);
```

定义rds sink表

```sql label=spark
create table sink_rds_table(
deviceid string,
time timestamp,
nums integer
) with(
    type = 'rds',
    user = 'iotdemo', --必填,数据库用户名
    password = 'iotdemo11', --必填,数据库访问密码
    url = 'jdbc:mysql://mysql55.rdsmwi1zrjn5ww8.rds.bd.baidubce.com:3306/bsc_rds?       useunicode=true&characterencoding=utf8', --必填,jdbc访问rds的url
    dbtable = 'iotdemo' --必填,数据表名称
);
```

编写数据统计dml语句

统计这一分钟内每个设备的报警次数。由于使用的是滚动窗口,也就意味着数据将在每分钟结束时候产出一份并写入到rds。 ~~~codeset sql label=spark insert into sink_rds_table outputmode append select source_rds_table.deviceid, cast(from_unixtime(cast(source_mqtt_table.time as long)) as timestamp) as time, count(*) as nums from source_mqtt_table inner join source_rds_table on source_mqtt_table.sensorid = source_rds_table.sensorid where source_mqtt_table.status = 1 group by window(time, "1 minute"), deviceid ~~~

相关产品

物联网核心套件 iot core云数据库 rds数据可视化sugar bi

网站地图