最佳实践 > 流式应用场景
流式应用场景
  • 概览
  • 需求场景
  • 事件流
  • 持续计算
  • 方案概述
  • 数据采集
  • 创建消息服务bms topic
  • 安装bls收集器
  • 创建bls传输任务
  • 数据计算(python)
  • 创建bmr spark集群
  • 下载spark kafka streaming依赖
  • 编写spark streaming程序
  • 下载百度消息服务的证书
  • 创建连接kafka配置文件
  • 提交streaming作业
  • 查看作业输出
  • 注意事项
  • 数据计算(scala)
  • 创建bmr spark集群
  • 下载spark kafka streaming依赖
  • 下载百度消息服务的证书
  • 创建连接kafka配置文件
  • 提交streaming作业
  • 查看作业输出
  • 注意事项
  • 相关产品

流式应用场景-奇异果体育app竞彩官网下载

更新时间:

概览

实现云上流式场景下数据流打通,方便用户在百度智能云上使用各个产品实现流式需求,实现流式数据处理全流程。

需求场景

事件流

事件流具能够持续产生大量的数据,这类数据最早出现与传统的银行和股票交易领域,也在互联网监控、无线通信网等领域出现、需要以近实时的方式对更新数据流进行复杂分析如趋势分析、预测、监控等。简单来说,事件流采用的是查询保持静态,语句是固定的,数据不断变化的方式。

持续计算

比如对于大型网站的流式数据:网站的访问pv/uv、用户访问了什么内容、搜索了什么内容等,实时的数据计算和分析可以动态实时地刷新用户访问数据,展示网站实时流量的变化情况,分析每天各小时的流量和用户分布情况;比如金融行业,毫秒级延迟的需求至关重要。一些需要实时处理数据的场景也可以应用flink/kafka,比如根据用户行为产生的日志文件进行实时分析,对用户进行商品的实时推荐等。

方案概述

本场景应用于数据流式处理,使用到bls(百度log service)、bms(百度消息服务)以及bmr(mapreduce)三个产品。

整个流程分为数据采集和数据计算两部分。

数据采集

数据采集过程通过bls以及百度消息服务bms实现。

创建消息服务bms topic

参考文档:。

目前百度消息服务bms支持“华北-北京”、“华南-广州”以及“香港2区”三个地区,创建主题前可以根据具体需求选择不同的区域。

安装bls收集器

参考文档:安装收集器

  1. 选择“收集器安装”,选择相应的操作系统后,点击“复制”;

  2. 登录需要传输日志的主机,在root权限下执行所“复制”的安装命令。

创建bls传输任务

具体步骤为: 1. 在传输任务列表页面,点击“创建传输任务”,进入创建传输任务页面; 2. 在“任务信息”区,输入任务名称; 3. 在“源端设置”区,根据源数据类型,选择不通的源端类型以及进行相应的配置; 4. 在“目的端设置”区,选择“kafka”作为日志投递目的; 5. 在“主机列表”区,点击“添加主机”,选择安装好“收集器”的主机; 6. 在“主机列表”区,选择需部署该传输任务的主机,点击“创建”;

详细操作步骤请参考文档:。

目前百度消息奇异果体育app竞彩官网下载的服务支持“华北-北京”、“华南-广州”两个地区,创建topic前可以根据具体需求选择不同的区域。

数据计算(python)

数据计算过程通过bmr的spark streaming连接百度消息服务。本文以使用pyspark为例,spark版本1.6,线上kafka版本0.10。具体步骤如下:

创建bmr spark集群

参考文档:创建集群

注意:在“集群配置”区,选择“spark”内置模板,并将spark选上。

下载spark kafka streaming依赖

# eip可以在bmr console集群详情页的实例列表获取
ssh root@eip
# 切换到hdfs用户
su hdfs
cd
# 下载依赖
wget http://bmr-public-bj.bj.bcebos.com/sample/spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar

备注

获取集群登录公网ip

编写spark streaming程序

以kafka_wordcount为例,使用前请删除文中注释:

