当前位置: 首页 > news >正文

kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器(只消费含"sister"的消息)

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;import java.util.*;public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {Map<TopicPartition,List<ConsumerRecord<String,String>>> finalResult=new HashMap<>();Set<TopicPartition> partitionSet = records.partitions();for(TopicPartition topicPartition: partitionSet){List<ConsumerRecord<String,String>> partitionRecordList=records.records(topicPartition);List<ConsumerRecord<String,String>> newPartitionRecordList=new LinkedList<>();for(ConsumerRecord<String,String> record: partitionRecordList){if(record.value().contains("sister")){newPartitionRecordList.add(record);}}finalResult.put(topicPartition,newPartitionRecordList);}return new ConsumerRecords<>(finalResult);}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp,meta) -> {System.out.println("消费者拦截器:"+tp.topic()+":"+meta.offset());});}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

二、定义消费者,配置消费者拦截器

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerInterceptorTest  {public static void main(String[] args) {String topic="testTopic2";String server="xx.xx.xx.xx:9092";Properties properties=new Properties();properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupTest4");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName());KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(properties);myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000));for(ConsumerRecord consumerRecord: records){System.out.println(consumerRecord.value());}//myConsumer.commitSync();}}
}

相关文章:

kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器&#xff08;只消费含"sister"的消息&#xff09; package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.…...

水库大坝安全监测的主要内容包括哪些?

在水库大坝的实时监测中&#xff0c;主要任务是通过无线传感网络监测各个监测点的水位、水压、渗流、流量、扬压力等数据&#xff0c;并在计算机上用数据模式或图形模式进行实时反映&#xff0c;以掌握整个水库大坝的各项变化情况。大坝安全监测系统能实现全天候远程自动监测&a…...

Cadence软件屏幕显示问题

问题 就是今天打开Cadence软件想导出网表看一下&#xff0c;发现没有显示确定按钮什么的&#xff0c;那个窗口也是无语&#xff0c;不能移动&#xff0c;缩放也只能左右缩放&#xff0c;还不能缩小什么的&#xff0c;真的醉了&#xff0c;后面就是调整窗口的分辨率。 因为我最…...

访问服务器快慢的因素

我们在租用服务器的过程中&#xff0c;可能在访问速度方面&#xff0c;会受到某些因素影响&#xff0c;如果您要进行此项业务&#xff0c;进行一些简单的了解 是非常的有必要的&#xff0c;下面壹基比小鑫带大家一起去做个具体的探讨吧。 对于服务器不太了解的都认为&#xff0…...

vue(element ui安装)

目录 一&#xff0c;element ui安装二&#xff0c;main.js三&#xff0c;使用element ui最后 一&#xff0c;element ui安装 先在盘服中找到你创建的node的位置 如有不懂根据可以看看上一章安装node 然后在终端找到 进入这个位置之后就可以安装了 输入npm i element-ui -S这个…...

基于FPGA视频接口之HDMI2.0编/解码

简介 为什么要特别说明HDMI的版本,是因为HDMI的版本众多,代表的HDMI速度同样不同,当前版本在HDMI2.1速度达到48Gbps,可以传输4K及以上图像,但我们当前还停留在1080P@60部分,且使用的芯片和硬件结构有很大差别,故将HDMI分为两个部分说明1080@60以下分辨率和4K以上分辨率(…...

Codeforces Round #894 (Div.3)

文章目录 前言A. Gift Carpet题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; B. Sequence Game题目&#xff1a;输入&#xff1a;输出&#xff1a;思路&#xff1a;代码&#xff1a; C. Flower City Fence题目&#xff1a;输入&#xff1a…...

MyBatid动态语句且模糊查询

目录 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f; MyBatis常用的动态标签和表达式 if标签 Choose标签 where标签 MyBatis模糊查询 #与$的区别 ​编辑 MyBatis映射 resultType resultMap 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f;…...

JVM——垃圾回收器G1+垃圾回收调优

4.4 G1&#xff08;一个垃圾回收器&#xff09; 定义: 取代了CMS垃圾回收器。和CMS一样时并发的。 适用场景: 物理上分区&#xff0c;逻辑上分代。 相关JVM参数: -XX:UseG1GC-XX:G1HeapRegionSizesize-XX:MaxGCPauseMillistime 1) G1 垃圾回收阶段 三个回收阶段&#xff0…...

【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析

【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析 系列文章汇总见:《【SA8295P 源码分析】00 - 系列文章链接汇总》 本文链接:《【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析》 主要参数如下: hw_…...

iptables的使用规则

环境中为了安全要限制swagger的访问&#xff0c;最简单的方式是通过iptables防火墙设置规则限制。 在测试服务器中设置访问swagger-ui.html显示如下&#xff0c;区分大小写&#xff1a; iptables设置限制访问9783端口的swagger字段的请求&#xff1a; iptables -A INPUT -p t…...

JS 动画 vs CSS 动画:究竟有何不同?

在 Web 前端开发中&#xff0c;动画是提高用户体验的关键因素之一。我们通常可以使用 JavaScript&#xff08;JS&#xff09;和 CSS 来创建动画效果。但是&#xff0c;这两者之间有哪些区别呢&#xff1f;在本文中&#xff0c;我们将深入研究 JS 动画和 CSS 动画&#xff0c;探…...

供应链 | 大数据报童模型:基于机器学习的实践见解

论文解读&#xff1a;李欣 马玺渊 作者&#xff1a;Gah-Yi Ban, Cynthia Rudin 引用&#xff1a;Ban, Gah-Yi and Cynthia Rudin. The big data newsvendor: Practical insights from machine learning. Operations Research 67.1 (2019): 90-108. 文章链接&#xff1a;https…...

Java开发工作问题整理与记录

1、为什么Autowired不能注入static成员属性 扫描Class类需要注入的元数据的时候&#xff0c;直接选择忽略掉了static成员&#xff08;包括属性和方法&#xff09; Spring 依赖注入是依赖set方法, set方法是普通的对象方法,static变量是类的属性 AutowiredAnnotationBeanPostP…...

静态代码扫描持续构建(Jenkins)

前提条件 已正确安装、配置Jenkins环境&#xff0c;并装有 Gradle 插件、HTML 插件、SVN 插件等。如下图所示&#xff1a; 已正确安装、配置android sdk&#xff0c;在cmd窗口输入命令“android -h”,回车 配置步骤 打开Jenkins&#xff0c;新建一个job&#xff0c;输入项目…...

Git gui教程---汇总篇

想说的 汇总篇就是你应该已经学会基本操作了。剩下的代码上传云端之类的我懒得教了&#xff0c;反正你看命令版也差不多了&#xff0c;具体怎么操作就自己想吧。接下来的汇总篇&#xff0c;主要将每一个篇章对应的git命令写出来&#xff0c;一一对应&#xff0c;毕竟现在的编辑…...

flink sql checkpoint 调优配置

- execution.checkpointing.interval: 检查点之间的时间间隔&#xff08;以毫秒为单位&#xff09;。在此间隔内&#xff0c;系统将生成新的检查点 SET execution.checkpointing.interval 6000; - execution.checkpointing.tolerable-failed-checkpoints: 允许的连续失败检查…...

Linux 网络文件共享介绍

Linux 网络文件共享介绍 一.常见的存储类型 目前常见的存储类型有 DAS,NAS,SAN 等&#xff0c;最主要的区别是硬盘存储媒介是如何 于处理器连接的&#xff0c;以及处理器使用何种方式来访问磁盘&#xff0c;以及访问磁盘使用 的协议(网络协议、I/O 协议)。 三种存储类型如下 直…...

Qt中如何在qml文件中使用其他的qml文件并创建对象

如果想使用其他的qml文件直接创建对象&#xff0c;必须先这样导入其qml文件并as成别名&#xff0c;才可以创建对象并使用它。 一、导入qml文件&#xff0c;例如&#xff1a; import "CameraConfig.qml" as CameraConfig import "CameraDevelopView.qml" a…...

学习心得04:CUDA

2018年的时候&#xff0c;看过同事使用CUDA。因为工作忙&#xff0c;所以也没请教。 近来买了本入门的CUDA书&#xff0c;学习了一番。有两个心得&#xff1a; 工作拆分。 CUDA是并行计算&#xff0c;也就是大量重复的可拆分的计算。数组最符合这个要求。简单点就是把数组外面…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

Debian系统简介

目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版&#xff…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

【HTTP三个基础问题】

面试官您好&#xff01;HTTP是超文本传输协议&#xff0c;是互联网上客户端和服务器之间传输超文本数据&#xff08;比如文字、图片、音频、视频等&#xff09;的核心协议&#xff0c;当前互联网应用最广泛的版本是HTTP1.1&#xff0c;它基于经典的C/S模型&#xff0c;也就是客…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

Xen Server服务器释放磁盘空间

disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...