当前位置: 首页 > news >正文

【Kafka】Java整合Kafka

1.引入依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency>

2.搭建生产者

package com.wen.kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class MyProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//配置信息Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.117.80:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//创建生产者Producer<String,String> producer = new KafkaProducer<String, String>(prop);//创建消息ProducerRecord<String,String> record = new ProducerRecord<>("test", "hello kafka-client");//同步发送消息
//        RecordMetadata metadata = producer.send(record).get();
//        System.out.println("同步消息——topic:"+metadata.topic()+"partition"+metadata.partition()+"offset"+metadata.offset());//异步发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {System.out.println(e.getMessage());}if (recordMetadata != null) {System.out.println("异步消息——topic:"+recordMetadata.topic()+"partition"+recordMetadata.partition()+"offset"+recordMetadata.offset());}}});Thread.sleep(1000);}
}

3.搭建消费者

package com.wen.kafka;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//参数信息Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.117.80:9092");prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//创建消费者Consumer<String,String> consumer = new KafkaConsumer<String, String>(prop);//订阅主题consumer.subscribe(Arrays.asList("test"));//拉取消息while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}}}
}

相关文章:

【Kafka】Java整合Kafka

1.引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency> 2.搭建生产者 package com.wen.kafka;import org.apache.kafka.clients.produ…...

所里网连不上,我服了

所里网连不上&#xff0c;我服了所里网连不上&#xff0c;我服了所里网连不上&#xff0c;我服了...

Yakit工具篇:WebFuzzer模块之热加载技术

简介 官方定义&#xff1a; 什么是热加载&#xff1f; 广义上来说&#xff0c;热加载是一种允许在不停止或重启应用程序的情况下&#xff0c;动态加载或更新特定组件或模块的功能。这种技术常用于开发过程中&#xff0c;提高开发效率和用户体验。 在Yakit 的Web Fuzzer中&…...

Linux基本指令(前篇)

目录 1.ls指令 2.pwd指令 3.cd 指令 4.touch指令 5.mkdir指令&#xff08;重要&#xff09; 6.rmdir指令 && rm 指令&#xff08;重要&#xff09; 7.man指令&#xff08;重要&#xff09; 1.ls指令 ls 选项 目录或文件 对于目录&#xff0c;该命令列出该目录下的所…...

[网鼎杯 2020 青龙组]singal

一道VM题目 可以看到长度是15 跟踪调用read函数的函数 分析一下switch中每个指令的含义、 在scanf下面打断点 在关键跳转处下断点 打开Ponce插件 GitHub - illera88/Ponce: IDA 2016 plugin contest winner! Symbolic Execution just one-click away! 然后开始动调 输入15个…...

Qt/QML编程学习之心得:一个QML工程的学习笔记(十)

前言: 到底什么是Qt Quick呢?因为Qt Quick是Qt新引入的,Qt Quick由Qt Quick模块提供,它是一个编写QML应用的标准库。Qt Quick模块提供了两种接口:使用QML语言创建应用的QML接口和使用C++语言扩展QML的C++接口。使用Qt Quick模块,设计人员和开发人员可以轻松地构建流畅的…...

LeetCode OJ循环队列(C语言)

1.题目的初步分析 我们分析上述题目的时候会发现题目非常的长&#xff0c;不好整理思路&#xff0c;我这里可以大致的将本题的几个核心点说出来&#xff1a; 1.队列的思路 循环队列说来说去不还是队列嘛&#xff0c;那么队列的基本操作增删查改、以及队列的基本结构肯定都是不能…...

打码平台之图鉴的使用步骤

打码平台之图鉴 背景&#xff1a; ​ 今天给大家推荐一个我一直使用的验证码识别平台&#xff0c;图鉴&#xff0c;我没有收费&#xff0c;我只是觉得这个网站使用方便&#xff0c;支持验证码种类多&#xff0c;好了&#xff0c;话不多说&#xff0c;上教程&#xff01; 注册…...

站群服务器与普通服务器有哪些区别?

站群服务器"通常指一组被单个实体或组织控制的网络站点&#xff0c;用于提高特定站点在搜索引擎中的排名。在讨论站群服务器与普通服务器的区别时&#xff0c;可能涉及到以下方面&#xff1a; 1. IP地址&#xff1a; 站群服务器&#xff1a; 站群服务器可能涉及多个站点&a…...