from __future__ import print_function
import sys
import configparser
from pyspark import sparkcontext
from pyspark.streaming import streamingcontext
from pyspark.streaming.kafka010 import kafkautils
from pyspark.streaming.kafka010 import preferconsistent
from pyspark.streaming.kafka010 import subscribe
# 读取配置文件
def read_config(file_name):
    cf = configparser.configparser()
    # read config file
    cf.read(file_name)
    # read kafka config
    section = "kafka"
    opts = cf.options(section)
    config = {}
    for opt in opts:
        # topics should be a list
        if opt == "topics":
            config[opt] = str.split(cf.get(section, opt), ",")
        else:
            config[opt] = cf.get(section, opt)
    return config
if __name__ == "__main__":
    """
    if len(sys.argv) != 3:
        print("usage: kafka_wordcount.py  ", file=sys.stderr)
        exit(-1)
    """
    # 建立sparkcontext和streamingcontext,demo处理间隔为20s
    sc = sparkcontext(appname="pythonstreamingkafkawordcount")
    ssc = streamingcontext(sc, 20)
    # 读取配置文件test.conf,获取连接百度kafka参数
    config_file = "test.conf"
    kafkaparams = read_config(config_file)
    # 建立kafka输入流
    topics = kafkaparams["topics"]
    kvs = kafkautils.createdirectstream(ssc, preferconsistent(), subscribe(topics, kafkaparams))
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatmap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reducebykey(lambda a, b: a b)
    counts.pprint()
    # 启动streamingcontext
    ssc.start()
    ssc.awaittermination()

下载百度消息服务的证书

下载证书:

创建连接kafka配置文件

以“test.conf"为例:

vi test.conf
# 写入如下内容
[kafka]
bootstrap.servers = kafka.bj.baidubce.com:9091
topics = test_for_demo
group.id = test
# 以下为ssl配置,根据client.properties中的内容进行更换
security.protocol = ssl
ssl.truststore.password = test_truststore_password
ssl.truststore.location = client.truststore.jks
ssl.keystore.location = client.keystore.jks
ssl.keystore.password = test_keystore_password

参数说明:

bootstrap.servers: kafka服务地址
topics: 需要消费的topic,如需消费多个topic,以逗号分割,如topic1,topic2,topic3
group.id: consumer group的id,请不要随意设置,以免与其他用户冲突(kafka服务未来将支持groupid隔离)

更多配置见:

提交streaming作业

按照如上四步,当前目录(hdfs home目录)有五个文件:test.conf、client.truststore.jks、client.keystore.jks、kafka_wordcount.py、spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar:

使用spark-submit提交streaming作业:

/usr/bin/spark-submit --master yarn --deploy-mode cluster  --files test.conf,client.keystore.jks,client.truststore.jks --jars spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar kafka_wordcount.py

查看作业输出

在“集群详情页”点开“hadoop yarn web ui”,即可打开yarn console:

在yarn console可以查看对应application的日志,用以查看程序的输出,以kafka_wordcount为例,输出在stdout中:

注意事项

  1. 如果需要停掉某个作业,可以使用“yarn application -kill applicationid”命令,例如:

    yarn application -kill application_1488868742896_0002
  2. 同时跑多个作业,请注意修改test.conf中的配置

数据计算(scala)

数据计算过程通过bmr的spark streaming连接百度消息服务bms。本文以使用scala为例,spark版本2.1,线上kafka版本0.10。具体步骤如下:

创建bmr spark集群

参考文档:创建集群

注意:在“集群配置”区,选择“spark2”内置模板,并将spark选上。

下载spark kafka streaming依赖

# eip可以在bmr console集群详情页的实例列表获取
ssh root@eip
# 切换到hdfs用户
su hdfs
cd
# 下载依赖
wget https://bmr-public-bj.bj.bcebos.com/sample/original-kafke-read-streaming-1.0-snapshot.jar

下载百度消息服务的证书

下载证书:

创建连接kafka配置文件

以“test.conf"为例:

vi test.conf
# 写入如下内容
[kafka]
bootstrap.servers = kafka.bj.baidubce.com:9091
topics = test_for_demo
group.id = test
# 以下为ssl配置,根据client.properties中的内容进行更换
security.protocol = ssl
ssl.truststore.password = test_truststore_password
ssl.truststore.location = client.truststore.jks
ssl.keystore.location = client.keystore.jks
ssl.keystore.password = test_keystore_password

