当前位置: 首页 > news >正文

【Kafka】Java整合Kafka

1.引入依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency>

2.搭建生产者

package com.wen.kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class MyProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//配置信息Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.117.80:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//创建生产者Producer<String,String> producer = new KafkaProducer<String, String>(prop);//创建消息ProducerRecord<String,String> record = new ProducerRecord<>("test", "hello kafka-client");//同步发送消息
//        RecordMetadata metadata = producer.send(record).get();
//        System.out.println("同步消息——topic:"+metadata.topic()+"partition"+metadata.partition()+"offset"+metadata.offset());//异步发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {System.out.println(e.getMessage());}if (recordMetadata != null) {System.out.println("异步消息——topic:"+recordMetadata.topic()+"partition"+recordMetadata.partition()+"offset"+recordMetadata.offset());}}});Thread.sleep(1000);}
}

3.搭建消费者

package com.wen.kafka;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//参数信息Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.117.80:9092");prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//创建消费者Consumer<String,String> consumer = new KafkaConsumer<String, String>(prop);//订阅主题consumer.subscribe(Arrays.asList("test"));//拉取消息while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}}}
}

相关文章:

【Kafka】Java整合Kafka

1.引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency> 2.搭建生产者 package com.wen.kafka;import org.apache.kafka.clients.produ…...

所里网连不上,我服了

所里网连不上&#xff0c;我服了所里网连不上&#xff0c;我服了所里网连不上&#xff0c;我服了...

Yakit工具篇:WebFuzzer模块之热加载技术

简介 官方定义&#xff1a; 什么是热加载&#xff1f; 广义上来说&#xff0c;热加载是一种允许在不停止或重启应用程序的情况下&#xff0c;动态加载或更新特定组件或模块的功能。这种技术常用于开发过程中&#xff0c;提高开发效率和用户体验。 在Yakit 的Web Fuzzer中&…...

Linux基本指令(前篇)

目录 1.ls指令 2.pwd指令 3.cd 指令 4.touch指令 5.mkdir指令&#xff08;重要&#xff09; 6.rmdir指令 && rm 指令&#xff08;重要&#xff09; 7.man指令&#xff08;重要&#xff09; 1.ls指令 ls 选项 目录或文件 对于目录&#xff0c;该命令列出该目录下的所…...

[网鼎杯 2020 青龙组]singal

一道VM题目 可以看到长度是15 跟踪调用read函数的函数 分析一下switch中每个指令的含义、 在scanf下面打断点 在关键跳转处下断点 打开Ponce插件 GitHub - illera88/Ponce: IDA 2016 plugin contest winner! Symbolic Execution just one-click away! 然后开始动调 输入15个…...

Qt/QML编程学习之心得:一个QML工程的学习笔记(十)

前言: 到底什么是Qt Quick呢?因为Qt Quick是Qt新引入的,Qt Quick由Qt Quick模块提供,它是一个编写QML应用的标准库。Qt Quick模块提供了两种接口:使用QML语言创建应用的QML接口和使用C++语言扩展QML的C++接口。使用Qt Quick模块,设计人员和开发人员可以轻松地构建流畅的…...

LeetCode OJ循环队列(C语言)

1.题目的初步分析 我们分析上述题目的时候会发现题目非常的长&#xff0c;不好整理思路&#xff0c;我这里可以大致的将本题的几个核心点说出来&#xff1a; 1.队列的思路 循环队列说来说去不还是队列嘛&#xff0c;那么队列的基本操作增删查改、以及队列的基本结构肯定都是不能…...

打码平台之图鉴的使用步骤

打码平台之图鉴 背景&#xff1a; ​ 今天给大家推荐一个我一直使用的验证码识别平台&#xff0c;图鉴&#xff0c;我没有收费&#xff0c;我只是觉得这个网站使用方便&#xff0c;支持验证码种类多&#xff0c;好了&#xff0c;话不多说&#xff0c;上教程&#xff01; 注册…...

站群服务器与普通服务器有哪些区别?

