kafka复习:(20):消费者拦截器的使用
一、定义消费者拦截器(只消费含"sister"的消息)
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;import java.util.*;public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {Map<TopicPartition,List<ConsumerRecord<String,String>>> finalResult=new HashMap<>();Set<TopicPartition> partitionSet = records.partitions();for(TopicPartition topicPartition: partitionSet){List<ConsumerRecord<String,String>> partitionRecordList=records.records(topicPartition);List<ConsumerRecord<String,String>> newPartitionRecordList=new LinkedList<>();for(ConsumerRecord<String,String> record: partitionRecordList){if(record.value().contains("sister")){newPartitionRecordList.add(record);}}finalResult.put(topicPartition,newPartitionRecordList);}return new ConsumerRecords<>(finalResult);}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp,meta) -> {System.out.println("消费者拦截器:"+tp.topic()+":"+meta.offset());});}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
二、定义消费者,配置消费者拦截器
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerInterceptorTest {public static void main(String[] args) {String topic="testTopic2";String server="xx.xx.xx.xx:9092";Properties properties=new Properties();properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupTest4");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName());KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(properties);myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000));for(ConsumerRecord consumerRecord: records){System.out.println(consumerRecord.value());}//myConsumer.commitSync();}}
}相关文章:
kafka复习:(20):消费者拦截器的使用
一、定义消费者拦截器(只消费含"sister"的消息) package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.…...
水库大坝安全监测的主要内容包括哪些?
在水库大坝的实时监测中,主要任务是通过无线传感网络监测各个监测点的水位、水压、渗流、流量、扬压力等数据,并在计算机上用数据模式或图形模式进行实时反映,以掌握整个水库大坝的各项变化情况。大坝安全监测系统能实现全天候远程自动监测&a…...
Cadence软件屏幕显示问题
问题 就是今天打开Cadence软件想导出网表看一下,发现没有显示确定按钮什么的,那个窗口也是无语,不能移动,缩放也只能左右缩放,还不能缩小什么的,真的醉了,后面就是调整窗口的分辨率。 因为我最…...
访问服务器快慢的因素
我们在租用服务器的过程中,可能在访问速度方面,会受到某些因素影响,如果您要进行此项业务,进行一些简单的了解 是非常的有必要的,下面壹基比小鑫带大家一起去做个具体的探讨吧。 对于服务器不太了解的都认为࿰…...
vue(element ui安装)
目录 一,element ui安装二,main.js三,使用element ui最后 一,element ui安装 先在盘服中找到你创建的node的位置 如有不懂根据可以看看上一章安装node 然后在终端找到 进入这个位置之后就可以安装了 输入npm i element-ui -S这个…...
基于FPGA视频接口之HDMI2.0编/解码
简介 为什么要特别说明HDMI的版本,是因为HDMI的版本众多,代表的HDMI速度同样不同,当前版本在HDMI2.1速度达到48Gbps,可以传输4K及以上图像,但我们当前还停留在1080P@60部分,且使用的芯片和硬件结构有很大差别,故将HDMI分为两个部分说明1080@60以下分辨率和4K以上分辨率(…...
Codeforces Round #894 (Div.3)
文章目录 前言A. Gift Carpet题目:输入:输出:思路:代码: B. Sequence Game题目:输入:输出:思路:代码: C. Flower City Fence题目:输入:…...
MyBatid动态语句且模糊查询
目录 什么是MyBtais动态语句??? MyBatis常用的动态标签和表达式 if标签 Choose标签 where标签 MyBatis模糊查询 #与$的区别 编辑 MyBatis映射 resultType resultMap 什么是MyBtais动态语句???…...
JVM——垃圾回收器G1+垃圾回收调优
4.4 G1(一个垃圾回收器) 定义: 取代了CMS垃圾回收器。和CMS一样时并发的。 适用场景: 物理上分区,逻辑上分代。 相关JVM参数: -XX:UseG1GC-XX:G1HeapRegionSizesize-XX:MaxGCPauseMillistime 1) G1 垃圾回收阶段 三个回收阶段࿰…...
【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析
【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析 系列文章汇总见:《【SA8295P 源码分析】00 - 系列文章链接汇总》 本文链接:《【SA8295P 源码分析】23 - QNX Ethernet MAC 驱动 之 emac1_config.conf 配置文件解析》 主要参数如下: hw_…...
iptables的使用规则
环境中为了安全要限制swagger的访问,最简单的方式是通过iptables防火墙设置规则限制。 在测试服务器中设置访问swagger-ui.html显示如下,区分大小写: iptables设置限制访问9783端口的swagger字段的请求: iptables -A INPUT -p t…...
JS 动画 vs CSS 动画:究竟有何不同?
在 Web 前端开发中,动画是提高用户体验的关键因素之一。我们通常可以使用 JavaScript(JS)和 CSS 来创建动画效果。但是,这两者之间有哪些区别呢?在本文中,我们将深入研究 JS 动画和 CSS 动画,探…...
供应链 | 大数据报童模型:基于机器学习的实践见解
论文解读:李欣 马玺渊 作者:Gah-Yi Ban, Cynthia Rudin 引用:Ban, Gah-Yi and Cynthia Rudin. The big data newsvendor: Practical insights from machine learning. Operations Research 67.1 (2019): 90-108. 文章链接:https…...
Java开发工作问题整理与记录
1、为什么Autowired不能注入static成员属性 扫描Class类需要注入的元数据的时候,直接选择忽略掉了static成员(包括属性和方法) Spring 依赖注入是依赖set方法, set方法是普通的对象方法,static变量是类的属性 AutowiredAnnotationBeanPostP…...
静态代码扫描持续构建(Jenkins)
前提条件 已正确安装、配置Jenkins环境,并装有 Gradle 插件、HTML 插件、SVN 插件等。如下图所示: 已正确安装、配置android sdk,在cmd窗口输入命令“android -h”,回车 配置步骤 打开Jenkins,新建一个job,输入项目…...
Git gui教程---汇总篇
想说的 汇总篇就是你应该已经学会基本操作了。剩下的代码上传云端之类的我懒得教了,反正你看命令版也差不多了,具体怎么操作就自己想吧。接下来的汇总篇,主要将每一个篇章对应的git命令写出来,一一对应,毕竟现在的编辑…...
flink sql checkpoint 调优配置
- execution.checkpointing.interval: 检查点之间的时间间隔(以毫秒为单位)。在此间隔内,系统将生成新的检查点 SET execution.checkpointing.interval 6000; - execution.checkpointing.tolerable-failed-checkpoints: 允许的连续失败检查…...
Linux 网络文件共享介绍
Linux 网络文件共享介绍 一.常见的存储类型 目前常见的存储类型有 DAS,NAS,SAN 等,最主要的区别是硬盘存储媒介是如何 于处理器连接的,以及处理器使用何种方式来访问磁盘,以及访问磁盘使用 的协议(网络协议、I/O 协议)。 三种存储类型如下 直…...
Qt中如何在qml文件中使用其他的qml文件并创建对象
如果想使用其他的qml文件直接创建对象,必须先这样导入其qml文件并as成别名,才可以创建对象并使用它。 一、导入qml文件,例如: import "CameraConfig.qml" as CameraConfig import "CameraDevelopView.qml" a…...
学习心得04:CUDA
2018年的时候,看过同事使用CUDA。因为工作忙,所以也没请教。 近来买了本入门的CUDA书,学习了一番。有两个心得: 工作拆分。 CUDA是并行计算,也就是大量重复的可拆分的计算。数组最符合这个要求。简单点就是把数组外面…...
别光看原理了!用STM32F407从零撸一个四轴飞控代码(附完整工程)
用STM32F407从零构建四轴飞控代码实战指南 当你在论坛上看到别人分享的无人机飞行视频,是否也曾心动想亲手打造一套自己的飞控系统?市面上大多数教程止步于理论讲解,真正落实到代码层面的少之又少。本文将带你用STM32F407开发板,…...
冒险岛V128单机版服务端魔改指南:从基础搭建到自定义任务/装备修改
冒险岛V128单机版深度定制指南:从零构建个性化游戏世界 在数字娱乐的黄金时代,怀旧游戏焕发新生已成为一种文化现象。作为横版卷轴网游的经典之作,冒险岛凭借其独特的艺术风格和社交属性,至今仍拥有大量忠实玩家。而单机版的出现&…...
8路HD-SDI录播主机CYS-08
在广电录制、教育录播、会议记录等场景中,稳定、高清、易管理的视频录制设备至关重要。春源丽影CYS-08 推出的8路HD-SDI硬盘录像机,凭借全接口支持、双编码技术、智能存储等核心优势,为多路高清录制需求提供了专业级解决方案。8路高清输入&am…...
保研党必看:用本科论文逆袭IEEE二区期刊的5个关键操作(含时间管理秘籍)
保研党必看:用本科论文逆袭IEEE二区期刊的5个关键操作(含时间管理秘籍) 在保研竞争日益激烈的当下,一篇高质量的学术论文往往能成为决定成败的关键。对于大多数本科生来说,科研经历有限、资源匮乏是普遍面临的困境。但…...
【概率统计】从直方图到核密度估计:数据分布可视化的进阶之路
1. 直方图:数据可视化的第一课 第一次接触数据分布可视化时,大多数人都是从直方图开始的。记得我刚学数据分析时,导师扔给我一组销售数据说:"先画个直方图看看分布情况。"当时我盯着matplotlib的hist函数参数一脸茫然—…...
在WSL2 Ubuntu 22.04上搞定RK3568 SDK编译:我遇到的8个坑和填坑方法
在WSL2 Ubuntu 22.04上搞定RK3568 SDK编译:我遇到的8个坑和填坑方法 作为一名长期在Windows环境下工作的嵌入式开发者,第一次尝试在WSL2中编译RK3568 SDK的经历简直像是一场噩梦。从环境配置到最终构建成功,我踩遍了几乎所有可能的坑。这篇文…...
SpringBoot整合MQTT实战:手把手教你实现设备动态连接与主题订阅管理(附完整源码)
SpringBoot整合MQTT实战:动态连接与主题订阅管理的工程化实现 在物联网项目开发中,设备连接管理和消息路由的灵活性往往是系统设计的难点。想象这样一个场景:你的智慧农业系统需要随时接入新部署的土壤传感器,气象站设备可能因网…...
Qwen3.5-4B-Claude-Opus推理模型基础教程:Temperature/Top-P参数详解
Qwen3.5-4B-Claude-Opus推理模型基础教程:Temperature/Top-P参数详解 1. 模型概述 Qwen3.5-4B-Claude-4.6-Opus-Reasoning-Distilled-GGUF是一个基于Qwen3.5-4B的推理蒸馏模型,特别强化了结构化分析、分步骤回答以及代码与逻辑类问题的处理能力。该模型…...
告别盲目搜索!Unity大版本升级时,系统化处理API变更的5个步骤
Unity大版本升级的系统化实践:从API变更管理到团队协作优化 当Unity 2023 LTS发布时,某中型游戏团队在升级过程中发现超过40%的脚本因API变更而报错,导致项目停滞两周。这种场景在技术迭代中并不罕见,但大多数团队仍采用"遇到…...
Workbench与Ls-Dyna中位移与远程位移设置的关键字映射解析
1. 固定支撑的关键字映射与实战配置 在有限元分析中,固定支撑是最基础的边界条件之一。Workbench和Ls-Dyna对固定支撑的实现逻辑完全不同,但最终达到的约束效果是等效的。先看Workbench端的操作:在Mechanical界面右键选择Ls-Dyna环境…...