参数说明:

bootstrap.servers: kafka服务地址
topics: 需要消费的topic,如需消费多个topic,以逗号分割,如topic1,topic2,topic3
group.id: consumer group的id,请不要随意设置,以免与其他用户冲突(kafka服务未来将支持groupid隔离)

更多配置见:

提交streaming作业

按照如上四步,当前目录有四个文件:test.conf、client.truststore.jks、client.keystore.jks、

使用spark-submit提交streaming作业:

spark-submit --class com.baidu.inf.spark.wordcount --master yarn --deploy-mode cluster  --files test.conf,client.keystore.jks,client.truststore.jks ./original-kafke-read-streaming-1.0-snapshot.jar "$topics" "$bootstrap.servers" "$group.id"  "$ssl.truststore.password" "$ssl.keystore.password"

(注意:这里请替换掉命令中最后面5个参数值为实际的值,即文件test.conf中描述的字段。)

例子:

spark-submit --class com.baidu.inf.spark.wordcount --master yarn --deploy-mode cluster --files test.conf,client.keystore.jks,client.truststore.jks ./original-kafke-read-streaming-1.0-snapshot.jar "868313b92dbe474b80ee4ef0904df26d__test" "kafka.bj.baidubce.com:9091" "test" "kafka" "k7ynher0"

附上wordcount 示例代码:

package com.baidu.inf.spark
import org.apache.kafka.common.serialization.stringdeserializer
import org.apache.spark.sparkconf
import org.apache.spark.streaming.kafka010.consumerstrategies.subscribe
import org.apache.spark.streaming.kafka010.kafkautils
import org.apache.spark.streaming.kafka010.locationstrategies.preferconsistent
import org.apache.spark.streaming.{seconds, streamingcontext}
object wordcount {
  def classname = this.getclass.getname.stripsuffix("$")
  def main(args: array[string]): unit = {
if (args.length < 4) {
  system.err.println(
    s"usage:input params: "
        "  "
        "  "
        "  "
        " "
        " "
  )
  sys.exit(1)
}
val array(topic, bootstrap, group, 
truststore, keystore, _*) = args
val conf = new sparkconf().setappname(classname).setifmissing("spark.master", "local[2]")
conf.set("spark.serializer", "org.apache.spark.serializer.kryoserializer")
val ssc = new streamingcontext(conf, seconds(5))
val kafkaparams = map[string, object](
  "bootstrap.servers" -> bootstrap,
  "key.deserializer" -> classof[stringdeserializer].getname,
  "value.deserializer" -> classof[stringdeserializer].getname,
  "group.id" -> group,
  "auto.offset.reset" -> "latest",
  "serializer.class" -> "kafka.serializer.stringencoder",
  "ssl.truststore.location" -> "client.truststore.jks",
  "ssl.keystore.location" -> "client.keystore.jks",
  "security.protocol" -> "ssl",
  "ssl.truststore.password" -> truststore,
  "ssl.keystore.password" -> keystore,
  "enable.auto.commit" -> (true: java.lang.boolean)
)
ssc.sparkcontext.setloglevel("warn")
val topics = array(topic)
// 消费kafka数据
val stream = kafkautils.createdirectstream[string, string](
  ssc,
  preferconsistent,~~~~
  subscribe[string, string](topics, kafkaparams)
).map(record => record.value())
val counts = stream.flatmap(_.split(" "))
  .map(word => (word, 1))
  .reducebykey(_   _)
  
  counts.print()
  
  ssc.start()
  ssc.awaittermination()
  ssc.stop(true, true)
  }
 }

查看作业输出

在“集群详情页”点开“hadoop yarn web ui”,即可打开yarn console:

在yarn console可以查看对应application的日志,用以查看程序的输出,以kafka_wordcount为例,输出在stdout中:

注意事项

  1. 如果需要停掉某个作业,可以使用“yarn application -kill applicationid”命令,例如:

    yarn application -kill application_1488868742896_0002
  2. 同时跑多个作业,请注意修改test.conf中的配置

相关产品

bls(百度log service)bms(百度消息服务)以及bmr(mapreduce)弹性公网ip(eip)

网站地图