最佳实践 > cdn 日志提取中转(etl)
cdn 日志提取中转(etl)
  • 概览
  • 需求场景
  • 方案概述
  • 配置步骤
  • 定义 bkafka source 表
  • 定义 bkafka / bos sink 表
  • 编写数据提取dml语句
  • 相关产品

cdn 日志提取中转(etl)-奇异果体育app竞彩官网下载

更新时间:

概览

用户对 cdn 日志进行提取中转,属于 etl 场景, 用于数据的实时清洗、归并和结构化。

需求场景

所有的 cdn 日志通过 flume 直接推送到 百度消息服务(bkafka)中作为流式计算 source , 在我们 bsc 中创建 spark_stream/sql 类型的作业用于 cdn 日志的提取中转,并实时将结果写到 百度消息服务(bkafka)或 对象存储(bos)当中,用户可以对 sink 端的 bkafka / bos 进行进一步的处理。

方案概述

服务器 → bkafka → bsc →; bkafka / bos → 其他

配置步骤

一个完整的 spark sql 作业由 source 表、sink 表和 dml 语句构成。

定义 bkafka source 表

```sql label=spark
create table source_kafka_table (
    `prefix` string,
    `region` string,
    `useridsrc` string,
    `clusternamesrc` string,
    `transdurationsrc` double,
    `srcdurationsrc` string,
    `ts` bigint
) with (
    'connector.type' = 'bkafka',
    'format.encode' = 'csv',
    'format.attributes.field-delimiter' = ' ',
    'connector.topic' = 'xxxxxxxxx__bsc-source',
    'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
    'connector.properties.ssl.filename' = 'kafka-key_bd.zip'
);
```

定义 bkafka / bos sink 表

```sql label= spark bkafka
create table sink_table (
    `timestamp` timestamp,
    `region` string,
    `useridsrc` string,
    `clusternamesrc` string
) with (
    'connector.type' = 'bkafka',
    'format.encode' = 'csv',
    'format.attributes.field-delimiter' = ',',
    'connector.topic' = 'xxxxxxxxx__bsc-source',
    'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
    'connector.properties.ssl.filename' = 'kafka-key_bd.zip'
);
```
```sql label= spark bos
create table sink_table (
    `timestamp` timestamp,
    `region` string,
    `useridsrc` string,
    `clusternamesrc` string
) with (
    'connector.type' = 'bos',
    'format.encode' = 'json',
    'connector.path' = 'bos://asc-sandbox-su/bos-source/json/'
);
```

编写数据提取dml语句

根据 prefix 对日志内容进行提取,并存放到下游的云服务中,为之后的其他处理做数据清洗。

```sql label=spark
insert into
    sink_table outputmode append
select
    from_unixtime(`ts`/1000-(`ts`/1000)`,'yyyy-mm-dd hh') as `timestamp`,
    `region`,
    `useridsrc`,
    `clusternamesrc`
from
    source_kafka_table
where
    prefix = 'xxxxxxxx';
```

相关产品

消息服务 for kafka对象存储 bos

网站地图