最佳实践 > api 日志调用统计
api 日志调用统计
  • 概览
  • 需求场景
  • 方案概述
  • 配置步骤
  • 定义 bkafka source 表
  • 定义 tsdb sink 表
  • 编写数据统计dml语句
  • 相关产品

api 日志调用统计-奇异果体育app竞彩官网下载

更新时间:

概览

用户拥有多台服务器,托管了一些 api 调用服务,现在想统计 api 的调用情况,形成图表。

需求场景

所有机器的 api 调用日志通过 自定义日志采集程序 进行日志采集后推送到 百度消息服务(bkafka)中作为流式计算 source , 在我们 bsc 中创建 flink_stream/sql 类型的作业用于 api 日志的聚合统计,并实时将聚合结果写到 时序时空数据库(tsdb)当中,用户可以通过 tsdb 的可视化面板或者利用 数据可视化工具(如 sugar bi)等调用 tsdb 的数据 api 完成数据展示。

方案概述

服务器 → 自定义日志采集程序 → bkafka → bsc → tsdb → sugar bi

配置步骤

一个完整的 flink sql 作业由 source 表、sink 表和 dml 语句构成。

定义 bkafka source 表

```sql label=flink
create table source_kafka_table (
    `timestamp` bigint,
    `status` integer,
    `contentlength` bigint,
    `latency` bigint,
    `groupuuid` string,
    `apiuuid` string
) with (
    'connector.type' = 'bkafka',
    'format.encode' = 'json',
    'connector.topic' = 'xxxxxxxxx__bsc-source',
    'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
    'connector.properties.ssl.filename' = 'kafka-key_bd.zip',
    'connector.properties.group.id' = 'test_group',
    'connector.read.startup.mode' = 'latest',
    'watermark.field' = 'timestamp',
    'watermark.threshold' = '1 minutes'
);
```

定义 tsdb sink 表

```sql label=flink
create table sink_tsdb_table (
    `datapoints` array < row(
        `timestamp` bigint,
        `metric` string,
        `value` bigint,
        `tags` map < string,
        string >
    ) >
) with (
    'connector.type' = 'tsdb',
    'format.encode' = 'json',
    'connector.emit' = 'batch',
    'connector.url' = 'http://xxxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
    'connector.write.max-message-num-per-batch' = '2000'
);
```

编写数据统计dml语句

统计每分钟按照 apiuuid、groupuuid、status 进行聚合的结果,每个 query 产生3个 tsdb datapoints,并实时写入到 tsdb 中。这里通过嵌套子查询的方式来使sql结构更加清晰。选取 timestamp 字段作为 eventtime 的watermark,延迟设置为1分钟。聚合时采用滚动窗口,窗口大小为1分钟。

```sql label=flink
insert into
    sink_tsdb_table
select
    array [
        row(`timestamp`, `count_name` , `count`, `common_tags`),
        row(`timestamp`, `traffic_name`, `traffic`, `common_tags`),
        row(`timestamp`, `latency_name`, `latency`, `common_tags`)
    ]
from
    (
        select
            `timestamp`,
            'count' as `count_name`,
            `count`,
            'traffic' as `traffic_name`,
            `traffic`,
            'latency' as `latency_name`,
            `latency`,
            map ['apiuuid', `apiuuid`, 'groupuuid', `groupuuid`, 'status', `status`] as `common_tags`
        from
            (
                select
                    to_bigint(tumble_start(`timestamp`, interval '1' minute)) as `timestamp`,
                    count(1) as `count`,
                    sum(contentlength) as `traffic`,
                    sum(latency) as `latency`,
                    `apiuuid` as `apiuuid`,
                    `groupuuid` as `groupuuid`,
                    `status` as `status`
                from
                    (
                        select
                            `timestamp`,
                            `contentlength`,
                            `latency`,
                            `apiuuid`,
                            `groupuuid`,
                            case
                                when status >= 200
                                and status < 300 then '2xx'
                                when status >= 300
                                and status < 200 then '3xx'
                                when status >= 400
                                and status < 500 then '4xx'
                                when status >= 500
                                and status < 600 then '5xx'
                                else 'oth'
                            end as `status`
                        from
                            source_kafka_table
                    ) as taba
                group by
                    tumble(`timestamp`, interval '1' minute),
                    `apiuuid`,
                    `groupuuid`,
                    `status`
            ) as tabb
    ) as tabc
```

相关产品

消息服务 for kafka时序时空数据库tsdb数据可视化sugar bi

网站地图