当前位置: 首页 > news >正文

java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信

进入

DoubleCloud
https://www.double.cloud
创建一个kafka
1 选择语言
2 运行curl 的url命令启动一个topic
3 生成对应语言的token
4 复制3中的配置文件到本地,命名为client.properties
5 复制客户端代码
对python和java客户端代码进行了重写,java改成了kotlin:

配置文件

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
group.id=
auto.offset.reset=earliest
# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000
import timefrom confluent_kafka import Producer, Consumer
import asyncio
import threadingclass KafkaClient:def __init__(self, config_file):self.config = self.read_config(config_file)def read_config(self, config_file):config = {}with open(config_file) as fh:for line in fh:line = line.strip()if len(line) != 0 and line[0] != "#":parameter, value = line.strip().split('=', 1)config[parameter] = value.strip()return configdef produce(self, topic, key, value):# Creates a new producer instanceproducer = Producer(self.config)# Produces a sample messageproducer.produce(topic, key=key, value=value)print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")# Send any outstanding or buffered messages to the Kafka brokerproducer.flush()def consume_async(self, topic, callback=None, group_id="python-group-1", auto_offset_reset="earliest"):# Sets the consumer group ID and offsetself.config["group.id"] = group_idself.config["auto.offset.reset"] = auto_offset_resetconsumer = Consumer(self.config)consumer.subscribe([topic])loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)if callback is not None:loop.run_until_complete(callback(consumer))def consume(self, topic, callback=None):thread = threading.Thread(target=self.consume_async, args=(topic, callback,))thread.start()return threadasync def consume_async(consumer):try:while True:msg = consumer.poll(1.0)if msg is not None:breakif not msg.error():key = msg.key().decode("utf-8")value = msg.value().decode("utf-8")print(f"Consumed message: key = {key:12} value = {value:12}")except KeyboardInterrupt:passfinally:consumer.close()config_file_path = ".\\client.properties"
topic = "test"
key = "key"
value = "value"kafka_client = KafkaClient(config_file_path)
kafka_client.produce(topic, key, value)
thread = kafka_client.consume(topic, consume_async)

配置文件

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='GHFXZDIOMQW3IPKA' password='TimUk7hj/EwTiB031lA95LeKfXN3t2Ddnw+izhKx3+7wFxZKMLGEqTOnneTKrlQQ';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
topic=
group.id=
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
# Best practice for Kafka producer to prevent data loss
acks=all

java(kotiln)


import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.io.FileInputStream
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.util.*class KafkaClient<T, V> : Closeable {private var producer: KafkaProducer<T, V>? = nullprivate var fileConfig: Properties? = nullval TOPIC = "topic"private val DURATION = 100Lprivate val POOLSIZE = 10private val DISPATCHER = newFixedThreadPoolContext(POOLSIZE, "CoroutinePool")private val SCOPE = CoroutineScope(DISPATCHER)constructor(configPath: String? = null, config: Properties? = null) {if (config == null && configPath == null) throw Exception("don't have any config")var config1 = Properties()if (configPath != null) {fileConfig = readConfig(configPath)fileConfig?.let { config1.putAll(it) }}if (config != null) {config1.putAll(config)}producer = KafkaProducer(config1)}fun produce(key: T, value: V, topic: String? = null) {producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))}fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {val consumer: KafkaConsumer<T, V> = KafkaConsumer(fileConfig)consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))SCOPE.launch {while (true) {val records: ConsumerRecords<T, V> = consumer.poll(Duration.ofMillis(DURATION))func(records)delay(DURATION)}}}@Throws(IOException::class)fun readConfig(configFile: String): Properties {if (!Files.exists(Paths.get(configFile))) {throw IOException("$configFile not found.")}val config = Properties()FileInputStream(configFile).use { inputStream -> config.load(inputStream) }return config}override fun close() {producer?.close()}
}fun main() {val cli =KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties")cli.consume {println("test beg")for (record in it) {println(String.format("Consumed message from topic %s: key = %s value = %s", cli.TOPIC, record.key(), record.value()))}println("test end")}// Give some time for the consumer to startThread.sleep(2000)cli.produce("key1", "test")// Give some time for the consumer to consume the messageThread.sleep(5000)
}

