当前位置: 首页 > news >正文

kafka复习:(25)kafka stream

一、java代码:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class KafkaTest25 {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))).groupBy((key, value) -> value).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
}

二、启动console producer:

bin/kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic streams-plaintext-input

三、启动console consumer:

./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

四、在producer端输入字符串(空格分割),看consumer输出

相关文章:

kafka复习:(25)kafka stream

一、java代码&#xff1a; package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.s…...

接口自动化测试总结

一、什么项目适合做自动化测试&#xff1f; 软件需求变动不频繁 测试脚本的稳定性决定了自动化测试的维护成本。如果软件需求变动过于频繁&#xff0c;测试人员需要根据变动的需求来更新测试用例以及相关的测试脚本&#xff0c;而脚本的维护本身就是一个代码开发的过程&#x…...

【Redis】Lua脚本在Redis中的基本使用及其原子性保证原理

文章目录 背景一、Eval二、EvalSHA三、Redis 对 Lua 脚本的管理3.1 script flush3.2 script exists3.3 script load3.4 script kill 四、Lua在Redis中原子性执行的原理 背景 Lua 本身是一种轻量小巧的脚本语言&#xff0c;在Redis2.6版本开始引入了对Lua脚本的支持。通过在服务…...

汇编--int指令

中断信息可以来自CPU的内部和外部&#xff0c; 当CPU的内部有需要处理的事情发生的时候&#xff0c;将产生需要马上处理的中断信息&#xff0c;引发中断过程。在http://t.csdn.cn/jihpG&#xff0c;我们讲解了中断过程和两种内中断的处理。 这一章中&#xff0c; 我们讲解另一种…...

生成式AI的JavScript技术栈

如果不使用新的软件基础设施技术&#xff0c;就很难理解它们。 至少&#xff0c;a16z 基础设施团队发现了这一点&#xff0c;而且因为我们中的许多人都是以程序员的身份开始职业生涯的&#xff0c;所以我们经常通过实践来学习。 尤其是生成式AI浪潮的情况尤其如此&#xff0c;它…...

从零开始学习软件测试-第39天笔记

接口测试 http消息结构 请求报文 请求行 请求方式 url 协议版本请求头空行请求体响应报文 响应行 协议版本 状态码 状态消息响应头空行响应体 请求参数类型 path参数 写在路径中的 https://xxx.xxx.com/参数值query参数 写在url问号后面&#xff0c;以键值对形式存在 h…...

【多思路附源码】2023高教社杯 国赛数学建模C题思路 - 蔬菜类商品的自动定价与补货决策

赛题介绍 在生鲜商超中&#xff0c;一般蔬菜类商品的保鲜期都比较短&#xff0c;且品相随销售时间的增加而变差&#xff0c; 大部分品种如当日未售出&#xff0c;隔日就无法再售。因此&#xff0c; 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬…...

Vue2+Vue3基础入门到实战项目(六)——课程学习笔记

镇贴&#xff01;&#xff01;&#xff01; day07 vuex的基本认知 使用场景 某个状态 在 很多个组件 来使用 (个人信息) 多个组件 共同维护 一份数据 (购物车) 构建多组件共享的数据环境 1.创建项目 vue create vuex-demo 2.创建三个组件, 目录如下 |-components |--Son1.…...

QT—基于http协议的网络文件下载

1.常用到的类 QNetworkAccessManager类用于协调网络操作&#xff0c;负责发送网络请求&#xff0c;创建网络响应 QNetworkReply类表示网络请求的响应。在QNetworkAccessManager发送一个网络请求后创建一个网络响应。它提供了以下信号&#xff1a; finished()&#xff1a;完成…...

SpringBoot-配置优先级

配置 SpringBoot项目支持的三种格式的配置文件 application.properties&#xff1a;这是最常用的配置文件类型&#xff0c;使用键值对的形式来配置应用程序的属性。可以在该文件中配置应用程序的端口号、数据库连接信息、日志级别等。 application.yml&#xff1a;这是一种更…...

科普初步了解大模型

