当前位置: 首页 > news >正文

kafka复习:(25)kafka stream

一、java代码:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class KafkaTest25 {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))).groupBy((key, value) -> value).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
}

二、启动console producer:

bin/kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic streams-plaintext-input

三、启动console consumer:

./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

四、在producer端输入字符串(空格分割),看consumer输出

相关文章:

kafka复习:(25)kafka stream

一、java代码&#xff1a; package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.s…...

接口自动化测试总结

一、什么项目适合做自动化测试&#xff1f; 软件需求变动不频繁 测试脚本的稳定性决定了自动化测试的维护成本。如果软件需求变动过于频繁&#xff0c;测试人员需要根据变动的需求来更新测试用例以及相关的测试脚本&#xff0c;而脚本的维护本身就是一个代码开发的过程&#x…...

【Redis】Lua脚本在Redis中的基本使用及其原子性保证原理

文章目录 背景一、Eval二、EvalSHA三、Redis 对 Lua 脚本的管理3.1 script flush3.2 script exists3.3 script load3.4 script kill 四、Lua在Redis中原子性执行的原理 背景 Lua 本身是一种轻量小巧的脚本语言&#xff0c;在Redis2.6版本开始引入了对Lua脚本的支持。通过在服务…...

汇编--int指令

中断信息可以来自CPU的内部和外部&#xff0c; 当CPU的内部有需要处理的事情发生的时候&#xff0c;将产生需要马上处理的中断信息&#xff0c;引发中断过程。在http://t.csdn.cn/jihpG&#xff0c;我们讲解了中断过程和两种内中断的处理。 这一章中&#xff0c; 我们讲解另一种…...

生成式AI的JavScript技术栈

如果不使用新的软件基础设施技术&#xff0c;就很难理解它们。 至少&#xff0c;a16z 基础设施团队发现了这一点&#xff0c;而且因为我们中的许多人都是以程序员的身份开始职业生涯的&#xff0c;所以我们经常通过实践来学习。 尤其是生成式AI浪潮的情况尤其如此&#xff0c;它…...

从零开始学习软件测试-第39天笔记

接口测试 http消息结构 请求报文 请求行 请求方式 url 协议版本请求头空行请求体响应报文 响应行 协议版本 状态码 状态消息响应头空行响应体 请求参数类型 path参数 写在路径中的 https://xxx.xxx.com/参数值query参数 写在url问号后面&#xff0c;以键值对形式存在 h…...

【多思路附源码】2023高教社杯 国赛数学建模C题思路 - 蔬菜类商品的自动定价与补货决策

赛题介绍 在生鲜商超中&#xff0c;一般蔬菜类商品的保鲜期都比较短&#xff0c;且品相随销售时间的增加而变差&#xff0c; 大部分品种如当日未售出&#xff0c;隔日就无法再售。因此&#xff0c; 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬…...

Vue2+Vue3基础入门到实战项目(六)——课程学习笔记

镇贴&#xff01;&#xff01;&#xff01; day07 vuex的基本认知 使用场景 某个状态 在 很多个组件 来使用 (个人信息) 多个组件 共同维护 一份数据 (购物车) 构建多组件共享的数据环境 1.创建项目 vue create vuex-demo 2.创建三个组件, 目录如下 |-components |--Son1.…...

QT—基于http协议的网络文件下载

1.常用到的类 QNetworkAccessManager类用于协调网络操作&#xff0c;负责发送网络请求&#xff0c;创建网络响应 QNetworkReply类表示网络请求的响应。在QNetworkAccessManager发送一个网络请求后创建一个网络响应。它提供了以下信号&#xff1a; finished()&#xff1a;完成…...

SpringBoot-配置优先级

配置 SpringBoot项目支持的三种格式的配置文件 application.properties&#xff1a;这是最常用的配置文件类型&#xff0c;使用键值对的形式来配置应用程序的属性。可以在该文件中配置应用程序的端口号、数据库连接信息、日志级别等。 application.yml&#xff1a;这是一种更…...

科普初步了解大模型

目录 一、大模型的简单认知 &#xff08;一&#xff09;官方定义 &#xff08;二&#xff09;聚焦到大语言模型 &#xff08;三&#xff09;大模型的应用举例 二、如何得到大模型 &#xff08;一&#xff09;整体的一般步骤 训练自己的模型 使用预训练模型 选择适当的…...

Nginx 和 网关的关系是什么

