【Kafka】Java整合Kafka
1.引入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency>
2.搭建生产者
package com.wen.kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class MyProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//配置信息Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.117.80:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//创建生产者Producer<String,String> producer = new KafkaProducer<String, String>(prop);//创建消息ProducerRecord<String,String> record = new ProducerRecord<>("test", "hello kafka-client");//同步发送消息
// RecordMetadata metadata = producer.send(record).get();
// System.out.println("同步消息——topic:"+metadata.topic()+"partition"+metadata.partition()+"offset"+metadata.offset());//异步发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {System.out.println(e.getMessage());}if (recordMetadata != null) {System.out.println("异步消息——topic:"+recordMetadata.topic()+"partition"+recordMetadata.partition()+"offset"+recordMetadata.offset());}}});Thread.sleep(1000);}
}
3.搭建消费者
package com.wen.kafka;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//参数信息Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.117.80:9092");prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//创建消费者Consumer<String,String> consumer = new KafkaConsumer<String, String>(prop);//订阅主题consumer.subscribe(Arrays.asList("test"));//拉取消息while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}}}
}
相关文章:
【Kafka】Java整合Kafka
1.引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency> 2.搭建生产者 package com.wen.kafka;import org.apache.kafka.clients.produ…...
所里网连不上,我服了
所里网连不上,我服了所里网连不上,我服了所里网连不上,我服了...
Yakit工具篇:WebFuzzer模块之热加载技术
简介 官方定义: 什么是热加载? 广义上来说,热加载是一种允许在不停止或重启应用程序的情况下,动态加载或更新特定组件或模块的功能。这种技术常用于开发过程中,提高开发效率和用户体验。 在Yakit 的Web Fuzzer中&…...
Linux基本指令(前篇)
目录 1.ls指令 2.pwd指令 3.cd 指令 4.touch指令 5.mkdir指令(重要) 6.rmdir指令 && rm 指令(重要) 7.man指令(重要) 1.ls指令 ls 选项 目录或文件 对于目录,该命令列出该目录下的所…...
[网鼎杯 2020 青龙组]singal
一道VM题目 可以看到长度是15 跟踪调用read函数的函数 分析一下switch中每个指令的含义、 在scanf下面打断点 在关键跳转处下断点 打开Ponce插件 GitHub - illera88/Ponce: IDA 2016 plugin contest winner! Symbolic Execution just one-click away! 然后开始动调 输入15个…...
Qt/QML编程学习之心得:一个QML工程的学习笔记(十)
前言: 到底什么是Qt Quick呢?因为Qt Quick是Qt新引入的,Qt Quick由Qt Quick模块提供,它是一个编写QML应用的标准库。Qt Quick模块提供了两种接口:使用QML语言创建应用的QML接口和使用C++语言扩展QML的C++接口。使用Qt Quick模块,设计人员和开发人员可以轻松地构建流畅的…...
LeetCode OJ循环队列(C语言)
1.题目的初步分析 我们分析上述题目的时候会发现题目非常的长,不好整理思路,我这里可以大致的将本题的几个核心点说出来: 1.队列的思路 循环队列说来说去不还是队列嘛,那么队列的基本操作增删查改、以及队列的基本结构肯定都是不能…...
打码平台之图鉴的使用步骤
打码平台之图鉴 背景: 今天给大家推荐一个我一直使用的验证码识别平台,图鉴,我没有收费,我只是觉得这个网站使用方便,支持验证码种类多,好了,话不多说,上教程! 注册…...
站群服务器与普通服务器有哪些区别?
站群服务器"通常指一组被单个实体或组织控制的网络站点,用于提高特定站点在搜索引擎中的排名。在讨论站群服务器与普通服务器的区别时,可能涉及到以下方面: 1. IP地址: 站群服务器: 站群服务器可能涉及多个站点&a…...
Java-使用poi-tl根据word模板动态生成word
作者wangsz,想写一些关于word的工具,所以就写了这篇文章 1.首先,先导入所需要的依赖(poi相关依赖即可) <!-- POI --><dependency><groupId>org.apache.poi</groupId><artifactId>poi&l…...
计算机毕业设计 基于SpringBoot的物业管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解
博主介绍:✌从事软件开发10年之余,专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ 🍅文末获取源码联系🍅 👇🏻 精…...
UI for Apache Kafka
文章Overview of UI Tools for Monitoring and Management of Apache Kafka Clusters | by German Osin | Towards Data Science中介绍了8种常见的kafka UI工具,这些产品的核心功能对比信息如下图所示, 通过对比发现 UI for Apache Kafka 功能齐全且免费,因此可以作为我们的首…...
iview table 默认排序字段不高亮解决办法
iview treeSelect 组件封装 1、表格增加排序时触发的方法2、定义三个变量,sortColumnDefaultStyle存放默认的样式,定义页面默认的列以及顺序3、显示的列加上 sortable, 和样式4、使用下面这块代表默认选中5、点击时清除掉默认的排序6、把排序的字段查询时…...
系列七、ThreadLocal为什么会导致内存泄漏
一、ThreadLocal为什么会导致内存泄露 1.1、ThreadLocalMap的基本结构 ThreadLocalMap是ThreadLocal的内部类,没有实现Map接口,用独立的方式实现了Map的功能,其内部的Entry也是独立实现的。源码如下: 1.2、ThreadLocal引用示意图…...
如何避免死锁
程序员的公众号:源1024,获取更多资料,无加密无套路! 最近整理了一波电子书籍资料,包含《Effective Java中文版 第2版》《深入JAVA虚拟机》,《重构改善既有代码设计》,《MySQL高性能-第3版》&…...
HTML新特性【缩放图像、图像切片、平移、旋转、缩放、变形、裁切路径、时钟、运动的小球】(二)-全面详解(学习总结---从入门到深化)
目录 绘制图像_缩放图像 绘制图像_图像切片 Canvas状态的保存和恢复 图形变形_平移 图形变形_旋转 图形变形_缩放 图形变形_变形 裁切路径 动画_时钟 动画_运动的小球 引入外部SVG 绘制图像_缩放图像 ctx.drawImage(img, x, y, width, height) img …...
C/C++ 开发SCM服务管理组件
SCM(Service Control Manager)服务管理器是 Windows 操作系统中的一个关键组件,负责管理系统服务的启动、停止和配置。服务是一种在后台运行的应用程序,可以在系统启动时自动启动,也可以由用户或其他应用程序手动启动。…...
springboot程序启动成功后执行的方法
//实现该接口,run方法既程序启动成功后将要执行的方法 // // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) //package org.springframework.boot;FunctionalInterface public interface CommandLineRunner {…...
STM32 寄存器配置笔记——USART配置 打印
一、概述 本文主要介绍如何配置USART,并通过USART打印验证结果。以stm32f10为例,将PA9、PA10复用为USART功能,使用HSE PLL输出72MHZ时钟 APB2 clk不分频提供配置9600波特率。波特率计算公式如下: fck即为APB2 clk参考计算…...
Spine深入学习 —— 数据
atlas数据的处理 作用 图集,描述了spine使用的图片信息。 结构 page 页块 页块包含了页图像名称, 以及加载和渲染图像的相关信息。 page1.pngsize: 640, 480format: RGBA8888filter: Linear, Linearrepeat: nonepma: truename: 首行为该页中的图像名称. 图片位…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
