快速启动 – 从MySQL 抽取数据到Kafka

by Norman Wang 2018-08-01

简介

Kafka Connect是一种分布式,可扩展,容错的服务,旨在可靠地在Kafka和其他数据系统之间传输数据。 数据从数据源生成并消耗到数据需要被接受的地方。

本快速启动教程,旨在通过在BDOS上面建立两种类型的Kafka Connect把来源于MySQL中的数据用Source Connector写入到Kafka,然后用Sink Connector写出到HDFS中的过程,来快速引导你去制定你的企业需要的生产需求

Kafka Connector可以分为Source和Sink两种类型:
– Source类型: 可以把整个数据库或从应用程序服务器收集消息数据送到Kafka Topic((消息归类)),使数据可用于低延迟的流处理
– Sink类型: 可以将数据从Kafka Topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析


建立一个Kafka Source Connector

步骤1:从快速体验里面点击第5个快速入口“从MySQL抽取数据到Kafka” 进入Kafka Connector创建界面

BDOS Online用户默认使用系统内置的Kafka Connect Cluster(集群)(BDOS 企业版用户可以自定义Kafka集群,企业版详情)

在界面右上方点击按钮“+ 添加 Connector”,随后在弹窗里面右上角connector 类型为“JdbcSourceConnector”

使用下面的填写信息,先不使用“高级功能按钮”

填写栏位 填写数值 填写方式
name my_source_connector 手动填写
database.source demo-mysql 下拉选框选择
table.whitelist product 下拉选框选择
mode incrementing 下拉选框选择
incrementing.column.name id (类型:INT UNSIGNED ) 下拉选框选择
topic.prefix 默认prefix_test 在给的默认值后面填写

步骤2:可以看到操作记录窗口弹出,这会告诉你,新建Kafka集群的实时日志
步骤3:新建成功

查看通过Kafka Source Connect建立的Topic,并查看数据流

步骤1:点击左侧边栏的导航选择Kafka菜单,点击Kafka Topic
步骤2:在Topic 列表里面可以看到刚刚通过建立Source Connector得到的一个Topic(名字 = 系统默认前缀名 + 自己取的topic名字 + table.whitelist的名字)
步骤3:点击topic的名字查看详情
步骤4:可以看到原本在Mysql里面的数据,被以时间分区为单位的形流入Kafka
步骤5:继续浏览页面,或者关闭进入下一个步骤

建立一个Kafka Sink Connector

与建立Kafka Source Connector步骤类似

步骤1:从快速体验里面点击第5个快速入口“从MySQL抽取数据到Kafka” 进入Kafka Connector创建界面

在界面右上方点击按钮“+ 添加 Connector”,随后在弹窗里面右上角connector的下拉框中选择类型为“HdfsSinkConnector”

使用下面的填写信息,先不使用“高级功能按钮”

填写栏位 填写数值 填写方式
name my_sink_connector 手动填写
hdfs.url 默认 不需要填写
topics 在上一步查看过的topic(查看上一节步骤2) 下拉选框选择
topics.dir 默认 不需要填写
logs.dir 默认 不需要填写

步骤2:可以看到操作记录窗口弹出,这会告诉你,新建Kafka集群的实时日志
步骤3:新建成功

新建一个Hive程序

步骤1:点击左侧边栏的导航选择“程序管理”菜单,点击Hive
步骤2:在弹出窗口,右上角点击按钮“+ 添加程序”
步骤3:按照下表,填写基本信息

填写内容 填写内容 填写方式
名称 my_hive 手动填写
拥有者 选择为自己 下拉框选择
描述 任意描述 手动填写

使用Hive语句,去查看数据

步骤1:在编辑区域写Hive语句({数据库.}不写,默认是default,如果采集选择的数据是你的用户名,请填写用户名),去浏览现在处于流水线当中的数据

Show tables;
Select * from {数据库.}mysql_product limit 10;

步骤2:点击按钮“运行”
步骤3:查看运行结果


【附】
BDOS企业版可以在非常多的应用场景使用Kafka服务,例如:
1. 将SQL表(或整个SQL数据库)导入到Kafka中
2. 将Kafka的topic(消息归类)用于HDFS以进行批处理
3. 将Kafkatopic(消息归类)用于Elastic Search以进行二级索引
4. 将遗留系统与Kafka框架集成,或者其他的更多…

留言

评论

${{item['author_name']}} 回复 ${{idToContentMap[item.parent] !== undefined ? idToContentMap[item.parent]['author_name'] : ''}} · ${{item.date.slice(0, 10)}} 回复

暂时还没有一条评论.