最佳实践 > cdn 接口日志聚合统计
cdn 接口日志聚合统计
  • 概览
  • 需求场景
  • 方案概述
  • 配置步骤
  • 定义 bkafka source 表
  • 定义 palo sink 表
  • 编写数据聚合dml语句
  • 相关产品

cdn 接口日志聚合统计-奇异果体育app竞彩官网下载

更新时间:

概览

用户对 cdn 接口日志进行聚合统计。

需求场景

所有的 cdn 接口调用日志通过 flume 直接推送到 百度消息服务(bkafka)中作为流式计算 source , 在我们 bsc 中创建 spark_stream/sql 类型的作业用于 cdn 接口调用日志的聚合统计,并实时将聚合结果写到 百度数据仓库(palo)当中,用户可以利用 数据可视化工具(如 sugar bi)等调用 palo 的 api 完成数据展示。

方案概述

服务器 → bkafka → bsc → palo → sugar bi

配置步骤

一个完整的 spark sql 作业由 source 表、sink 表和 dml 语句构成。

定义 bkafka source 表

```sql label=spark
create table source_kafka_table (
    `prefix` string,
    `region` string,
    `useridsrc` string,
    `clusternamesrc` string,
    `transdurationsrc` double,
    `srcdurationsrc` string,
    `ts` bigint
) with (
    'connector.type' = 'bkafka',
    'format.encode' = 'csv',
    'format.attributes.field-delimiter' = ' ', -- 分隔符为空格
    'connector.topic' = 'xxxxxxxxx__bsc-source',
    'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
    'connector.properties.ssl.filename' = 'kafka-key_bd.zip'
);
```

定义 palo sink 表

```sql label= spark
create table sink_palo_table (
    `field` string
) with (
    'connector.type' = 'palo',
    'format.encode' = 'txt',
    'format.attributes.field-delimiter' = ','  -- 默认分隔符为逗号
    'connector.cluster-id' = 'xxxxxxxx',
    'connector.username' = 'admin',
    'connector.password' = 'xxxxx',
    'connector.database' = 'mct',
    'connector.table' = 'mct_statistics',
    'connector.mysql-host' = 'xxxxxxxxx-xxxxxxx-fe.palo.bd.baidubce.com',
    'connector.mysql-port' = '9030',
    'connector.compute-node-host' = 'xxxxxxxxx-xxxxxxx-be.palo.bd.baidubce.com',
    'connector.compute-node-port' = '8040'
);
```

编写数据聚合dml语句

按照某些值和指定的时间进行聚合,没有使用窗口,而是定义 5 分钟的微批触发时间来完成聚合,并且聚合状态要设置为 no state

```sql label=spark
insert into
    sink_palo_table outputmode append
select
    format_string('%s,%d,%s,%s,%s,%d,%f,%f\n',
        from_unixtime(`ts`/1000-(`ts`/1000)`,'yyyy-mm-dd hh'),
        unix_timestamp(from_unixtime(`ts`/1000-(`ts`/1000)`,'yyyy-mm-dd hh:mm'),'yyyy-mm-dd hh:mm'),
        `region`,
        `useridsrc`,
        `clusternamesrc`,
        count(*),
        sum(if(`srcdurationsrc` != 'null', cast(`srcdurationsrc` as double), 0)/(if(`transdurationsrc` != 0, `transdurationsrc`, 0.01))),
        sum(`transdurationsrc`)
     )
from
    source_kafka_table
where
    prefix = 'xxxxxxxx'
group by
    `useridsrc`,
    `clusternamesrc`,
    `region`,
    from_unixtime(`ts`/1000-(`ts`/1000)`,'yyyy-mm-dd hh'),
    unix_timestamp(from_unixtime(`ts`/1000-(`ts`/1000)`,'yyyy-mm-dd hh:mm'),'yyyy-mm-dd hh:mm') aggregate no state;  -- 聚合过程设置为无状态
```

相关产品

消息服务for kafka百度数据仓库 palo doris版数据可视化sugar bi

网站地图