当前位置: 首页 > news >正文

java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信

进入

DoubleCloud
https://www.double.cloud
创建一个kafka
1 选择语言
2 运行curl 的url命令启动一个topic
3 生成对应语言的token
4 复制3中的配置文件到本地,命名为client.properties
5 复制客户端代码
对python和java客户端代码进行了重写,java改成了kotlin:

配置文件

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
group.id=
auto.offset.reset=earliest
# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000
import timefrom confluent_kafka import Producer, Consumer
import asyncio
import threadingclass KafkaClient:def __init__(self, config_file):self.config = self.read_config(config_file)def read_config(self, config_file):config = {}with open(config_file) as fh:for line in fh:line = line.strip()if len(line) != 0 and line[0] != "#":parameter, value = line.strip().split('=', 1)config[parameter] = value.strip()return configdef produce(self, topic, key, value):# Creates a new producer instanceproducer = Producer(self.config)# Produces a sample messageproducer.produce(topic, key=key, value=value)print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")# Send any outstanding or buffered messages to the Kafka brokerproducer.flush()def consume_async(self, topic, callback=None, group_id="python-group-1", auto_offset_reset="earliest"):# Sets the consumer group ID and offsetself.config["group.id"] = group_idself.config["auto.offset.reset"] = auto_offset_resetconsumer = Consumer(self.config)consumer.subscribe([topic])loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)if callback is not None:loop.run_until_complete(callback(consumer))def consume(self, topic, callback=None):thread = threading.Thread(target=self.consume_async, args=(topic, callback,))thread.start()return threadasync def consume_async(consumer):try:while True:msg = consumer.poll(1.0)if msg is not None:breakif not msg.error():key = msg.key().decode("utf-8")value = msg.value().decode("utf-8")print(f"Consumed message: key = {key:12} value = {value:12}")except KeyboardInterrupt:passfinally:consumer.close()config_file_path = ".\\client.properties"
topic = "test"
key = "key"
value = "value"kafka_client = KafkaClient(config_file_path)
kafka_client.produce(topic, key, value)
thread = kafka_client.consume(topic, consume_async)

配置文件

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='GHFXZDIOMQW3IPKA' password='TimUk7hj/EwTiB031lA95LeKfXN3t2Ddnw+izhKx3+7wFxZKMLGEqTOnneTKrlQQ';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
topic=
group.id=
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
# Best practice for Kafka producer to prevent data loss
acks=all

java(kotiln)


import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.io.FileInputStream
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.util.*class KafkaClient<T, V> : Closeable {private var producer: KafkaProducer<T, V>? = nullprivate var fileConfig: Properties? = nullval TOPIC = "topic"private val DURATION = 100Lprivate val POOLSIZE = 10private val DISPATCHER = newFixedThreadPoolContext(POOLSIZE, "CoroutinePool")private val SCOPE = CoroutineScope(DISPATCHER)constructor(configPath: String? = null, config: Properties? = null) {if (config == null && configPath == null) throw Exception("don't have any config")var config1 = Properties()if (configPath != null) {fileConfig = readConfig(configPath)fileConfig?.let { config1.putAll(it) }}if (config != null) {config1.putAll(config)}producer = KafkaProducer(config1)}fun produce(key: T, value: V, topic: String? = null) {producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))}fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {val consumer: KafkaConsumer<T, V> = KafkaConsumer(fileConfig)consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))SCOPE.launch {while (true) {val records: ConsumerRecords<T, V> = consumer.poll(Duration.ofMillis(DURATION))func(records)delay(DURATION)}}}@Throws(IOException::class)fun readConfig(configFile: String): Properties {if (!Files.exists(Paths.get(configFile))) {throw IOException("$configFile not found.")}val config = Properties()FileInputStream(configFile).use { inputStream -> config.load(inputStream) }return config}override fun close() {producer?.close()}
}fun main() {val cli =KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties")cli.consume {println("test beg")for (record in it) {println(String.format("Consumed message from topic %s: key = %s value = %s", cli.TOPIC, record.key(), record.value()))}println("test end")}// Give some time for the consumer to startThread.sleep(2000)cli.produce("key1", "test")// Give some time for the consumer to consume the messageThread.sleep(5000)
}

