当前位置: 首页 > news >正文

11_Pulsar Adaptors适配器、kafka适配器、Spark适配器

2.3. Pulsar Adaptors适配器
2.3.1.kafka适配器
2.3.2.Spark适配器

2.3. Pulsar Adaptors适配器

2.3.1.kafka适配器

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的平台中, 那么Pulsar adaptor on kafka就变的非常的有用了, 它可以帮助我们在不改变原有kafka的代码基础上, 即可接入pulsar, 但是需要注意, 相关配置信息需要进行一些调整, 例如: 地址与topic

  • 1- 需要导入Pulsar集成kafka的依赖包, 删除掉原有Kafka-client包
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-kafka</artifactId> <version>2.8.0</version> 
</dependency>

注: 目前Pulsar并在Maven中央仓库中并没有提供Pulsar-client-kafka 2.8.1的包, 故此处导入2.8.0

  • 2-编写生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaAdaptorProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 创建kafka生产者的核心类对象: KafkaProducer// 1.1: 创建生产者配置对象: 设置相关配置Properties props = new Properties();props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");// 消息的确认方案props.put("acks", "all");// key序列化类型props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value 序列化类型props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props); //2. 发送数据 for (int i = 0; i < 10; i++) { //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); producer.send(producerRecord).get(); }//3. 释放资源 producer.close();}}
  • 3-编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaAdaptorConsumer {public static void main(String[] args) {//1. 创建kafka的消费者的核心对象: KafkaConsumer//1.1: 创建消费者配置对象, 并设置相关的参数:Properties props = new Properties();props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");//消费者组的 idprops.setProperty("group.id", "test");//是否启动消费者自动提交消费偏移量props.setProperty("enable.auto.commit", "true");//每间隔多长时间提交一次偏移量:单位 毫秒props.setProperty("auto.commit.interval.ms","1000");//key 反序列化props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//val 发序列化props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//2. 给消费者设置订阅topic:consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));//3. 循环获取相关的消息数据while (true) {//3.1: 从kafka中获取消息数据: 参数表示等待超时时间//注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息for (ConsumerRecord<String, String> record : records) {String massage = record.value();System.out.println("消息数据为:"+massage);}} } 
}
  • 4- 先运行消费者, 进行监听, 然后运行生产者, 观察消费者是否可以正常消费到数据
    在这里插入图片描述

2.3.2.Spark适配器

Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从 Pulsar 接
收原始数据。

应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可
以通过多种方式对其进行处理。

  • 1-导入相关的依赖包
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>2.8.0</version>
</dependency>
  • 2-编写spark的流式代码
String serviceUrl = "pulsar://localhost:6650/"; 
String topic = "persistent://public/default/test_src"; 
String subs = "test_sub"; 
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example"); 
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60)); 
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData(); 
Set<String> set = new HashSet<>(); 
set.add(topic); 
pulsarConf.setTopicNames(set); 
pulsarConf.setSubscriptionName(subs); 
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver( 
serviceUrl, 
pulsarConf, 
new AuthenticationDisabled()); 
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

相关文章:

11_Pulsar Adaptors适配器、kafka适配器、Spark适配器

2.3. Pulsar Adaptors适配器 2.3.1.kafka适配器 2.3.2.Spark适配器 2.3. Pulsar Adaptors适配器 2.3.1.kafka适配器 Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。 在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的…...

jupyter文档转换成markdown

背景 上一篇文章**《如何优雅地用python生成模拟数据》**我就使用jupyter写的&#xff0c;这个真的是万能的&#xff0c;可以插入markdown格式的内容&#xff0c;也可写代码&#xff0c;关键是像ipython一样&#xff0c;可以分步执行。 我可以这样自由的写我的博客内容&#x…...

日志框架及其使用方法

log4j和logBack,同一个人写的&#xff0c;logBack为log4j的升级版&#xff0c;SpringBoot中默认集成logBack 作用&#xff1a;记录软件发布后的一些bug,以及数据是怎样被操作的 传统开发弊端&#xff1a; 1.日志直接输出在控制台&#xff0c;关闭控制台后&#xff0c;日志消…...

ZIG:理解未来编程语言的视角

文章目录 摘要&#xff1a;引言&#xff1a;性能简洁性和模块化避免常见错误和陷阱总结&#xff1a;参考资料&#x1f4d1;: 摘要&#xff1a; 本文介绍了新兴编程语言ZIG的目标和特点&#xff0c;包括高性能、简洁性和模块化&#xff0c;并分析了这些特点是如何通过语言设计来…...

让三驾马车奔腾:华为如何推动空间智能化发展?

上个月&#xff0c;国务院常务会议审议通过了《关于促进家居消费的若干措施》&#xff0c;其中明确提出了“推动单品智能向全屋智能发展创新培育智能消费”“开展数字家庭建设试点”等推动全屋智能拼配发展的建议与方案。 可以说&#xff0c;以整屋为单位的空间智能品类&#x…...

