当前位置: 首页 > news >正文

Kafka-Connect自带示例

一、上下文

《Kafka-Connect》中已经阐述了Kafka-Connect的理论知识,为了更生动的理解它,我们今天通过官方的一个小例子来感受下它的妙用。

二、创建topic

kafka-topics --create --topic connect-test --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

三、编写配置文件

在cdh环境中,这些配置文件所在的目录为:

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/

1、connect-standalone.properties

# 用于建立与Kafka集群的初始连接的主机/端口对列表。以下是cdh中的例子
bootstrap.servers=cdh1:9092,cdh2:9092,cdh3:9092

# 转换器指定Kafka中数据的格式以及如何将其转换为Connect数据。每个Connect用户都需要根据他们希望从Kafka加载或存储数据时使用的格式进行配置
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 转换器特定的设置可以通过在转换器的设置前加上我们想要应用的转换器来传递
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# 刷新速度比正常情况快得多,这对测试/调试很有用
offset.flush.interval.ms=10000

# 设置为用逗号(,)分隔的文件系统路径列表,以启用插件(连接器、转换器、转换)的类加载隔离。该列表应由顶级目录组成,其中包括以下内容的任意组合:
# a) 直接包含带有插件及其依赖项的jar的目录
# b) uber包含插件及其依赖项
# c) 直接包含插件类及其依赖项的包目录结构的目录
# 注意:将遵循符号链接来发现依赖关系或插件。
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

2、connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/source.txt
topic=connect-test

3、connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/sink.txt
topics=connect-test

四、运行

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/
./connect-standalone.sh /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-standalone.properties /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-source.properties /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-sink.properties

 启动成功后如图所示:

五、测试

我们项source.txt 中写入一些数据

echo 1 >> source.txt
echo 2 >> source.txt
echo 3 >> source.txt
echo 4 >> source.txt
echo 5 >> source.txt
echo 6 >> source.txt
echo 7 >> source.txt
echo 8 >> source.txt
echo 9 >> source.txt
echo 10 >> source.txt

从结果上看感觉sink.txt中结果是乱序的,这是因为我们创建topic时设置了2个分区,我们用consoumer来看看各个分区的顺序情况:

kafka-console-consumer --topic connect-test --from-beginning --bootstrap-server cdh1:9092,cdh2:9092,cdh3:9092 --partition 0

kafka-console-consumer --topic connect-test --from-beginning --bootstrap-server cdh1:9092,cdh2:9092,cdh3:9092 --partition 1

 

我们再次看sink.txt的结果就可以理解了,kafka只保证了分区有序,如果使用Kafka-Connect时想保证文件的输入和输出是有序的,就需要设定topic为1个分区。

相关文章:

Kafka-Connect自带示例

一、上下文 《Kafka-Connect》中已经阐述了Kafka-Connect的理论知识,为了更生动的理解它,我们今天通过官方的一个小例子来感受下它的妙用。 二、创建topic kafka-topics --create --topic connect-test --bootstrap-server cdh1:9092 --partitions 2 -…...

Hbase应用案例 随机号码生成

Hbase应用案例1 随机号码生成 在Hbase中插入如下格式的数据,数据内容随机生成 名称示例说明phonenumber158randomrowkey,号码dnum199randomcolumn,另一位通话者lengthrandomcolumn,时长valuerandomcolumn,接收或拨打…...

论文阅读——量子退火Experimental signature of programmable quantum annealing

摘要:量子退火是一种借助量子绝热演化解决复杂优化问题的通用策略。分析和数值证据均表明,在理想化的封闭系统条件下,量子退火可以胜过基于经典热化的算法(例如模拟退火)。当前设计的量子退火装置的退相干时间比绝热演…...