相关文章:

java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信

进入 DoubleCloud https://www.double.cloud 创建一个kafka 1 选择语言 2 运行curl 的url命令启动一个topic 3 生成对应语言的token 4 复制3中的配置文件到本地&#xff0c;命名为client.properties 5 复制客户端代码 对python和java客户端代码进行了重写&#xff0c;java改成…...

vivado DIAGRAM、HW_AXI

图表 描述 块设计&#xff08;.bd&#xff09;是在IP中创建的互连IP核的复杂系统 Vivado设计套件的集成商。Vivado IP集成器可让您创建复杂的 通过实例化和互连Vivado IP目录中的IP进行系统设计。一块 设计是一种分层设计&#xff0c;可以写入磁盘上的文件&#xff08;.bd&…...

学习分享-为什么把后台的用户验证和认证逻辑放到网关

将后台的用户验证和认证逻辑放到网关&#xff08;API Gateway&#xff09;中是一种常见的设计模式&#xff0c;这种做法在微服务架构和现代应用中有许多优势和理由&#xff1a; 1. 集中管理认证和授权 统一的安全策略 在一个包含多个微服务的系统中&#xff0c;如果每个服务…...

27 ssh+scp+nfs+yum进阶

ssh远程管理 ssh是一种安全通道协议&#xff0c;用来实现字符界面的远程登录。远程复制&#xff0c;远程文本传输。 ssh对通信双方的数据进行了加密。 用户名和密码登录 密钥对认证方式&#xff08;可以实现免密登录&#xff09; ssh 22 网络层 传输层 数据传输的过程中是…...

LabVIEW液压伺服压力机控制系统与控制频率选择

液压伺服压力机的控制频率是一个重要的参数&#xff0c;它直接影响系统的响应速度、稳定性和控制精度。具体选择的控制频率取决于多种因素&#xff0c;包括系统的动态特性、控制目标、硬件性能以及应用场景。以下是一些常见的指导原则和考量因素&#xff1a; 常见的控制频率范…...

阿里云(域名解析) certbot 证书配置

1、安装 certbot ubuntu 系统&#xff1a; sudo apt install certbot 2、申请certbot 域名证书&#xff0c;如申请二级域名aa.example.com 的ssl证书&#xff0c;同时需要让 bb.aa.example.com 也可以使用此证书 1、命令&#xff1a;sudo certbot certonly -d “域名” -d “…...

Web LLM 攻击技术

概述 在ChatGPT问世以来&#xff0c;我也尝试挖掘过ChatGPT的漏洞&#xff0c;不过仅仅发现过一些小问题&#xff1a;无法显示xml的bug和错误信息泄露&#xff0c;虽然也挖到过一些开源LLM的漏洞&#xff0c;比如前段时间发现的Jan的漏洞&#xff0c;但是不得不说传统漏洞越来…...

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier) Async如何使用 使用Async标注在方法上&#xff0c;可以使该方法异步的调用执行。而所有异步方法的实际执行是交给TaskExecutor的。 1.启动类添加EnableAsync注解 2. 方法上添加A…...

秒杀优化+秒杀安全

1.Redis预减库存 1.OrderServiceImpl.java 问题分析 2.具体实现 SeckillController.java 1.实现InitializingBean接口的afterPropertiesSet方法&#xff0c;在bean初始化之后将库存信息加载到Redis /*** 系统初始化&#xff0c;将秒杀商品库存加载到redis中** throws Excepti…...

48、Flink 的 Data Source API 详解

a&#xff09;概述 本节将描述 FLIP-27 中引入的新 Source API 的主要接口。 b&#xff09;Source Source API 是一个工厂模式的接口&#xff0c;用于创建以下组件。 Split EnumeratorSource ReaderSplit SerializerEnumerator Checkpoint Serializer 此外&#xff0c;Sou…...

深入解析Java扩展机制:SPI与Spring.factories

