当前位置: 首页 > news >正文

kafka复习:(25)kafka stream

一、java代码:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class KafkaTest25 {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))).groupBy((key, value) -> value).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
}

二、启动console producer:

bin/kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic streams-plaintext-input

三、启动console consumer:

./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

四、在producer端输入字符串(空格分割),看consumer输出

相关文章:

kafka复习:(25)kafka stream

一、java代码&#xff1a; package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.s…...

接口自动化测试总结

一、什么项目适合做自动化测试&#xff1f; 软件需求变动不频繁 测试脚本的稳定性决定了自动化测试的维护成本。如果软件需求变动过于频繁&#xff0c;测试人员需要根据变动的需求来更新测试用例以及相关的测试脚本&#xff0c;而脚本的维护本身就是一个代码开发的过程&#x…...

【Redis】Lua脚本在Redis中的基本使用及其原子性保证原理

文章目录 背景一、Eval二、EvalSHA三、Redis 对 Lua 脚本的管理3.1 script flush3.2 script exists3.3 script load3.4 script kill 四、Lua在Redis中原子性执行的原理 背景 Lua 本身是一种轻量小巧的脚本语言&#xff0c;在Redis2.6版本开始引入了对Lua脚本的支持。通过在服务…...

汇编--int指令

中断信息可以来自CPU的内部和外部&#xff0c; 当CPU的内部有需要处理的事情发生的时候&#xff0c;将产生需要马上处理的中断信息&#xff0c;引发中断过程。在http://t.csdn.cn/jihpG&#xff0c;我们讲解了中断过程和两种内中断的处理。 这一章中&#xff0c; 我们讲解另一种…...

生成式AI的JavScript技术栈

如果不使用新的软件基础设施技术&#xff0c;就很难理解它们。 至少&#xff0c;a16z 基础设施团队发现了这一点&#xff0c;而且因为我们中的许多人都是以程序员的身份开始职业生涯的&#xff0c;所以我们经常通过实践来学习。 尤其是生成式AI浪潮的情况尤其如此&#xff0c;它…...

从零开始学习软件测试-第39天笔记

接口测试 http消息结构 请求报文 请求行 请求方式 url 协议版本请求头空行请求体响应报文 响应行 协议版本 状态码 状态消息响应头空行响应体 请求参数类型 path参数 写在路径中的 https://xxx.xxx.com/参数值query参数 写在url问号后面&#xff0c;以键值对形式存在 h…...

【多思路附源码】2023高教社杯 国赛数学建模C题思路 - 蔬菜类商品的自动定价与补货决策

赛题介绍 在生鲜商超中&#xff0c;一般蔬菜类商品的保鲜期都比较短&#xff0c;且品相随销售时间的增加而变差&#xff0c; 大部分品种如当日未售出&#xff0c;隔日就无法再售。因此&#xff0c; 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬…...

Vue2+Vue3基础入门到实战项目(六)——课程学习笔记

镇贴&#xff01;&#xff01;&#xff01; day07 vuex的基本认知 使用场景 某个状态 在 很多个组件 来使用 (个人信息) 多个组件 共同维护 一份数据 (购物车) 构建多组件共享的数据环境 1.创建项目 vue create vuex-demo 2.创建三个组件, 目录如下 |-components |--Son1.…...

QT—基于http协议的网络文件下载

1.常用到的类 QNetworkAccessManager类用于协调网络操作&#xff0c;负责发送网络请求&#xff0c;创建网络响应 QNetworkReply类表示网络请求的响应。在QNetworkAccessManager发送一个网络请求后创建一个网络响应。它提供了以下信号&#xff1a; finished()&#xff1a;完成…...

SpringBoot-配置优先级

配置 SpringBoot项目支持的三种格式的配置文件 application.properties&#xff1a;这是最常用的配置文件类型&#xff0c;使用键值对的形式来配置应用程序的属性。可以在该文件中配置应用程序的端口号、数据库连接信息、日志级别等。 application.yml&#xff1a;这是一种更…...

科普初步了解大模型

目录 一、大模型的简单认知 &#xff08;一&#xff09;官方定义 &#xff08;二&#xff09;聚焦到大语言模型 &#xff08;三&#xff09;大模型的应用举例 二、如何得到大模型 &#xff08;一&#xff09;整体的一般步骤 训练自己的模型 使用预训练模型 选择适当的…...

Nginx 和 网关的关系是什么

分析&回答 Nginx也可以实现网关&#xff0c;可以实现对api接口的拦截&#xff0c;负载均衡、反向代理、请求过滤等。网关功能可以进行扩展&#xff0c;比如&#xff1a;安全控制&#xff0c;统一异常处理&#xff0c;XXS,SQL注入等&#xff1b;权限控制&#xff0c;黑白名…...

解决springboot项目中的groupId、package或路径的混淆问题