2022年03月 Python(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

一、单选题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 第1题 已知a“161”&#xff0c;b“16”&#xff0c;c“8”,执行语句da>b and a>c&#xff0c;变量d的值为是&#xff1f; A&#xff1a;0 B&#xff1a;1 C&#xff1a;True D&am…...

WIN大恒工业相机SDK开发

大恒工业相机SDK开发概览 一、开发环境搭建1、C# 环境配置&#xff08;VS2019&#xff09;2、C 环境配置&#xff08;VS2019&#xff09;3、python 环境配置&#xff08;Pycharm&#xff09; 二、相机二次开发流程三、相机相机属性参数配置四、图像采集单帧采集回调采集 注意事…...

qt qml中各种Layout之间是如何对齐的?

问题描述&#xff1a; qt qml中下一个RowLayout如何对齐顶部到上方的ColumnLayout的底部略低一些间隔的位置&#xff1f; 我们怎么使用achors去锚定位置&#xff1f; 这些都是可以用anchors锚定属性&#xff0c;以及margin来设置的。 解决办法&#xff1a; 要实现将下一个R…...

Immutable.js 进行js的复制

介绍 在提供不可变&#xff08;Immutable&#xff09;数据结构的支持。不可变数据是指一旦创建后就不能被修改的数据&#xff0c;每次对数据进行更新都会返回一个新的数据对象&#xff0c;而原始数据保持不变。 使用 日常中我们使用的拷贝 (1) var arr { } ; arr2 arr ; …...

java动态生成excel并且需要合并单元格

java动态生成excel并且需要合并单元格 先上图看一下预期效果 集成poi <dependency><groupId>cn.afterturn</groupId><artifactId>easypoi-base</artifactId><version>4.0.0</version> </dependency> <dependency><…...

JMeter启动时常见的错误

很多小伙伴在学工具这一块时&#xff0c;安装也是很吃力的一个问题&#xff0c;之前记得有说过怎么安装jmeter这个工具。那么你要启动jmeter的时候&#xff0c;一些粉丝就会碰到如下几个问题。 1.解压下载好的jmeter安装&#xff0c;Windows 平台&#xff0c;双击 jmeter/bin …...

python pandas 排序

Series的排序&#xff1a; Series.sort_values(ascendingTrue, inplaceFalse) 参数说明&#xff1a; ascending&#xff1a;默认为True升序排序&#xff0c;为False降序排序inplace&#xff1a;是否修改原始Series DataFrame的排序&#xff1a; DataFrame.sort_values(by, as…...

前后端分离式项目架构流程复盘之宿舍管理系统

文章目录 &#x1f412;个人主页&#x1f3c5;JavaEE系列专栏&#x1f4d6;前言&#xff1a;【&#x1f387;前端】先创建Vue-cli项目&#xff08;版本2.6.10&#xff0c;仅包含babel&#xff09;&#xff0c;请选择此项目并创建 【整理简化项目模板】【&#x1f380;创建路由】…...

Linux nohup 命令详解

nohup是Linux/Unix系统中非常有用的命令之一。它允许您在后台运行命令或脚本&#xff0c;并且在退出终端会话后仍然保持运行。这对于长时间运行的任务或进程非常有用&#xff0c;特别是当您需要离开终端但希望任务继续运行时。 nohup命令语法 nohup命令的基本语法如下&#x…...

VoxWeekly|The Sandbox 生态周报|20230731

欢迎来到由 The Sandbox 发布的《VoxWeekly》。我们会在每周发布&#xff0c;对上一周 The Sandbox 生态系统所发生的事情进行总结。 如果你喜欢我们内容&#xff0c;欢迎与朋友和家人分享。请订阅我们的 Medium 、关注我们的 Twitter&#xff0c;并加入 Discord 社区&#xf…...

编程导航算法村第九关 | 二分查找

编程导航算法村第九关 | 二分查找 LeetCode852.这个题的要求有点啰嗦&#xff0c;核心意思就是在数组中的某位位置i开始&#xff0c;从0到i是递增的&#xff0c;从i1 到数组最后是递减的&#xff0c;让你找到这个最高点。 详细要求是&#xff1a;符合下列属性的数组 arr 称为山…...

linux 下安装部署flask项目

FlaskDemo 命名为test.py # codingutf-8 from flask import Flaskapp Flask(__name__)app.route("/") def index():return "test"if __name__ __main__:app.debug True# 这里host一定要写0.0.0.0 写127.0.0.1的无法访问 ——_——app.run(host"0.…...

在Vue里,将当前窗口截图,并将数据base64转为png格式传给服务器

目录 前言 1、将当前窗口截图&#xff0c;并将数据存储下来 2、定义将base64转png的方法 3、完整代码 总结 前言 记录来源于需求 1、将当前窗口截图&#xff0c;并将数据存储下来 export default { data() {return {image: // 存储数据} }mounted() {setTimeout(() >…...

Echarts图表Java后端生成Base64图片格式,POI写入Base64图片到Word中

Echarts图表Java后端生成请看上篇&#xff0c;此篇为Base64图片插入Word文档中Java后台生成ECharts图片,并以Base64字符串返回_青冘的博客-CSDN博客 try {XWPFParagraph xwpfParagraphimage doc.createParagraph(); // 创建图片段落xwpfParagraphimage.setAlignment(Paragraph…...

【AI】《动手学-深度学习-PyTorch版》笔记(十二):从零开始实现softmax回归

AI学习目录汇总 1、什么是特征? 对于图像算法,每个像素可以视为一个特征,例如图像的分辨率为28x28,则有784个特征。而且常常将二维的图像像素矩阵展开为长度为784的向量。 2、权重和偏置的规模 本例中,将使用Fashion-MNIST数据集,它是一个服装分类数据集,可以将服装…...

Unity哥特UI资源包:SDF字体与Shader Graph工程化实践

1. 为什么哥特UI在游戏开发中长期被低估&#xff0c;又为何现在必须认真对待“哥特UI”这个词&#xff0c;很多Unity开发者第一反应是&#xff1a;不就是黑底、尖角、浮雕字、带玫瑰纹样的按钮吗&#xff1f;配个暗红渐变完事。我2019年接手一个中世纪黑暗奇幻RPG时也这么想——…...

2026免费照片去水印软件app排行榜 | 照片去水印怎么去?最新推荐工具对比

照片水印去除需求在2026年越来越普遍&#xff0c;无论是整理个人相册还是做内容素材处理&#xff0c;找到一款趁手的去水印工具能节省大量时间。本文对标当前免费照片去水印软件app的主流选择进行了全面测评&#xff0c;并整理了一份排行榜式的推荐清单&#xff0c;帮你快速定位…...

1987年7月14日晚上19-21点出生性格、运势和命运

1987年6月28日&#xff0c;距离二十四节气中的“小暑”&#xff08;通常在7月6-8日&#xff09;约8-10天。小暑意为“天气开始炎热但未到极致”&#xff0c;是盛夏的序曲。这个时节的哲学&#xff0c;与个人成长有着奇妙的呼应。性格的“小暑特质”&#xff1a;温润与韧性 小暑…...

智能驾驶系统场景下的自动化仿真测试评价技术【附仿真】

✨ 长期致力于智能驾驶系统、有效性评价、测试用例生成、测试场景优化、自动化仿真测试平台研究工作&#xff0c;擅长数据搜集与处理、建模仿真、程序编写、仿真设计。 ✅ 专业定制毕设、代码 ✅ 如需沟通交流&#xff0c;点击《获取方式》 &#xff08;1&#xff09;基于复杂度…...

5分钟快速上手!网易云无损音乐下载完整指南:免费获取高品质音乐

5分钟快速上手&#xff01;网易云无损音乐下载完整指南&#xff1a;免费获取高品质音乐 【免费下载链接】Netease_url 网易云无损解析 项目地址: https://gitcode.com/gh_mirrors/ne/Netease_url 想要免费获取网易云音乐的无损音质歌曲吗&#xff1f;Netease_url项目让你…...

深度解密:如何彻底掌控Windows Defender的系统级权限与持久化配置

深度解密&#xff1a;如何彻底掌控Windows Defender的系统级权限与持久化配置 【免费下载链接】defender-control An open-source windows defender manager. Now you can disable windows defender permanently. 项目地址: https://gitcode.com/gh_mirrors/de/defender-con…...

上海AI实验室发布WildClawBench:AI智能体究竟能走多远?

这项由上海人工智能实验室联合香港中文大学、复旦大学、中国科学技术大学、上海交通大学、清华大学、浙江大学及南洋理工大学等多所顶尖机构共同完成的研究&#xff0c;于2026年5月11日以预印本形式发布&#xff0c;论文编号为arXiv:2605.10912v1。感兴趣的读者可通过该编号在a…...

CSDN热门文章评论区运营心法——从技术答疑到社区共建的进阶之路

评论区&#xff0c;是技术内容的第二战场。你发出去的文章只是第一招&#xff0c;真正的对话从这里开始。 引言&#xff1a;为什么评论区是"第二战场" 技术写作圈有个不成文的共识&#xff1a;文章发出去&#xff0c;战斗才刚开始。 很多人把写完文章当成终点&…...

markdownReader:3分钟快速上手,彻底告别Chrome中Markdown文件的混乱显示

markdownReader&#xff1a;3分钟快速上手&#xff0c;彻底告别Chrome中Markdown文件的混乱显示 【免费下载链接】markdownReader markdownReader is a extention for chrome, used for reading markdown file. 项目地址: https://gitcode.com/gh_mirrors/ma/markdownReader …...

微信好友关系检测完整指南:快速找出谁删了你

微信好友关系检测完整指南&#xff1a;快速找出谁删了你 【免费下载链接】WechatRealFriends 微信好友关系一键检测&#xff0c;基于微信ipad协议&#xff0c;看看有没有朋友偷偷删掉或者拉黑你 项目地址: https://gitcode.com/gh_mirrors/we/WechatRealFriends 你是否曾…...