目录 Java SPI概述 1.1 什么是SPI&#xff1f;1.2 SPI的工作原理1.3 SPI的优缺点 SPI的应用 2.1 Java标准库中的SPI应用2.2 自定义SPI示例 Spring.factories概述 3.1 什么是spring.factories&#xff1f;3.2 spring.factories的工作原理3.3 spring.factories的优缺点 spring.f…...

Vue2之模板语法

文章目录 1.模板语法1.1 插值语法{{}}可以写什么1.2 指令语法1.2.1 指令概述1.2.2 v-bind指令1.2.3 v-model指令 1.模板语法 1.1 插值语法{{}}可以写什么 &#xff08;1&#xff09;在data中声明的 &#xff08;2&#xff09;常量 &#xff08;3&#xff09;合法的JavaScript…...

java基础练习题

1、一个".java"源文件中是否可以包括多个类&#xff1f;有什么限制&#xff1f; 可以包含多个类。但是只有一个类可以声明为public&#xff0c;且要求声明为public的类的类名与源文件名相同。 2、java的优势&#xff1f; a、跨平台性 b、安全性高 c、简单性 d、…...

unity中通过实现底层接口实现非按钮(图片)的事件监听

编写监听脚本 PEListenter 继承自MonoBehaviour类&#xff0c;并实现了IPointerDownHandler、IPointerUpHandler和IDragHandler接口&#xff0c;按照需求定义需要接收事件&#xff08;鼠标按下、抬起、拖拽&#xff09;的回调函数 //监听类&#xff08;需要挂载在物体上面&am…...

重庆耶非凡科技有限公司的选品师项目加盟靠谱吗?

在当今电子商务的浪潮中&#xff0c;选品师的角色愈发重要。而重庆耶非凡科技有限公司以其独特的选品师项目&#xff0c;在行业内引起了广泛关注。对于想要加盟该项目的人来说&#xff0c;项目的靠谱性无疑是首要考虑的问题。 首先&#xff0c;我们来看看耶非凡科技有限公司的背…...

《青少年编程与数学》课程方案:4、课程策略

《青少年编程与数学》课程方案&#xff1a;4、课程策略 一、工程师思维二、使命感驱动三、价值观引领四、学习现代化五、工作生活化六、与时代共进 《青少年编程与数学》课程策略强调采用工程师思维&#xff0c;避免重复造轮子&#xff0c;培养使命感&#xff0c;通过探索兴趣、…...

用爬虫实现---模拟填志愿

先来说实现逻辑&#xff0c;首先我要获取到这个网站上所有的信息&#xff0c;那么我们就可以开始对元素进行检查 我们发现他的每一个学校信息都有一个对应的属性&#xff0c;并且是相同的&#xff0c;那么我们就可以遍历这个网页中的所有属性一样的开始爬取 在来分析&#xff0…...

vscode Run Code输出出现中文乱码情况问题解决方案

主要解决方案是通过修改计算机默认的编码格式,来完成的。 chcp 是 Windows 操作系统中的一个命令,用于显示或设置控制台的代码页(code page)。代码页决定了控制台如何解释和显示字符,特别是非 ASCII 字符(例如 Unicode 字符)。 使用方法 显示当前代码页: 输入 chcp 而…...

代码随想录训练营Day30

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、重新安排行程 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 今天是跟着代码随想录刷题的第30天&#xff0c;主要是复习了回溯算法…...

Swift 序列(Sequence)排序面面俱到 - 从过去到现在(二)

概览 在上篇 Swift 序列(Sequence)排序面面俱到 - 从过去到现在(一)博文中,我们讨论了 Swift 语言中序列和集合元素排序的一些基本知识,我们还给出了以自定义类型中任意属性排序的“康庄大道”。 不过在实际的撸码场景中,我们往往需要的是“多属性”同时参与到排序的考…...

别再乱配了!SpringBoot配置文件加载顺序的实战避坑指南(附优先级图解)

SpringBoot配置加载优先级深度解析&#xff1a;从原理到实战避坑 SpringBoot的配置加载机制看似简单&#xff0c;实则暗藏玄机。许多开发者在使用过程中都曾遇到过配置不生效、覆盖关系混乱的问题&#xff0c;尤其是在多环境部署或使用外部配置时。本文将深入剖析SpringBoot配置…...