目录 一、大模型的简单认知 &#xff08;一&#xff09;官方定义 &#xff08;二&#xff09;聚焦到大语言模型 &#xff08;三&#xff09;大模型的应用举例 二、如何得到大模型 &#xff08;一&#xff09;整体的一般步骤 训练自己的模型 使用预训练模型 选择适当的…...

Nginx 和 网关的关系是什么

分析&回答 Nginx也可以实现网关&#xff0c;可以实现对api接口的拦截&#xff0c;负载均衡、反向代理、请求过滤等。网关功能可以进行扩展&#xff0c;比如&#xff1a;安全控制&#xff0c;统一异常处理&#xff0c;XXS,SQL注入等&#xff1b;权限控制&#xff0c;黑白名…...

解决springboot项目中的groupId、package或路径的混淆问题

对于像我一样喜欢跳跃着学习的聪明人来说&#xff0c;肯定要学springboot&#xff0c;什么sevlet、maven、java基础&#xff0c;都太老土了&#xff0c;用不到就不学。所以古代的聪明人有句话叫“书到用时方恨少”&#xff0c;测试开源项目时&#xff0c;编译总是报错&#xff…...

Vmware 网络恢复断网和连接

如果你的 虚拟机无法联网了&#xff0c;比如&#xff1a; vmware 无法将网络更改为桥接状态: 没有未桥接的主机网络适配器 等各种稀奇古怪的问题&#xff1b; 按照下面操作 还远默认设置 包你解决各种问题&#xff01;...

学生来看!如何白嫖内网穿透?点进来!

文章目录 前言本教程解决的问题是&#xff1a;按照本教程方法操作后&#xff0c;达到的效果是前排提醒&#xff1a; 1 搭建虚拟机1.1 下载文件vmvare虚拟机安装包1.2 安装VMware虚拟机&#xff1a;1.3 解压虚拟机文件1.4 虚拟机初始化1.5 没有搜索到解决方式&#xff1a;1.6 虚…...

C++中的stack和queue

文章目录 1. stack的介绍和使用1.1 stack的介绍1.2 stack的使用 2. queue的介绍和使用2.1 queue的介绍2.2 queue的使用 3 priority_queue的介绍和使用3.1 priority_queue的介绍3.2 priority_queue的使用 4. 容器适配器4.1 什么是适配器4.2 STL标准库中stack和queue的底层结构4.…...

Ubuntu-22.04通过RDP协议连接远程桌面

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、RDP是什么&#xff1f;二、配置1.打开远程桌面功能2.验证服务3.防火墙配置4.测试效果 总结 前言 由于一些特殊需要&#xff0c;我需要通过远程桌面连接到U…...

20230908java面经整理

1.cpp和java的区别 cpp可以多重继承&#xff0c;对表java中的实现多个接口 cpp支持运算符重载、goto、默认函数参数 cpp自动强转&#xff0c;导致不安全&#xff1b;java必须显式强转 java提供垃圾回收机制&#xff0c;自动管理内存分配&#xff0c;当gc要释放无用对象时调用f…...

uniapp 开发App 网络异常如何处理

我对该问题思考的不是很清楚&#xff0c;目前只想到了基本的解决方案 第一、客户端的网络异常&#xff08;断网&#xff09; 1. 断网情况 一定要弹出信息提示&#xff0c;目前最好的解决方式就是在uni.request封装的统一方法中写提示 //1. 封装的网络请求 async function se…...

docker安装常用软件

Linux系统安装docker请参考&#xff1a;https://mp.csdn.net/mp_blog/creation/editor/128176825 docker安装mysql 1、拉镜像&#xff1a;docker pull mysql:8.0.26 2、创建数据目录&#xff1a; mkdir -p /mnt/data/mysql/data mkdir -p /mnt/data/mysql/logs mkdir -p /mn…...

NVIDIA vGPU许可服务器HA配置避坑指南:从环境准备到故障切换测试

NVIDIA vGPU许可服务器高可用配置实战&#xff1a;从零搭建到容灾验证 在虚拟化与AI计算融合的今天&#xff0c;NVIDIA vGPU技术已成为图形工作站、云游戏和机器学习平台的核心支撑。但许多团队在享受显卡虚拟化红利时&#xff0c;往往忽略了许可服务的高可用保障——当单点故障…...