相关文章:

java(kotlin)和 python 通过DoubleCloud的kafka进行线程间通信

进入 DoubleCloud https://www.double.cloud 创建一个kafka 1 选择语言 2 运行curl 的url命令启动一个topic 3 生成对应语言的token 4 复制3中的配置文件到本地&#xff0c;命名为client.properties 5 复制客户端代码 对python和java客户端代码进行了重写&#xff0c;java改成…...

vivado DIAGRAM、HW_AXI

图表 描述 块设计&#xff08;.bd&#xff09;是在IP中创建的互连IP核的复杂系统 Vivado设计套件的集成商。Vivado IP集成器可让您创建复杂的 通过实例化和互连Vivado IP目录中的IP进行系统设计。一块 设计是一种分层设计&#xff0c;可以写入磁盘上的文件&#xff08;.bd&…...

学习分享-为什么把后台的用户验证和认证逻辑放到网关

将后台的用户验证和认证逻辑放到网关&#xff08;API Gateway&#xff09;中是一种常见的设计模式&#xff0c;这种做法在微服务架构和现代应用中有许多优势和理由&#xff1a; 1. 集中管理认证和授权 统一的安全策略 在一个包含多个微服务的系统中&#xff0c;如果每个服务…...

27 ssh+scp+nfs+yum进阶

ssh远程管理 ssh是一种安全通道协议&#xff0c;用来实现字符界面的远程登录。远程复制&#xff0c;远程文本传输。 ssh对通信双方的数据进行了加密。 用户名和密码登录 密钥对认证方式&#xff08;可以实现免密登录&#xff09; ssh 22 网络层 传输层 数据传输的过程中是…...

LabVIEW液压伺服压力机控制系统与控制频率选择

液压伺服压力机的控制频率是一个重要的参数&#xff0c;它直接影响系统的响应速度、稳定性和控制精度。具体选择的控制频率取决于多种因素&#xff0c;包括系统的动态特性、控制目标、硬件性能以及应用场景。以下是一些常见的指导原则和考量因素&#xff1a; 常见的控制频率范…...

阿里云(域名解析) certbot 证书配置

1、安装 certbot ubuntu 系统&#xff1a; sudo apt install certbot 2、申请certbot 域名证书&#xff0c;如申请二级域名aa.example.com 的ssl证书&#xff0c;同时需要让 bb.aa.example.com 也可以使用此证书 1、命令&#xff1a;sudo certbot certonly -d “域名” -d “…...

Web LLM 攻击技术

概述 在ChatGPT问世以来&#xff0c;我也尝试挖掘过ChatGPT的漏洞&#xff0c;不过仅仅发现过一些小问题&#xff1a;无法显示xml的bug和错误信息泄露&#xff0c;虽然也挖到过一些开源LLM的漏洞&#xff0c;比如前段时间发现的Jan的漏洞&#xff0c;但是不得不说传统漏洞越来…...

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier) Async如何使用 使用Async标注在方法上&#xff0c;可以使该方法异步的调用执行。而所有异步方法的实际执行是交给TaskExecutor的。 1.启动类添加EnableAsync注解 2. 方法上添加A…...

秒杀优化+秒杀安全

1.Redis预减库存 1.OrderServiceImpl.java 问题分析 2.具体实现 SeckillController.java 1.实现InitializingBean接口的afterPropertiesSet方法&#xff0c;在bean初始化之后将库存信息加载到Redis /*** 系统初始化&#xff0c;将秒杀商品库存加载到redis中** throws Excepti…...

48、Flink 的 Data Source API 详解

a&#xff09;概述 本节将描述 FLIP-27 中引入的新 Source API 的主要接口。 b&#xff09;Source Source API 是一个工厂模式的接口&#xff0c;用于创建以下组件。 Split EnumeratorSource ReaderSplit SerializerEnumerator Checkpoint Serializer 此外&#xff0c;Sou…...

深入解析Java扩展机制:SPI与Spring.factories

目录 Java SPI概述 1.1 什么是SPI&#xff1f;1.2 SPI的工作原理1.3 SPI的优缺点 SPI的应用 2.1 Java标准库中的SPI应用2.2 自定义SPI示例 Spring.factories概述 3.1 什么是spring.factories&#xff1f;3.2 spring.factories的工作原理3.3 spring.factories的优缺点 spring.f…...

