当前位置: 首页 > news >正文

kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器(只消费含"sister"的消息)

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;import java.util.*;public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {Map<TopicPartition,List<ConsumerRecord<String,String>>> finalResult=new HashMap<>();Set<TopicPartition> partitionSet = records.partitions();for(TopicPartition topicPartition: partitionSet){List<ConsumerRecord<String,String>> partitionRecordList=records.records(topicPartition);List<ConsumerRecord<String,String>> newPartitionRecordList=new LinkedList<>();for(ConsumerRecord<String,String> record: partitionRecordList){if(record.value().contains("sister")){newPartitionRecordList.add(record);}}finalResult.put(topicPartition,newPartitionRecordList);}return new ConsumerRecords<>(finalResult);}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp,meta) -> {System.out.println("消费者拦截器:"+tp.topic()+":"+meta.offset());});}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

二、定义消费者,配置消费者拦截器

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerInterceptorTest  {public static void main(String[] args) {String topic="testTopic2";String server="xx.xx.xx.xx:9092";Properties properties=new Properties();properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupTest4");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName());KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(properties);myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000));for(ConsumerRecord consumerRecord: records){System.out.println(consumerRecord.value());}//myConsumer.commitSync();}}
}

相关文章:

kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器&#xff08;只消费含"sister"的消息&#xff09; package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.…...

水库大坝安全监测的主要内容包括哪些?

在水库大坝的实时监测中&#xff0c;主要任务是通过无线传感网络监测各个监测点的水位、水压、渗流、流量、扬压力等数据&#xff0c;并在计算机上用数据模式或图形模式进行实时反映&#xff0c;以掌握整个水库大坝的各项变化情况。大坝安全监测系统能实现全天候远程自动监测&a…...

Cadence软件屏幕显示问题

问题 就是今天打开Cadence软件想导出网表看一下&#xff0c;发现没有显示确定按钮什么的&#xff0c;那个窗口也是无语&#xff0c;不能移动&#xff0c;缩放也只能左右缩放&#xff0c;还不能缩小什么的&#xff0c;真的醉了&#xff0c;后面就是调整窗口的分辨率。 因为我最…...

访问服务器快慢的因素

我们在租用服务器的过程中&#xff0c;可能在访问速度方面&#xff0c;会受到某些因素影响&#xff0c;如果您要进行此项业务&#xff0c;进行一些简单的了解 是非常的有必要的&#xff0c;下面壹基比小鑫带大家一起去做个具体的探讨吧。 对于服务器不太了解的都认为&#xff0…...

vue(element ui安装)

目录 一&#xff0c;element ui安装二&#xff0c;main.js三&#xff0c;使用element ui最后 一&#xff0c;element ui安装 先在盘服中找到你创建的node的位置 如有不懂根据可以看看上一章安装node 然后在终端找到 进入这个位置之后就可以安装了 输入npm i element-ui -S这个…...

基于FPGA视频接口之HDMI2.0编/解码

简介 为什么要特别说明HDMI的版本,是因为HDMI的版本众多,代表的HDMI速度同样不同,当前版本在HDMI2.1速度达到48Gbps,可以传输4K及以上图像,但我们当前还停留在1080P@60部分,且使用的芯片和硬件结构有很大差别,故将HDMI分为两个部分说明1080@60以下分辨率和4K以上分辨率(…...

Codeforces Round #894 (Div.3)

文章目录 前言A. Gift Carpet题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; B. Sequence Game题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; C. Flower City Fence题目&#xff1a;输入&#xff1a…...

MyBatid动态语句且模糊查询

目录 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f; MyBatis常用的动态标签和表达式 if标签 Choose标签 where标签 MyBatis模糊查询 #与$的区别 ​编辑 MyBatis映射 resultType resultMap 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f;…...

JVM——垃圾回收器G1+垃圾回收调优

4.4 G1&#xff08;一个垃圾回收器&#xff09; 定义: 取代了CMS垃圾回收器。和CMS一样时并发的。 适用场景: 物理上分区&#xff0c;逻辑上分代。 相关JVM参数: -XX:UseG1GC-XX:G1HeapRegionSizesize-XX:MaxGCPauseMillistime 1) G1 垃圾回收阶段 三个回收阶段&#xff0…...

【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析

【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析 系列文章汇总见:《【SA8295P 源码分析】00 - 系列文章链接汇总》 本文链接:《【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析》 主要参数如下: hw_…...

iptables的使用规则

环境中为了安全要限制swagger的访问&#xff0c;最简单的方式是通过iptables防火墙设置规则限制。 在测试服务器中设置访问swagger-ui.html显示如下&#xff0c;区分大小写&#xff1a; iptables设置限制访问9783端口的swagger字段的请求&#xff1a; iptables -A INPUT -p t…...

JS 动画 vs CSS 动画:究竟有何不同?

在 Web 前端开发中&#xff0c;动画是提高用户体验的关键因素之一。我们通常可以使用 JavaScript&#xff08;JS&#xff09;和 CSS 来创建动画效果。但是&#xff0c;这两者之间有哪些区别呢&#xff1f;在本文中&#xff0c;我们将深入研究 JS 动画和 CSS 动画&#xff0c;探…...

供应链 | 大数据报童模型:基于机器学习的实践见解

论文解读&#xff1a;李欣 马玺渊 作者&#xff1a;Gah-Yi Ban, Cynthia Rudin 引用&#xff1a;Ban, Gah-Yi and Cynthia Rudin. The big data newsvendor: Practical insights from machine learning. Operations Research 67.1 (2019): 90-108. 文章链接&#xff1a;https…...

Java开发工作问题整理与记录

1、为什么Autowired不能注入static成员属性 扫描Class类需要注入的元数据的时候&#xff0c;直接选择忽略掉了static成员&#xff08;包括属性和方法&#xff09; Spring 依赖注入是依赖set方法, set方法是普通的对象方法,static变量是类的属性 AutowiredAnnotationBeanPostP…...

静态代码扫描持续构建(Jenkins)

前提条件 已正确安装、配置Jenkins环境&#xff0c;并装有 Gradle 插件、HTML 插件、SVN 插件等。如下图所示&#xff1a; 已正确安装、配置android sdk&#xff0c;在cmd窗口输入命令“android -h”,回车 配置步骤 打开Jenkins&#xff0c;新建一个job&#xff0c;输入项目…...

Git gui教程---汇总篇

想说的 汇总篇就是你应该已经学会基本操作了。剩下的代码上传云端之类的我懒得教了&#xff0c;反正你看命令版也差不多了&#xff0c;具体怎么操作就自己想吧。接下来的汇总篇&#xff0c;主要将每一个篇章对应的git命令写出来&#xff0c;一一对应&#xff0c;毕竟现在的编辑…...

flink sql checkpoint 调优配置

- execution.checkpointing.interval: 检查点之间的时间间隔&#xff08;以毫秒为单位&#xff09;。在此间隔内&#xff0c;系统将生成新的检查点 SET execution.checkpointing.interval 6000; - execution.checkpointing.tolerable-failed-checkpoints: 允许的连续失败检查…...

Linux 网络文件共享介绍

Linux 网络文件共享介绍 一.常见的存储类型 目前常见的存储类型有 DAS,NAS,SAN 等&#xff0c;最主要的区别是硬盘存储媒介是如何 于处理器连接的&#xff0c;以及处理器使用何种方式来访问磁盘&#xff0c;以及访问磁盘使用 的协议(网络协议、I/O 协议)。 三种存储类型如下 直…...

Qt中如何在qml文件中使用其他的qml文件并创建对象

如果想使用其他的qml文件直接创建对象&#xff0c;必须先这样导入其qml文件并as成别名&#xff0c;才可以创建对象并使用它。 一、导入qml文件&#xff0c;例如&#xff1a; import "CameraConfig.qml" as CameraConfig import "CameraDevelopView.qml" a…...

学习心得04:CUDA

2018年的时候&#xff0c;看过同事使用CUDA。因为工作忙&#xff0c;所以也没请教。 近来买了本入门的CUDA书&#xff0c;学习了一番。有两个心得&#xff1a; 工作拆分。 CUDA是并行计算&#xff0c;也就是大量重复的可拆分的计算。数组最符合这个要求。简单点就是把数组外面…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写&#xff0c;中文译为后进先出。这是一种数据结构的工作原则&#xff0c;类似于一摞盘子或一叠书本&#xff1a; 最后放进去的元素最先出来 -想象往筒状容器里放盘子&#xff1a; &#xff08;1&#xff09;你放进的最后一个盘子&#xff08…...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展&#xff0c;光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域&#xff0c;IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选&#xff0c;但在长期运行中&#xff0c;例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

06 Deep learning神经网络编程基础 激活函数 --吴恩达

深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf

FTP 客服管理系统 实现kefu123登录&#xff0c;不允许匿名访问&#xff0c;kefu只能访问/data/kefu目录&#xff0c;不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...

手机平板能效生态设计指令EU 2023/1670标准解读

手机平板能效生态设计指令EU 2023/1670标准解读 以下是针对欧盟《手机和平板电脑生态设计法规》(EU) 2023/1670 的核心解读&#xff0c;综合法规核心要求、最新修正及企业合规要点&#xff1a; 一、法规背景与目标 生效与强制时间 发布于2023年8月31日&#xff08;OJ公报&…...

在 Visual Studio Code 中使用驭码 CodeRider 提升开发效率:以冒泡排序为例

目录 前言1 插件安装与配置1.1 安装驭码 CodeRider1.2 初始配置建议 2 示例代码&#xff1a;冒泡排序3 驭码 CodeRider 功能详解3.1 功能概览3.2 代码解释功能3.3 自动注释生成3.4 逻辑修改功能3.5 单元测试自动生成3.6 代码优化建议 4 驭码的实际应用建议5 常见问题与解决建议…...

Python常用模块:time、os、shutil与flask初探

一、Flask初探 & PyCharm终端配置 目的: 快速搭建小型Web服务器以提供数据。 工具: 第三方Web框架 Flask (需 pip install flask 安装)。 安装 Flask: 建议: 使用 PyCharm 内置的 Terminal (模拟命令行) 进行安装,避免频繁切换。 PyCharm Terminal 配置建议: 打开 Py…...

UE5 音效系统

一.音效管理 音乐一般都是WAV,创建一个背景音乐类SoudClass,一个音效类SoundClass。所有的音乐都分为这两个类。再创建一个总音乐类&#xff0c;将上述两个作为它的子类。 接着我们创建一个音乐混合类SoundMix&#xff0c;将上述三个类翻入其中&#xff0c;通过它管理每个音乐…...