分析&回答 Nginx也可以实现网关&#xff0c;可以实现对api接口的拦截&#xff0c;负载均衡、反向代理、请求过滤等。网关功能可以进行扩展&#xff0c;比如&#xff1a;安全控制&#xff0c;统一异常处理&#xff0c;XXS,SQL注入等&#xff1b;权限控制&#xff0c;黑白名…...

解决springboot项目中的groupId、package或路径的混淆问题

对于像我一样喜欢跳跃着学习的聪明人来说&#xff0c;肯定要学springboot&#xff0c;什么sevlet、maven、java基础&#xff0c;都太老土了&#xff0c;用不到就不学。所以古代的聪明人有句话叫“书到用时方恨少”&#xff0c;测试开源项目时&#xff0c;编译总是报错&#xff…...

Vmware 网络恢复断网和连接

如果你的 虚拟机无法联网了&#xff0c;比如&#xff1a; vmware 无法将网络更改为桥接状态: 没有未桥接的主机网络适配器 等各种稀奇古怪的问题&#xff1b; 按照下面操作 还远默认设置 包你解决各种问题&#xff01;...

学生来看!如何白嫖内网穿透?点进来!

文章目录 前言本教程解决的问题是&#xff1a;按照本教程方法操作后&#xff0c;达到的效果是前排提醒&#xff1a; 1 搭建虚拟机1.1 下载文件vmvare虚拟机安装包1.2 安装VMware虚拟机&#xff1a;1.3 解压虚拟机文件1.4 虚拟机初始化1.5 没有搜索到解决方式&#xff1a;1.6 虚…...

C++中的stack和queue

文章目录 1. stack的介绍和使用1.1 stack的介绍1.2 stack的使用 2. queue的介绍和使用2.1 queue的介绍2.2 queue的使用 3 priority_queue的介绍和使用3.1 priority_queue的介绍3.2 priority_queue的使用 4. 容器适配器4.1 什么是适配器4.2 STL标准库中stack和queue的底层结构4.…...

Ubuntu-22.04通过RDP协议连接远程桌面

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、RDP是什么&#xff1f;二、配置1.打开远程桌面功能2.验证服务3.防火墙配置4.测试效果 总结 前言 由于一些特殊需要&#xff0c;我需要通过远程桌面连接到U…...

20230908java面经整理

1.cpp和java的区别 cpp可以多重继承&#xff0c;对表java中的实现多个接口 cpp支持运算符重载、goto、默认函数参数 cpp自动强转&#xff0c;导致不安全&#xff1b;java必须显式强转 java提供垃圾回收机制&#xff0c;自动管理内存分配&#xff0c;当gc要释放无用对象时调用f…...

uniapp 开发App 网络异常如何处理

我对该问题思考的不是很清楚&#xff0c;目前只想到了基本的解决方案 第一、客户端的网络异常&#xff08;断网&#xff09; 1. 断网情况 一定要弹出信息提示&#xff0c;目前最好的解决方式就是在uni.request封装的统一方法中写提示 //1. 封装的网络请求 async function se…...

docker安装常用软件

Linux系统安装docker请参考&#xff1a;https://mp.csdn.net/mp_blog/creation/editor/128176825 docker安装mysql 1、拉镜像&#xff1a;docker pull mysql:8.0.26 2、创建数据目录&#xff1a; mkdir -p /mnt/data/mysql/data mkdir -p /mnt/data/mysql/logs mkdir -p /mn…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版​分享

平时用 iPhone 的时候&#xff0c;难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵&#xff0c;或者买了二手 iPhone 却被原来的 iCloud 账号锁住&#xff0c;这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

【配置 YOLOX 用于按目录分类的图片数据集】

现在的图标点选越来越多&#xff0c;如何一步解决&#xff0c;采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集&#xff08;每个目录代表一个类别&#xff0c;目录下是该类别的所有图片&#xff09;&#xff0c;你需要进行以下配置步骤&#x…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

Angular微前端架构:Module Federation + ngx-build-plus (Webpack)

以下是一个完整的 Angular 微前端示例&#xff0c;其中使用的是 Module Federation 和 npx-build-plus 实现了主应用&#xff08;Shell&#xff09;与子应用&#xff08;Remote&#xff09;的集成。 &#x1f6e0;️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...

服务器--宝塔命令

一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行&#xff01; sudo su - 1. CentOS 系统&#xff1a; yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...