统计每个设备每分钟报警次数。
用户拥有1千多台设备,分布在不同城市的多个厂区,每个设备上的传感器大概每5秒采集并上传数据到iot hub。
sensorid | time | status |
---|---|---|
传感器id | 发送时间 | 是否报警,status值为1代表报警 |
传感器分布在多个设备、多个厂区,用户在rds还记录如下传感器、设备、厂区维表信息,如下:
sensorid | sensortype | deviceid | usetime |
---|---|---|---|
传感器id | 传感器类型 | 设备id | 使用寿命 |
统计每个设备每分钟发生报警的次数,并将统计结果通过输出到下游的rds,最终展示在可视化报表中。
create table source_mqtt_table(
sensorid string,
time string,
status integer
) with(
type = 'mqtt',
brokerurl = 'tcp://duig1nr.mqtt.iot.bj.baidubce.com:1883', --必填
topic = 'sensor', --必填
username = 'iotdemo', --必填
password = 'iotdemo', --必填
encode = 'json',
connectiontimeout = '30', --非必填,访问超时设置,单位:s
keepaliveinterval = '60', --非必填,规定时间段内不活动时连接会被断开,单位:s
maxbatchmessagenum = 'int.max', --非必填,每个batch最大数据条数
maxbatchmessagesize = 'int.max' --非必填,每个batch最大消息字节数
);
create table source_rds_table(
sensorid string,
sensortype string,
deviceid string,
usetime integer
) with(
type = 'rds',
user = 'rdsdemo', --必填,数据库用户名
password = 'rdsdemo', --必填,数据库访问密码
url = 'jdbc:mysql://mysql55.rdsmwi1zrjn5ww8.rds.bd.baidubce.com:3306/bsc_rds_test?useunicode=true&characterencoding=utf8', --必填,jdbc访问rds的url
dbtable = 'test' --必填,数据表名称
);
create table sink_rds_table(
deviceid string,
time timestamp,
nums integer
) with(
type = 'rds',
user = 'iotdemo', --必填,数据库用户名
password = 'iotdemo11', --必填,数据库访问密码
url = 'jdbc:mysql://mysql55.rdsmwi1zrjn5ww8.rds.bd.baidubce.com:3306/bsc_rds?useunicode=true&characterencoding=utf8', --必填,jdbc访问rds的url
dbtable = 'iotdemo' --必填,数据表名称
);
统计这一分钟内每个设备的报警次数。由于使用的是滚动窗口,也就意味着数据将在每分钟结束时候产出一份并写入到rds。
insert into
sink_rds_table outputmode append
select
source_rds_table.deviceid,
cast(from_unixtime(cast(source_mqtt_table.time as long)) as timestamp) as time,
count(*) as nums
from
source_mqtt_table inner join source_rds_table on source_mqtt_table.sensorid = source_rds_table.sensorid
where
source_mqtt_table.status = 1
group by
window(time, "1 minute"),
deviceid