Java-使用poi-tl根据word模板动态生成word

作者wangsz&#xff0c;想写一些关于word的工具&#xff0c;所以就写了这篇文章 1.首先&#xff0c;先导入所需要的依赖&#xff08;poi相关依赖即可&#xff09; <!-- POI --><dependency><groupId>org.apache.poi</groupId><artifactId>poi&l…...

计算机毕业设计 基于SpringBoot的物业管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…...

UI for Apache Kafka

文章Overview of UI Tools for Monitoring and Management of Apache Kafka Clusters | by German Osin | Towards Data Science中介绍了8种常见的kafka UI工具,这些产品的核心功能对比信息如下图所示, 通过对比发现 UI for Apache Kafka 功能齐全且免费,因此可以作为我们的首…...

iview table 默认排序字段不高亮解决办法

iview treeSelect 组件封装 1、表格增加排序时触发的方法2、定义三个变量&#xff0c;sortColumnDefaultStyle存放默认的样式&#xff0c;定义页面默认的列以及顺序3、显示的列加上 sortable, 和样式4、使用下面这块代表默认选中5、点击时清除掉默认的排序6、把排序的字段查询时…...

系列七、ThreadLocal为什么会导致内存泄漏

一、ThreadLocal为什么会导致内存泄露 1.1、ThreadLocalMap的基本结构 ThreadLocalMap是ThreadLocal的内部类&#xff0c;没有实现Map接口&#xff0c;用独立的方式实现了Map的功能&#xff0c;其内部的Entry也是独立实现的。源码如下&#xff1a; 1.2、ThreadLocal引用示意图…...

如何避免死锁

程序员的公众号&#xff1a;源1024&#xff0c;获取更多资料&#xff0c;无加密无套路&#xff01; 最近整理了一波电子书籍资料&#xff0c;包含《Effective Java中文版 第2版》《深入JAVA虚拟机》&#xff0c;《重构改善既有代码设计》&#xff0c;《MySQL高性能-第3版》&…...

HTML新特性【缩放图像、图像切片、平移、旋转、缩放、变形、裁切路径、时钟、运动的小球】(二)-全面详解(学习总结---从入门到深化)

目录 绘制图像_缩放图像 绘制图像_图像切片 Canvas状态的保存和恢复 图形变形_平移 图形变形_旋转 图形变形_缩放 图形变形_变形 裁切路径 动画_时钟 动画_运动的小球 引入外部SVG 绘制图像_缩放图像 ctx.drawImage(img, x, y, width, height) img &#xf…...

C/C++ 开发SCM服务管理组件

SCM&#xff08;Service Control Manager&#xff09;服务管理器是 Windows 操作系统中的一个关键组件&#xff0c;负责管理系统服务的启动、停止和配置。服务是一种在后台运行的应用程序&#xff0c;可以在系统启动时自动启动&#xff0c;也可以由用户或其他应用程序手动启动。…...

springboot程序启动成功后执行的方法

//实现该接口&#xff0c;run方法既程序启动成功后将要执行的方法 // // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) //package org.springframework.boot;FunctionalInterface public interface CommandLineRunner {…...

STM32 寄存器配置笔记——USART配置 打印

一、概述 本文主要介绍如何配置USART&#xff0c;并通过USART打印验证结果。以stm32f10为例&#xff0c;将PA9、PA10复用为USART功能&#xff0c;使用HSE PLL输出72MHZ时钟 APB2 clk不分频提供配置9600波特率。波特率计算公式如下&#xff1a; fck即为APB2 clk参考计算&#xf…...

Spine深入学习 —— 数据

atlas数据的处理 作用 图集&#xff0c;描述了spine使用的图片信息。 结构 page 页块 页块包含了页图像名称, 以及加载和渲染图像的相关信息。 page1.pngsize: 640, 480format: RGBA8888filter: Linear, Linearrepeat: nonepma: truename: 首行为该页中的图像名称. 图片位…...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...

【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论

路径问题的革命性重构&#xff1a;基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中&#xff08;图1&#xff09;&#xff1a; mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

Java数值运算常见陷阱与规避方法

整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...

MySQL:分区的基本使用

目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区&#xff08;Partitioning&#xff09;是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分&#xff08;分区&#xff09;可以独立存储、管理和优化&#xff0c;…...