最佳实践 > 使用dts实现mysql到kafka数据迁移
使用dts实现mysql到kafka数据迁移
  • 概览
  • 需求场景
  • 方案概述
  • 迁移前置条件
  • 源端mysql迁移权限要求
  • 目标端kafka环境准备
  • 操作步骤
  • 1.创建迁移任务
  • 2.配置任务
  • 3.启动迁移
  • 4.检查kafka集群数据

使用dts实现mysql到kafka数据迁移-奇异果体育app竞彩官网下载

更新时间:

概览

本文主要介绍通过dts数据迁移功能从自建mysql或rds for mysql迁移至百度消息服务或自建kafka集群

需求场景

适用于使用百度智能云数据传输服务dts(以下简称 dts),将自建mysql或rds for mysql数据库实例的数据迁移到百度消息服务自建kafka集群中。

方案概述

迁移前置条件

  • 已创建作为迁移源端的mysql数据库实例,版本为5.5、5.6或5.7。
  • 已创建作为迁移目标端的kafka集群或百度消息服务主题。自建kafka集群支持版本为0.9或0.10

源端mysql迁移权限要求

当源端为自建数据库时,用户需要提供一个满足权限要求的迁移账号。 用户可以对准备用于迁移的已有账号进行授权,也可以创建新的账号用于迁移,请参考如下授权语句:
grant select, lock tables, replication slave, replication client, show view on ∗.∗ to '迁移账号'@'主机名' identified by '迁移密码'; 其中「迁移账号」和「迁移密码」请按需自行填写;「主机名」可参考 dts公网ip段 进行填写,实现仅对dts服务器ip放开访问限制,或填写%对全部ip开放访问。
授权完成后,继续刷新系统权限表,执行flush privileges;,确保账号权限立即生效。

目标端kafka环境准备

目标端为百度消息服务主题

无需准备环境,可直接配置dts任务。dts操作步骤参见:【操作步骤】

目标端为自建kafka集群

由于dts服务管控节点和目标端自建kafka集群间存在网络隔离,因而需要配置自建kafka集群的访问路由规则。您可以根据需要选择不同的访问方式,并按步骤配置您的kafka集群。

  • 通过公网访问您的kafka集群

如果您希望dts通过公网链路访问您的kafka集群,您需要为kafka集群中的每一台机器配置公网访问。假设目标端kafka集群有三个broker,其公网ip分别为:106.0.0.1、106.0.0.2、106.0.0.3;其内部网络ip分别为:172.16.0.1、172.16.0.2、172.16.0.3。您需要在每个broker的配置文件server.properties做如下配置,以broker1为例(公网ip:106.0.0.1,内网ip:172.16.0.1):

  1. listeners
listeners=internal://172.16.0.1:9092,public://172.16.0.1:19092

listeners是用来定义broker的listener的配置项。 internal标签下的连接信息(172.16.0.1:9092),用于broker间的内部通信。这里配置了内网ip(172.16.0.1),表示broker间可以通过内部网络实现网络通信。如果您希望broker间的通信走公网链路,可以改为配置公网ip(106.0.0.1)
public标签标记的连接信息(172.16.0.1:19092),用于与公网进行网络通信。注意:这里配置的ip应当与internal标签下的ip保持一致,但端口务必要不同

  1. advertised.listeners
advertised.listeners=internal://172.16.0.1:9092,public://106.0.0.1:19092

advertised.listeners用于将broker的listener信息发布到zookeeper中,供客户端或其他broker查询。如果配置了advertised.listeners,那么就不会将listeners配置的信息发布到zookeeper中。
internal标签下的连接信息(172.16.0.1:9092)与上文listeners配置保持一致,但public标签下的连接信息(106.0.0.1:19092)需要填入公网ip

  1. listener.security.protocol.map
listener.security.protocol.map=internal:plaintext,public:plaintext

listener.security.protocol.map用于配置监听者的安全协议。 这里您可以按照自己的需要为不同的连接方式配置不同的安全协议。示例中默认为internal和public都配置了无访问控制(plaintext)的安全协议。

  1. inter.broker.listener.name
inter.broker.listener.name=internal

inter.broker.listener.name用于指定某一个标签作为internal listener的连接方式,这个标签所代表的listener专门用于kafka集群中broker之间的通信。 示例中将字段取值配置为internal,表示希望broker之间通过内部网络进行通信。

启动broker

完成上述4个参数的配置后,保存修改并退出到kafka根目录,执行如下命令启动broker1。然后按照相同步骤配置并启动broker2和broker3

nohup ./bin/kafka-server-start.sh ./config/server.properties &
通过百度云内部网络访问您的kafka集群

除了公网自建实例外,目前dts还支持下游是bbc或bcc自建的kafka集群。由于集群部署在百度云上,用户可以选择绑定eip后,让dts通过公网访问kafka集群,或直接通过百度云内部网络访问kafka集群。

公网访问请参见上一章节,本章节介绍如何配置bbc/bcc自建的kafka集群,使dts能够通过百度云内部网络访问kafka集群。