(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验二----网络分析(超超超详细!!!)

相信实验一大家已经完成了,对Arcgis已进一步熟悉了,现在开启第二个实验 ArcMap实验--网络分析 目录 ArcMap实验--网络分析 1.1 网络分析介绍 1.2 实验内容及目的 1.2.1 实验内容 1.2.2 实验目的 2.2 实验方案 2.3 实验流程 2.3.1 实验准备 2.3.2 空间校正…...

go语言 Pool实现资源池管理数据库连接资源或其他常用需要共享的资源

go Pool Pool用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非 常有用。如果goroutine需要从池里得到这些资…...

mysql一个事务最少几次IO操作

事务的IO操作过程 开始事务:用户发起一个事务,例如执行START TRANSACTION;,此时事务开始。读取和修改数据:用户读取和修改数据时,InnoDB首先从Buffer Pool查找所需的数据页。如果数据页不在Buffer Pool中,…...

运输层总结

运输层协议:端到端协议 面向连接的传输控制协议 TCP无连接的用户数据报协议 UDP - 主要任务:为相 互通信的应用进程 提供 逻辑通信服务 - 屏蔽:运输层向高层用户 屏蔽 了下面网络核心的细节(如网络拓扑、所采用 的路由选择协议等…...

【嵌套查询】.NET开源 ORM 框架 SqlSugar 系列

.NET开源 ORM 框架 SqlSugar 系列 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列【Code First】.NET开源 ORM 框架 SqlSugar 系列【数据事务…...

React 前端框架1

一、React 简介 (一)什么是 React React 是一个用于构建用户界面的 JavaScript 库,由 Facebook 开源并维护。它采用了组件化的开发思想,允许开发者将复杂的 UI 拆分成一个个独立、可复用的小组件,就如同搭积木一般&am…...

【真正离线安装】Adobe Flash Player 32.0.0.156 插件离线安装包下载(无需联网安装)

网上很多人声称并提供的flash离线安装包是需要联网才能安装成功的,其实就是在线安装包,而这里提供的是真正的离线安装包,无需联网即可安装成功。 点击下面地址下载离线安装包: Adobe Flash Player 32.0.0.156 for IE Adobe Fla…...

数据采集时,不同地区的动态IP数据质量有什么差异?

在数据采集的广阔世界中,动态IP扮演着至关重要的角色。它们不仅帮助我们突破地域限制,还能够提供多样化的数据来源。但是,不同地区的动态IP在数据质量上是否存在差异呢?本文将探讨这一问题,并为您提供实用的见解。 动…...

【Python爬虫五十个小案例】爬取猫眼电影Top100

博客主页:小馒头学python 本文专栏: Python爬虫五十个小案例 专栏简介:分享五十个Python爬虫小案例 🐍引言 猫眼电影是国内知名的电影票务与资讯平台,其中Top100榜单是影迷和电影产业观察者关注的重点。通过爬取猫眼电影Top10…...

等保测评和 ISO27001 都是信息保护,区别是什么?

ISO27001 和等级保护(等保)都是信息安全领域重要的标准和制度,但它们在多个方面存在区别: 定义和性质 ISO27001 它是国际标准化组织(ISO)发布的信息安全管理体系标准,其目的是帮助组织建立、实…...

Linux系统编程之进程创建

概述 在Linux系统中,通过创建新的进程,我们可以实现多任务处理、并发执行和资源隔离等功能。创建进程的主要方法为:fork、vfork、clone。下面,我们将分别进行介绍。 fork fork是最常用的创建新进程的方法。当一个进程调用fork时&a…...

JAVA-IO

目录 IO流 一 字节流 1 FileOutStream 1 书写: 2 换行书写与续写: 2 FileInputStream 1 读取数据 2 循环读取: 二 字符流 1 FileReader 1 空参的read()方法读取数据: 2 有参的read()方法读取数据: 3 指定字…...

动态系统特征分析:特征向量、特征值、频率与阻尼比、参与因子计算方法

特征值和特征向量在动态系统分析中是核心工具,广泛用于电力系统小信号稳定性、机械系统模态分析等领域。以下详细介绍计算方法及应用。 1. 求解特征值与特征向量 对于一个 n n n\times n nn的系统矩阵 A A A: 右特征向量与特征值 特征值( λ \lambd…...

乐鑫发布 esp-iot-solution v2.0 版本

今天,乐鑫很高兴地宣布,esp-iot-solution v2.0 版本已经发布,release/v2.0 分支下的正式版本组件将为用户提供为期两年的 Bugfix 维护(直到 2027.01.25 ESP-IDF v5.3 EOL)。该版本将物联网开发中常用的功能进行了分类整…...

动态代理如何加强安全性

在当今这个信息爆炸、网络无孔不入的时代,我们的每一次点击、每一次浏览都可能留下痕迹,成为潜在的安全隐患。如何在享受网络便利的同时,有效保护自己的隐私和信息安全,成为了每位网络使用者必须面对的重要课题。动态代理服务器&a…...

Flutter 之 InheritedWidget

InheritedWidget 是 Flutter 框架中的一个重要类,用于在 Widget 树中共享数据。它是 Flutter 中数据传递和状态管理的基础之一。通过 InheritedWidget,你可以让子 Widget 在不需要显式传递数据的情况下,访问祖先 Widget 中的数据。这种机制对…...

AI 助力开发新篇章:云开发 Copilot 深度体验与技术解析

本文 一、引言:技术浪潮中的个人视角1.1 AI 和低代码的崛起1.2 为什么选择云开发 Copilot? 二、云开发 Copilot 的核心功能解析2.1 自然语言驱动的低代码开发2.1.1 自然语言输入示例2.1.2 代码生成的模块化支持 2.2 实时预览与调整2.2.1 实时预览窗口功能…...

告别断电重启就丢程序:深入聊聊紫光同创FPGA的Flash固化与CPLD内置eFlash配置差异

紫光同创FPGA与CPLD配置存储机制深度解析:从瞬态下载到永久固化的技术实现 在数字电路设计领域,FPGA和CPLD的可重构特性为硬件开发带来了极大灵活性。然而,这种灵活性背后需要可靠的配置存储机制作为支撑——断电后程序能否自动恢复&#xf…...

MyBinder实战:零配置在iPad上运行Python数据分析

1. 项目概述:当iPad遇上Python,一次环境配置的“降维打击” 几年前,当我第一次在编程工作坊里,看到有学员掏出iPad,一脸期待地问我“老师,这个能跑今天的代码吗?”时,我的回答通常是…...

Jupyter C内核:在Notebook中实现C语言交互式编程的完整指南

Jupyter C内核:在Notebook中实现C语言交互式编程的完整指南 【免费下载链接】jupyter-c-kernel Minimal Jupyter C kernel 项目地址: https://gitcode.com/gh_mirrors/ju/jupyter-c-kernel Jupyter C内核是一个开源项目,为Jupyter Notebook提供完…...

计算机图形学——四、光栅化与消隐

第四章 光栅转化与消隐 重点总结 一、光栅转化(Rasterization) 定义:把用数学描述的图形(如三角形)变成屏幕上一个个像素点。 1. 多边形扫描转换 顶点表示 → 点阵表示:把多边形的顶点坐标,转成…...

FModel实战指南:UE4/5游戏pak资源提取与3D模型导出

1. 为什么是FModel?——当UE4/5游戏资源提取变成“开箱即用”的工程问题你刚下载完《堡垒之夜》最新赛季的离线安装包,或者拿到一份《黑神话:悟空》的测试版本地资源目录,双击打开后只看到一堆命名像WindowsNoEditor.pak、Content…...

通过Taotoken审计日志功能追踪与管理团队内部的API调用行为

🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 通过Taotoken审计日志功能追踪与管理团队内部的API调用行为 在团队协作使用大模型API进行开发时,一个常见的管理难题是…...

“AI点单员”真的能替代人工吗?——基于237家门店AB测试的转化率、客单价、复购率三重数据验证(含原始数据集索引)

更多请点击: https://kaifayun.com 第一章:AI Agent餐饮行业应用 AI Agent正以前所未有的深度融入餐饮行业全链路,从智能点餐、后厨协同到供应链优化与顾客情感分析,其核心价值在于将静态规则系统升级为具备感知、推理与自主决策…...

如何用t3mujinpack为你的Darktable照片添加经典胶片质感:新手完整指南

如何用t3mujinpack为你的Darktable照片添加经典胶片质感:新手完整指南 【免费下载链接】t3mujinpack Collection of film emulation presets for open-source RAW developer software Darktable. 项目地址: https://gitcode.com/gh_mirrors/t3/t3mujinpack 你…...

简单说明--程序系统如何对用户身份证实名认证接口api

程序系统对注册用户身份认证,接口将【身份证号码、姓名】上传至接口API判断是否匹配 请求数据: bodys.put("idNo", "330421190210182345"); bodys.put("name", "张某某");响应数据: {"name&quo…...

使用Taotoken多模型API为嵌入式项目提供智能对话辅助

🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 使用Taotoken多模型API为嵌入式项目提供智能对话辅助 对于使用Keil5等传统IDE进行嵌入式开发的工程师而言,为设备增添自…...