CentOS7系统维护终止后YUM源失效的解决方案

1. CentOS7维护终止带来的YUM源危机 去年夏天我给客户部署的CentOS7服务器突然无法安装新软件&#xff0c;屏幕上不断弹出"无法解析主机"的错误。这才意识到官方已经停止维护&#xff0c;默认的YUM源就像突然关门的超市&#xff0c;所有货架都空了。对于仍在使用Cent…...

UMA模型深度解析:机器学习加速的科学计算革命与高通量筛选架构揭秘

UMA模型深度解析&#xff1a;机器学习加速的科学计算革命与高通量筛选架构揭秘 【免费下载链接】ocp Open Catalyst Projects library of machine learning methods for catalysis 项目地址: https://gitcode.com/GitHub_Trending/oc/ocp 在计算材料科学与催化研究领域…...

手把手教你搭建轻量级Gitea代码托管平台:Windows本地部署实战

1. 为什么选择Gitea作为本地代码托管平台 作为一个长期在Windows环境下开发的程序员&#xff0c;我深知一个轻量级代码托管平台的重要性。以前我也用过Gitblit这类工具&#xff0c;但随着项目复杂度提升&#xff0c;越来越需要一个更现代的解决方案。Gitea就像是为个人开发者量…...

PowerPaint-V1 Gradio 新手入门指南:3步搞定图片修复,小白也能变大神

PowerPaint-V1 Gradio 新手入门指南&#xff1a;3步搞定图片修复&#xff0c;小白也能变大神 1. 为什么选择PowerPaint-V1&#xff1f; 如果你经常需要处理图片中的瑕疵、水印或者想替换某些元素&#xff0c;PowerPaint-V1绝对是你的得力助手。这个由字节跳动与香港大学联合研…...

深入XFS文件系统:从一次CentOS 7的Internal error报错,聊聊xfs_repair背后的原理与避坑指南

深入XFS文件系统&#xff1a;从Internal error报错到修复原理与实战指南 当你在一台运行CentOS 7的生产服务器上看到"XFS_WANT_CORRUPTED_GOTO"这个鲜红的报错信息时&#xff0c;作为运维工程师的肾上腺素会立刻飙升。这不是一个普通的I/O错误&#xff0c;而是XFS文件…...

OpenVoice语音合成技术全解析:从痛点突破到多场景落地实践

OpenVoice语音合成技术全解析&#xff1a;从痛点突破到多场景落地实践 【免费下载链接】OpenVoice 项目是MyShell AI开源的即时语音克隆技术OpenVoice&#xff0c;旨在提供一种能够快速从少量语音样本中准确复制人类声音特征&#xff0c;并实现多种语言及语音风格转换的解决方案…...

DEFOM-Stereo vs RAFT-Stereo:双目匹配领域的新旧王者对比实测(附KITTI数据集结果)

DEFOM-Stereo与RAFT-Stereo&#xff1a;双目视觉技术的实战性能解析 在计算机视觉领域&#xff0c;双目立体匹配技术一直是实现三维场景重建和环境感知的核心方法之一。近年来&#xff0c;随着深度学习技术的快速发展&#xff0c;RAFT-Stereo等基于神经网络的双目匹配算法已经展…...

智能工作流引擎:多智能体系统任务编排的高效解决方案

智能工作流引擎&#xff1a;多智能体系统任务编排的高效解决方案 【免费下载链接】agno High-performance runtime for multi-agent systems. Build, run and manage secure multi-agent systems in your cloud. 项目地址: https://gitcode.com/GitHub_Trending/ag/agno …...

Volcano调度算法全解析:从DRF公平分配到Binpack节点装箱(含权重调优技巧)

Volcano调度算法深度实战&#xff1a;从DRF公平分配到Binpack节点装箱 在Kubernetes生态中&#xff0c;资源调度一直是决定集群效率和稳定性的核心环节。当你的业务从简单的Web服务扩展到AI训练、大数据处理等复杂场景时&#xff0c;原生Kubernetes调度器的局限性就会凸显——它…...