查询pnet ip

在百度云内部网络中,pnet ip用于唯一标识某一虚机实例,dts使用pnet ip才能在百度云内部网络中正确访问到您的kafka集群。在自己的bbc/bcc实例命令行执行如下命令,可以获得实例的pnet ip

curl http://169.254.169.254/2009-04-04/meta-data/public-ipv4

这里同样以broker1为例(pnet ip:10.0.0.1,内网ip:192.168.0.1),修改server.properties中的4个网络通信配置项,各个配置项含义详见上文通过公网访问章节。

listeners
listeners=internal://192.168.0.1:9092,external://192.168.0.1:19092

这里internal配置的ip是百度云vpc内网ip,您可以在bcc或bbc的实例详情页查询到实例的内网ip

external标签下的listener表示通过pnet ip访问broker的连接信息,注意:这里配置的ip应当与internal标签下的ip保持一致,但端口务必要不同

advertised.listeners
advertised.listeners=internal://192.168.0.1:9092,external://10.0.0.1:19092

这里external标签对应的advertised.listeners配置为pnet ip:监听端口,internal标签的内容与配置项listeners一致

listener.security.protocol.map
listener.security.protocol.map=internal:plaintext,external:plaintext

这里您可以按照自己的需要为不同的连接方式配置不同的安全协议。示例中默认为internal和external都配置了无访问控制(plaintext)的安全协议。

inter.broker.listener.name
inter.broker.listener.name=internal

示例中将字段取值配置为internal,表示broker之间可以通过百度云vpc子网进行通信。

操作步骤

1.创建迁移任务

在dts的管理控制台点击左侧【数据迁移】tab标签,点击【新建迁移任务】按钮

如果您的源端是rds for mysql实例,则源端位置选择百度智能云数据库,如果是公网/bbc/bcc自建mysql实例,则源端位置选择自建数据存储。目标端位置选择自建数据存储

然后点击【下一步】,完成购买配置。页面会跳转到管理控制台任务列表页,列表页的最上方会新增一个未配置状态的dts任务,就是您刚刚创建的迁移任务,点击【配置任务】即可进行任务配置。如果页面跳转后未发现列表页出现新任务,建议等待一段时间刷新页面

2.配置任务

首先进入任务连接配置页,图中以源端为百度智能云数据库为例,按要求选择源端rds for mysql实例即可

配置目标端连接信息时,首先要根据目标端kafka集群的访问方式选择接入类型。

若目标端为百度消息服务主题,则接入类型选择百度消息服务,并选择相应的地域和主题id

若目标端为自建kafka集群,则如下图所示,按照公网访问和百度云内部网络访问两种方式选择对应的接入类型。

注意:broker列表中的ip必须填pnet ip

然后按要求填入其他信息即可。注意:目前dts仅支持0.9和0.10版本的kafka集群,并支持0.10版本kafka集群配置sasl访问控制

点击【授权白名单进入下一步】,选择源端实例的迁移对象。

点击【保存并预检查】,完成新建任务,然后在任务列表查看任务状态。

  • 状态列显示“前置检查通过”,可以勾选并启动迁移任务,任务启动后可以在任务进度列查看迁移进度。

  • 状态列显示“前置检查失败”,点击旁边的按钮查看失败原因并修改,重新启动检查直到成功后再启动迁移任务。

前置检查项详细解释参见:数据迁移操作指南-预检查

3.启动迁移

前置检查通过后,可以在任务列表页启动任务

4.检查kafka集群数据

任务启动后,dts将从源端数据库实例拉取到全量基准增量变更数据,并以固定的格式写入目标端kafka集群的指定topic中。具体格式如下:

//json结构
[   //最外层是数组
{   //第一行记录,一条message可能包含1到多行记录
    "time":"20180831165311",          //时间戳
    "global_id":"xxxxxxxx",           //全局唯一id,消费者可用此id来去重
    "database":"db1",                 //数据库名
    "schema":"schema1",               //schema名,仅在上游为postgresql、sqlserver时存在
    "table":"tbl1",                   //表名
    "type":"u",                       //变更类型,i为insert,u为update,d为delete
    "old_values":{                    //变更前每列的”列名”:”列值”组,变更类型为i时无old_values
        "key1":"old-value1",
        "key2":"old-value2",
        ...
    },
    "new_values":{                    //变更后每列的”列名”:”列值”组,变更类型为d时无new_values
        "key1":"new-value1",
        "key2":"new-value2",
        ...
    },
    "offset":{                        //该条记录的位置信息,仅在上游为mysql时的增量迁移阶段有值,对消费者无用
        "binlog_name":"mysql-bin.xxx",
        "binlog_pos":"xxx",
        "gtid":"server_id:transaction_id"   //仅在使用gtid方式同步时存在
    }
},  
{   //第二行记录
    "time":"20180831165311",
    "database":"db1",
    ...
}
... //更多行记录
]
网站地图