程序员裸辞转行网络安全,我只用了 90 天

程序员如何90天成功转行黑客&#xff08;网络安全&#xff09;&#xff1f; 有人说&#xff1a;”黑客到底比程序员高在哪&#xff0c;为什么很多人开始转行了“其实黑客都是程序员&#xff0c;但是并不是所有的程序员都是黑客. 从企业和社会需求来看&#xff0c;现在真不缺程…...

Win10/Win11网络适配器‘罢工’终极排查指南:从驱动、服务到协议栈的完整修复流程

Win10/Win11网络适配器深度修复指南&#xff1a;从驱动到协议栈的全面诊断 当你的Windows设备突然无法联网&#xff0c;只剩下孤零零的飞行模式图标时&#xff0c;那种焦虑感每个IT从业者都深有体会。上周我的主力开发机就遭遇了这样的"罢工"事件——所有网络连接突然…...

工业网关、电机控制、人机界面:ATSAME70Q21B-AN的应用版图

ATSAME70Q21B-AN&#xff1a;300MHz Cortex-M7工业MCU的嵌入式应用解析在工业控制、人机界面和物联网网关等领域&#xff0c;微控制器需要在处理性能、外设集成度和环境适应性之间取得平衡。ATSAME70Q21B-AN是Microchip推出的基于ARM Cortex-M7内核的高性能32位微控制器&#x…...

2026遥感、地球科学与人工智能国际学术会议(RSGAI 2026)

随着人工智能&#xff08;AI&#xff09;技术的迅猛发展&#xff0c;特别是机器学习和深度学习在数据处理与复杂模式识别中的卓越能力&#xff0c;地球科学研究与遥感观测技术正迎来革命性的变革。将人工智能与遥感对地观测、地球信息科学、以及资源环境监测等领域的理论研究和…...

当大模型认不出一个具体名字:MiniMax 回答失灵,问题未必只在模型本身

当大模型认不出一个具体名字&#xff1a;MiniMax 回答失灵&#xff0c;问题未必只在模型本身 围绕“为什么 MiniMax 大模型无法识别马嘉祺是谁”的一次能力拆解&#xff1a;真正暴露的&#xff0c;往往是知识覆盖、检索策略与风控边界的耦合问题 直接回答 先给结论。 如果 Mi…...

免费LLM API集成实战:从选型到构建高可用AI服务

1. 项目概述&#xff1a;一个汇聚免费LLM API的宝藏仓库如果你正在开发一个需要AI对话、文本生成或代码补全功能的应用&#xff0c;但又被高昂的API调用费用或复杂的申请流程劝退&#xff0c;那么你很可能需要这个项目。Clovenhoofed-loadingarea139/awesome-free-llm-apis是一…...

网络优化工具开发全解析:从协议选型到多平台实现与运维

1. 网络连接优化工具的技术原理与实现思路在当今的互联网环境下&#xff0c;许多用户会遇到访问特定在线服务或资源时速度缓慢、连接不稳定甚至无法访问的情况。这背后通常涉及到复杂的网络路由、地域性内容分发策略以及网络基础设施的差异。为了解决这类问题&#xff0c;一些开…...

JACC Cardiovasc Imaging(IF=15.2)中国医学科学院阜外医院放射科赵世华教授等团队:连续心肌纤维化评估预测肥厚型心肌病患者预后

01文献学习今天分享的文献是由中国医学科学院阜外医院放射科赵世华教授等团队于2026年2月在《JACC: Cardiovascular Imaging》&#xff08;中科院1区top&#xff0c;IF15.2&#xff09;上发表的研究“Serial Myocardial Fibrosis Assessments Predict Outcomes in Patients Wit…...

CANN/asc-devkit int8转int16 API

asc_int82int16 【免费下载链接】asc-devkit 本项目是CANN 推出的昇腾AI处理器专用的算子程序开发语言&#xff0c;原生支持C和C标准规范&#xff0c;主要由类库和语言扩展层构成&#xff0c;提供多层级API&#xff0c;满足多维场景算子开发诉求。 项目地址: https://gitcode…...