快速启动 – 从MySQL 抽取数据到Kafka
简介
Kafka Connect是一种分布式,可扩展,容错的服务,旨在可靠地在Kafka和其他数据系统之间传输数据。 数据从数据源生成并消耗到数据需要被接受的地方。
本快速启动教程,旨在通过在BDOS上面建立两种类型的Kafka Connect把来源于MySQL中的数据用Source Connector写入到Kafka,然后用Sink Connector写出到HDFS中的过程,来快速引导你去制定你的企业需要的生产需求
Kafka Connector可以分为Source和Sink两种类型:
– Source类型: 可以把整个数据库或从应用程序服务器收集消息数据送到Kafka Topic((消息归类)),使数据可用于低延迟的流处理
– Sink类型: 可以将数据从Kafka Topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析
建立一个Kafka Source Connector
步骤1:从快速体验里面点击第5个快速入口“从MySQL抽取数据到Kafka” 进入Kafka Connector创建界面
BDOS Online用户默认使用系统内置的Kafka Connect Cluster(集群)(BDOS 企业版用户可以自定义Kafka集群,企业版详情)
在界面右上方点击按钮“+ 添加 Connector”,随后在弹窗里面右上角connector 类型为“JdbcSourceConnector”
使用下面的填写信息,先不使用“高级功能按钮”
填写栏位 | 填写数值 | 填写方式 |
---|---|---|
name | my_source_connector | 手动填写 |
database.source | demo-mysql | 下拉选框选择 |
table.whitelist | product | 下拉选框选择 |
mode | incrementing | 下拉选框选择 |
incrementing.column.name | id (类型:INT UNSIGNED ) | 下拉选框选择 |
topic.prefix | 默认prefix_test | 在给的默认值后面填写 |
步骤2:可以看到操作记录窗口弹出,这会告诉你,新建Kafka集群的实时日志
步骤3:新建成功
查看通过Kafka Source Connect建立的Topic,并查看数据流
步骤1:点击左侧边栏的导航选择Kafka菜单,点击Kafka Topic
步骤2:在Topic 列表里面可以看到刚刚通过建立Source Connector得到的一个Topic(名字 = 系统默认前缀名 + 自己取的topic名字 + table.whitelist的名字)
步骤3:点击topic的名字查看详情
步骤4:可以看到原本在Mysql里面的数据,被以时间分区为单位的形流入Kafka
步骤5:继续浏览页面,或者关闭进入下一个步骤
建立一个Kafka Sink Connector
与建立Kafka Source Connector步骤类似
步骤1:从快速体验里面点击第5个快速入口“从MySQL抽取数据到Kafka” 进入Kafka Connector创建界面
在界面右上方点击按钮“+ 添加 Connector”,随后在弹窗里面右上角connector的下拉框中选择类型为“HdfsSinkConnector”
使用下面的填写信息,先不使用“高级功能按钮”
填写栏位 | 填写数值 | 填写方式 |
---|---|---|
name | my_sink_connector | 手动填写 |
hdfs.url | 默认 | 不需要填写 |
topics | 在上一步查看过的topic(查看上一节步骤2) | 下拉选框选择 |
topics.dir | 默认 | 不需要填写 |
logs.dir | 默认 | 不需要填写 |
步骤2:可以看到操作记录窗口弹出,这会告诉你,新建Kafka集群的实时日志
步骤3:新建成功
新建一个Hive程序
步骤1:点击左侧边栏的导航选择“程序管理”菜单,点击Hive
步骤2:在弹出窗口,右上角点击按钮“+ 添加程序”
步骤3:按照下表,填写基本信息
填写内容 | 填写内容 | 填写方式 |
---|---|---|
名称 | my_hive | 手动填写 |
拥有者 | 选择为自己 | 下拉框选择 |
描述 | 任意描述 | 手动填写 |
使用Hive语句,去查看数据
步骤1:在编辑区域写Hive语句({数据库.}不写,默认是default,如果采集选择的数据是你的用户名,请填写用户名),去浏览现在处于流水线当中的数据
Show tables;
Select * from {数据库.}mysql_product limit 10;
步骤2:点击按钮“运行”
步骤3:查看运行结果
【附】
BDOS企业版可以在非常多的应用场景使用Kafka服务,例如:
1. 将SQL表(或整个SQL数据库)导入到Kafka中
2. 将Kafka的topic(消息归类)用于HDFS以进行批处理
3. 将Kafkatopic(消息归类)用于Elastic Search以进行二级索引
4. 将遗留系统与Kafka框架集成,或者其他的更多…
留言
评论
${{item['author_name']}} 回复 ${{idToContentMap[item.parent] !== undefined ? idToContentMap[item.parent]['author_name'] : ''}}说 · ${{item.date.slice(0, 10)}} 回复
暂时还没有一条评论.