Vue2之模板语法

文章目录 1.模板语法1.1 插值语法{{}}可以写什么1.2 指令语法1.2.1 指令概述1.2.2 v-bind指令1.2.3 v-model指令 1.模板语法 1.1 插值语法{{}}可以写什么 &#xff08;1&#xff09;在data中声明的 &#xff08;2&#xff09;常量 &#xff08;3&#xff09;合法的JavaScript…...

java基础练习题

1、一个".java"源文件中是否可以包括多个类&#xff1f;有什么限制&#xff1f; 可以包含多个类。但是只有一个类可以声明为public&#xff0c;且要求声明为public的类的类名与源文件名相同。 2、java的优势&#xff1f; a、跨平台性 b、安全性高 c、简单性 d、…...

unity中通过实现底层接口实现非按钮(图片)的事件监听

编写监听脚本 PEListenter 继承自MonoBehaviour类&#xff0c;并实现了IPointerDownHandler、IPointerUpHandler和IDragHandler接口&#xff0c;按照需求定义需要接收事件&#xff08;鼠标按下、抬起、拖拽&#xff09;的回调函数 //监听类&#xff08;需要挂载在物体上面&am…...

重庆耶非凡科技有限公司的选品师项目加盟靠谱吗?

在当今电子商务的浪潮中&#xff0c;选品师的角色愈发重要。而重庆耶非凡科技有限公司以其独特的选品师项目&#xff0c;在行业内引起了广泛关注。对于想要加盟该项目的人来说&#xff0c;项目的靠谱性无疑是首要考虑的问题。 首先&#xff0c;我们来看看耶非凡科技有限公司的背…...

《青少年编程与数学》课程方案:4、课程策略

《青少年编程与数学》课程方案&#xff1a;4、课程策略 一、工程师思维二、使命感驱动三、价值观引领四、学习现代化五、工作生活化六、与时代共进 《青少年编程与数学》课程策略强调采用工程师思维&#xff0c;避免重复造轮子&#xff0c;培养使命感&#xff0c;通过探索兴趣、…...

用爬虫实现---模拟填志愿

先来说实现逻辑&#xff0c;首先我要获取到这个网站上所有的信息&#xff0c;那么我们就可以开始对元素进行检查 我们发现他的每一个学校信息都有一个对应的属性&#xff0c;并且是相同的&#xff0c;那么我们就可以遍历这个网页中的所有属性一样的开始爬取 在来分析&#xff0…...

vscode Run Code输出出现中文乱码情况问题解决方案

主要解决方案是通过修改计算机默认的编码格式,来完成的。 chcp 是 Windows 操作系统中的一个命令,用于显示或设置控制台的代码页(code page)。代码页决定了控制台如何解释和显示字符,特别是非 ASCII 字符(例如 Unicode 字符)。 使用方法 显示当前代码页: 输入 chcp 而…...

代码随想录训练营Day30

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、重新安排行程 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 今天是跟着代码随想录刷题的第30天&#xff0c;主要是复习了回溯算法…...

Swift 序列(Sequence)排序面面俱到 - 从过去到现在(二)

概览 在上篇 Swift 序列(Sequence)排序面面俱到 - 从过去到现在(一)博文中,我们讨论了 Swift 语言中序列和集合元素排序的一些基本知识,我们还给出了以自定义类型中任意属性排序的“康庄大道”。 不过在实际的撸码场景中,我们往往需要的是“多属性”同时参与到排序的考…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

uniapp中使用aixos 报错

问题&#xff1a; 在uniapp中使用aixos&#xff0c;运行后报如下错误&#xff1a; AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...

算法笔记2

1.字符串拼接最好用StringBuilder&#xff0c;不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)

题目 做法 启动靶机&#xff0c;点进去 点进去 查看URL&#xff0c;有 ?fileflag.php说明存在文件包含&#xff0c;原理是php://filter 协议 当它与包含函数结合时&#xff0c;php://filter流会被当作php文件执行。 用php://filter加编码&#xff0c;能让PHP把文件内容…...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...