站群服务器"通常指一组被单个实体或组织控制的网络站点&#xff0c;用于提高特定站点在搜索引擎中的排名。在讨论站群服务器与普通服务器的区别时&#xff0c;可能涉及到以下方面&#xff1a; 1. IP地址&#xff1a; 站群服务器&#xff1a; 站群服务器可能涉及多个站点&a…...

Java-使用poi-tl根据word模板动态生成word

作者wangsz&#xff0c;想写一些关于word的工具&#xff0c;所以就写了这篇文章 1.首先&#xff0c;先导入所需要的依赖&#xff08;poi相关依赖即可&#xff09; <!-- POI --><dependency><groupId>org.apache.poi</groupId><artifactId>poi&l…...

计算机毕业设计 基于SpringBoot的物业管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…...

UI for Apache Kafka

文章Overview of UI Tools for Monitoring and Management of Apache Kafka Clusters | by German Osin | Towards Data Science中介绍了8种常见的kafka UI工具,这些产品的核心功能对比信息如下图所示, 通过对比发现 UI for Apache Kafka 功能齐全且免费,因此可以作为我们的首…...

iview table 默认排序字段不高亮解决办法

iview treeSelect 组件封装 1、表格增加排序时触发的方法2、定义三个变量&#xff0c;sortColumnDefaultStyle存放默认的样式&#xff0c;定义页面默认的列以及顺序3、显示的列加上 sortable, 和样式4、使用下面这块代表默认选中5、点击时清除掉默认的排序6、把排序的字段查询时…...

系列七、ThreadLocal为什么会导致内存泄漏

一、ThreadLocal为什么会导致内存泄露 1.1、ThreadLocalMap的基本结构 ThreadLocalMap是ThreadLocal的内部类&#xff0c;没有实现Map接口&#xff0c;用独立的方式实现了Map的功能&#xff0c;其内部的Entry也是独立实现的。源码如下&#xff1a; 1.2、ThreadLocal引用示意图…...

如何避免死锁

程序员的公众号&#xff1a;源1024&#xff0c;获取更多资料&#xff0c;无加密无套路&#xff01; 最近整理了一波电子书籍资料&#xff0c;包含《Effective Java中文版 第2版》《深入JAVA虚拟机》&#xff0c;《重构改善既有代码设计》&#xff0c;《MySQL高性能-第3版》&…...

HTML新特性【缩放图像、图像切片、平移、旋转、缩放、变形、裁切路径、时钟、运动的小球】(二)-全面详解(学习总结---从入门到深化)

目录 绘制图像_缩放图像 绘制图像_图像切片 Canvas状态的保存和恢复 图形变形_平移 图形变形_旋转 图形变形_缩放 图形变形_变形 裁切路径 动画_时钟 动画_运动的小球 引入外部SVG 绘制图像_缩放图像 ctx.drawImage(img, x, y, width, height) img &#xf…...

C/C++ 开发SCM服务管理组件

SCM&#xff08;Service Control Manager&#xff09;服务管理器是 Windows 操作系统中的一个关键组件&#xff0c;负责管理系统服务的启动、停止和配置。服务是一种在后台运行的应用程序&#xff0c;可以在系统启动时自动启动&#xff0c;也可以由用户或其他应用程序手动启动。…...

springboot程序启动成功后执行的方法

//实现该接口&#xff0c;run方法既程序启动成功后将要执行的方法 // // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) //package org.springframework.boot;FunctionalInterface public interface CommandLineRunner {…...

STM32 寄存器配置笔记——USART配置 打印

一、概述 本文主要介绍如何配置USART&#xff0c;并通过USART打印验证结果。以stm32f10为例&#xff0c;将PA9、PA10复用为USART功能&#xff0c;使用HSE PLL输出72MHZ时钟 APB2 clk不分频提供配置9600波特率。波特率计算公式如下&#xff1a; fck即为APB2 clk参考计算&#xf…...

Spine深入学习 —— 数据

atlas数据的处理 作用 图集&#xff0c;描述了spine使用的图片信息。 结构 page 页块 页块包含了页图像名称, 以及加载和渲染图像的相关信息。 page1.pngsize: 640, 480format: RGBA8888filter: Linear, Linearrepeat: nonepma: truename: 首行为该页中的图像名称. 图片位…...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...