对于像我一样喜欢跳跃着学习的聪明人来说&#xff0c;肯定要学springboot&#xff0c;什么sevlet、maven、java基础&#xff0c;都太老土了&#xff0c;用不到就不学。所以古代的聪明人有句话叫“书到用时方恨少”&#xff0c;测试开源项目时&#xff0c;编译总是报错&#xff…...

Vmware 网络恢复断网和连接

如果你的 虚拟机无法联网了&#xff0c;比如&#xff1a; vmware 无法将网络更改为桥接状态: 没有未桥接的主机网络适配器 等各种稀奇古怪的问题&#xff1b; 按照下面操作 还远默认设置 包你解决各种问题&#xff01;...

学生来看!如何白嫖内网穿透?点进来!

文章目录 前言本教程解决的问题是&#xff1a;按照本教程方法操作后&#xff0c;达到的效果是前排提醒&#xff1a; 1 搭建虚拟机1.1 下载文件vmvare虚拟机安装包1.2 安装VMware虚拟机&#xff1a;1.3 解压虚拟机文件1.4 虚拟机初始化1.5 没有搜索到解决方式&#xff1a;1.6 虚…...

C++中的stack和queue

文章目录 1. stack的介绍和使用1.1 stack的介绍1.2 stack的使用 2. queue的介绍和使用2.1 queue的介绍2.2 queue的使用 3 priority_queue的介绍和使用3.1 priority_queue的介绍3.2 priority_queue的使用 4. 容器适配器4.1 什么是适配器4.2 STL标准库中stack和queue的底层结构4.…...

Ubuntu-22.04通过RDP协议连接远程桌面

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、RDP是什么&#xff1f;二、配置1.打开远程桌面功能2.验证服务3.防火墙配置4.测试效果 总结 前言 由于一些特殊需要&#xff0c;我需要通过远程桌面连接到U…...

20230908java面经整理

1.cpp和java的区别 cpp可以多重继承&#xff0c;对表java中的实现多个接口 cpp支持运算符重载、goto、默认函数参数 cpp自动强转&#xff0c;导致不安全&#xff1b;java必须显式强转 java提供垃圾回收机制&#xff0c;自动管理内存分配&#xff0c;当gc要释放无用对象时调用f…...

uniapp 开发App 网络异常如何处理

我对该问题思考的不是很清楚&#xff0c;目前只想到了基本的解决方案 第一、客户端的网络异常&#xff08;断网&#xff09; 1. 断网情况 一定要弹出信息提示&#xff0c;目前最好的解决方式就是在uni.request封装的统一方法中写提示 //1. 封装的网络请求 async function se…...

docker安装常用软件

Linux系统安装docker请参考&#xff1a;https://mp.csdn.net/mp_blog/creation/editor/128176825 docker安装mysql 1、拉镜像&#xff1a;docker pull mysql:8.0.26 2、创建数据目录&#xff1a; mkdir -p /mnt/data/mysql/data mkdir -p /mnt/data/mysql/logs mkdir -p /mn…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?

现有的 Redis 分布式锁库&#xff08;如 Redisson&#xff09;相比于开发者自己基于 Redis 命令&#xff08;如 SETNX, EXPIRE, DEL&#xff09;手动实现分布式锁&#xff0c;提供了巨大的便利性和健壮性。主要体现在以下几个方面&#xff1a; 原子性保证 (Atomicity)&#xff…...

脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)

一、OpenBCI_GUI 项目概述 &#xff08;一&#xff09;项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台&#xff0c;其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言&#xff0c;首次接触 OpenBCI 设备时&#xff0c;往…...

​​企业大模型服务合规指南:深度解析备案与登记制度​​

伴随AI技术的爆炸式发展&#xff0c;尤其是大模型&#xff08;LLM&#xff09;在各行各业的深度应用和整合&#xff0c;企业利用AI技术提升效率、创新服务的步伐不断加快。无论是像DeepSeek这样的前沿技术提供者&#xff0c;还是积极拥抱AI转型的传统企业&#xff0c;在面向公众…...

鸿蒙HarmonyOS 5军旗小游戏实现指南

1. 项目概述 本军旗小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;采用DevEco Studio实现&#xff0c;包含完整的游戏逻辑和UI界面。 2. 项目结构 /src/main/java/com/example/militarychess/├── MainAbilitySlice.java // 主界面├── GameView.java // 游戏核…...

基于单片机的宠物屋智能系统设计与实现(论文+源码)

本设计基于单片机的宠物屋智能系统核心是实现对宠物生活环境及状态的智能管理。系统以单片机为中枢&#xff0c;连接红外测温传感器&#xff0c;可实时精准捕捉宠物体温变化&#xff0c;以便及时发现健康异常&#xff1b;水位检测传感器时刻监测饮用水余量&#xff0c;防止宠物…...