当前位置: 首页 > news >正文

kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器(只消费含"sister"的消息)

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;import java.util.*;public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {Map<TopicPartition,List<ConsumerRecord<String,String>>> finalResult=new HashMap<>();Set<TopicPartition> partitionSet = records.partitions();for(TopicPartition topicPartition: partitionSet){List<ConsumerRecord<String,String>> partitionRecordList=records.records(topicPartition);List<ConsumerRecord<String,String>> newPartitionRecordList=new LinkedList<>();for(ConsumerRecord<String,String> record: partitionRecordList){if(record.value().contains("sister")){newPartitionRecordList.add(record);}}finalResult.put(topicPartition,newPartitionRecordList);}return new ConsumerRecords<>(finalResult);}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp,meta) -> {System.out.println("消费者拦截器:"+tp.topic()+":"+meta.offset());});}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

二、定义消费者,配置消费者拦截器

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerInterceptorTest  {public static void main(String[] args) {String topic="testTopic2";String server="xx.xx.xx.xx:9092";Properties properties=new Properties();properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupTest4");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName());KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(properties);myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000));for(ConsumerRecord consumerRecord: records){System.out.println(consumerRecord.value());}//myConsumer.commitSync();}}
}

相关文章:

kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器&#xff08;只消费含"sister"的消息&#xff09; package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.…...

水库大坝安全监测的主要内容包括哪些?

在水库大坝的实时监测中&#xff0c;主要任务是通过无线传感网络监测各个监测点的水位、水压、渗流、流量、扬压力等数据&#xff0c;并在计算机上用数据模式或图形模式进行实时反映&#xff0c;以掌握整个水库大坝的各项变化情况。大坝安全监测系统能实现全天候远程自动监测&a…...

Cadence软件屏幕显示问题

问题 就是今天打开Cadence软件想导出网表看一下&#xff0c;发现没有显示确定按钮什么的&#xff0c;那个窗口也是无语&#xff0c;不能移动&#xff0c;缩放也只能左右缩放&#xff0c;还不能缩小什么的&#xff0c;真的醉了&#xff0c;后面就是调整窗口的分辨率。 因为我最…...

访问服务器快慢的因素

我们在租用服务器的过程中&#xff0c;可能在访问速度方面&#xff0c;会受到某些因素影响&#xff0c;如果您要进行此项业务&#xff0c;进行一些简单的了解 是非常的有必要的&#xff0c;下面壹基比小鑫带大家一起去做个具体的探讨吧。 对于服务器不太了解的都认为&#xff0…...

vue(element ui安装)

目录 一&#xff0c;element ui安装二&#xff0c;main.js三&#xff0c;使用element ui最后 一&#xff0c;element ui安装 先在盘服中找到你创建的node的位置 如有不懂根据可以看看上一章安装node 然后在终端找到 进入这个位置之后就可以安装了 输入npm i element-ui -S这个…...

基于FPGA视频接口之HDMI2.0编/解码

简介 为什么要特别说明HDMI的版本,是因为HDMI的版本众多,代表的HDMI速度同样不同,当前版本在HDMI2.1速度达到48Gbps,可以传输4K及以上图像,但我们当前还停留在1080P@60部分,且使用的芯片和硬件结构有很大差别,故将HDMI分为两个部分说明1080@60以下分辨率和4K以上分辨率(…...

Codeforces Round #894 (Div.3)

文章目录 前言A. Gift Carpet题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; B. Sequence Game题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; C. Flower City Fence题目&#xff1a;输入&#xff1a…...

MyBatid动态语句且模糊查询

目录 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f; MyBatis常用的动态标签和表达式 if标签 Choose标签 where标签 MyBatis模糊查询 #与$的区别 ​编辑 MyBatis映射 resultType resultMap 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f;…...

JVM——垃圾回收器G1+垃圾回收调优

4.4 G1&#xff08;一个垃圾回收器&#xff09; 定义: 取代了CMS垃圾回收器。和CMS一样时并发的。 适用场景: 物理上分区&#xff0c;逻辑上分代。 相关JVM参数: -XX:UseG1GC-XX:G1HeapRegionSizesize-XX:MaxGCPauseMillistime 1) G1 垃圾回收阶段 三个回收阶段&#xff0…...

【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析

【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析 系列文章汇总见:《【SA8295P 源码分析】00 - 系列文章链接汇总》 本文链接:《【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析》 主要参数如下: hw_…...

iptables的使用规则

环境中为了安全要限制swagger的访问&#xff0c;最简单的方式是通过iptables防火墙设置规则限制。 在测试服务器中设置访问swagger-ui.html显示如下&#xff0c;区分大小写&#xff1a; iptables设置限制访问9783端口的swagger字段的请求&#xff1a; iptables -A INPUT -p t…...

JS 动画 vs CSS 动画:究竟有何不同?

在 Web 前端开发中&#xff0c;动画是提高用户体验的关键因素之一。我们通常可以使用 JavaScript&#xff08;JS&#xff09;和 CSS 来创建动画效果。但是&#xff0c;这两者之间有哪些区别呢&#xff1f;在本文中&#xff0c;我们将深入研究 JS 动画和 CSS 动画&#xff0c;探…...

供应链 | 大数据报童模型:基于机器学习的实践见解

论文解读&#xff1a;李欣 马玺渊 作者&#xff1a;Gah-Yi Ban, Cynthia Rudin 引用&#xff1a;Ban, Gah-Yi and Cynthia Rudin. The big data newsvendor: Practical insights from machine learning. Operations Research 67.1 (2019): 90-108. 文章链接&#xff1a;https…...

Java开发工作问题整理与记录

1、为什么Autowired不能注入static成员属性 扫描Class类需要注入的元数据的时候&#xff0c;直接选择忽略掉了static成员&#xff08;包括属性和方法&#xff09; Spring 依赖注入是依赖set方法, set方法是普通的对象方法,static变量是类的属性 AutowiredAnnotationBeanPostP…...

静态代码扫描持续构建(Jenkins)

前提条件 已正确安装、配置Jenkins环境&#xff0c;并装有 Gradle 插件、HTML 插件、SVN 插件等。如下图所示&#xff1a; 已正确安装、配置android sdk&#xff0c;在cmd窗口输入命令“android -h”,回车 配置步骤 打开Jenkins&#xff0c;新建一个job&#xff0c;输入项目…...

Git gui教程---汇总篇

想说的 汇总篇就是你应该已经学会基本操作了。剩下的代码上传云端之类的我懒得教了&#xff0c;反正你看命令版也差不多了&#xff0c;具体怎么操作就自己想吧。接下来的汇总篇&#xff0c;主要将每一个篇章对应的git命令写出来&#xff0c;一一对应&#xff0c;毕竟现在的编辑…...

flink sql checkpoint 调优配置

- execution.checkpointing.interval: 检查点之间的时间间隔&#xff08;以毫秒为单位&#xff09;。在此间隔内&#xff0c;系统将生成新的检查点 SET execution.checkpointing.interval 6000; - execution.checkpointing.tolerable-failed-checkpoints: 允许的连续失败检查…...

Linux 网络文件共享介绍

Linux 网络文件共享介绍 一.常见的存储类型 目前常见的存储类型有 DAS,NAS,SAN 等&#xff0c;最主要的区别是硬盘存储媒介是如何 于处理器连接的&#xff0c;以及处理器使用何种方式来访问磁盘&#xff0c;以及访问磁盘使用 的协议(网络协议、I/O 协议)。 三种存储类型如下 直…...

Qt中如何在qml文件中使用其他的qml文件并创建对象

如果想使用其他的qml文件直接创建对象&#xff0c;必须先这样导入其qml文件并as成别名&#xff0c;才可以创建对象并使用它。 一、导入qml文件&#xff0c;例如&#xff1a; import "CameraConfig.qml" as CameraConfig import "CameraDevelopView.qml" a…...

学习心得04:CUDA

2018年的时候&#xff0c;看过同事使用CUDA。因为工作忙&#xff0c;所以也没请教。 近来买了本入门的CUDA书&#xff0c;学习了一番。有两个心得&#xff1a; 工作拆分。 CUDA是并行计算&#xff0c;也就是大量重复的可拆分的计算。数组最符合这个要求。简单点就是把数组外面…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

Go 语言并发编程基础:无缓冲与有缓冲通道

在上一章节中&#xff0c;我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道&#xff0c;它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好&#xff0…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...

Bean 作用域有哪些?如何答出技术深度?

导语&#xff1a; Spring 面试绕不开 Bean 的作用域问题&#xff0c;这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开&#xff0c;结合典型面试题及实战场景&#xff0c;帮你厘清重点&#xff0c;打破模板式回答&#xff0c…...

uniapp 实现腾讯云IM群文件上传下载功能

UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中&#xff0c;群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS&#xff0c;在uniapp中实现&#xff1a; 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...

结构化文件管理实战:实现目录自动创建与归类

手动操作容易因疲劳或疏忽导致命名错误、路径混乱等问题&#xff0c;进而引发后续程序异常。使用工具进行标准化操作&#xff0c;能有效降低出错概率。 需要快速整理大量文件的技术用户而言&#xff0c;这款工具提供了一种轻便高效的解决方案。程序体积仅有 156KB&#xff0c;…...