kafka复习:(25)kafka stream
一、java代码:
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class KafkaTest25 {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))).groupBy((key, value) -> value).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
}
二、启动console producer:
bin/kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic streams-plaintext-input
三、启动console consumer:
./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
四、在producer端输入字符串(空格分割),看consumer输出
相关文章:
kafka复习:(25)kafka stream
一、java代码: package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.s…...

接口自动化测试总结
一、什么项目适合做自动化测试? 软件需求变动不频繁 测试脚本的稳定性决定了自动化测试的维护成本。如果软件需求变动过于频繁,测试人员需要根据变动的需求来更新测试用例以及相关的测试脚本,而脚本的维护本身就是一个代码开发的过程&#x…...

【Redis】Lua脚本在Redis中的基本使用及其原子性保证原理
文章目录 背景一、Eval二、EvalSHA三、Redis 对 Lua 脚本的管理3.1 script flush3.2 script exists3.3 script load3.4 script kill 四、Lua在Redis中原子性执行的原理 背景 Lua 本身是一种轻量小巧的脚本语言,在Redis2.6版本开始引入了对Lua脚本的支持。通过在服务…...

汇编--int指令
中断信息可以来自CPU的内部和外部, 当CPU的内部有需要处理的事情发生的时候,将产生需要马上处理的中断信息,引发中断过程。在http://t.csdn.cn/jihpG,我们讲解了中断过程和两种内中断的处理。 这一章中, 我们讲解另一种…...

生成式AI的JavScript技术栈
如果不使用新的软件基础设施技术,就很难理解它们。 至少,a16z 基础设施团队发现了这一点,而且因为我们中的许多人都是以程序员的身份开始职业生涯的,所以我们经常通过实践来学习。 尤其是生成式AI浪潮的情况尤其如此,它…...

从零开始学习软件测试-第39天笔记
接口测试 http消息结构 请求报文 请求行 请求方式 url 协议版本请求头空行请求体响应报文 响应行 协议版本 状态码 状态消息响应头空行响应体 请求参数类型 path参数 写在路径中的 https://xxx.xxx.com/参数值query参数 写在url问号后面,以键值对形式存在 h…...

【多思路附源码】2023高教社杯 国赛数学建模C题思路 - 蔬菜类商品的自动定价与补货决策
赛题介绍 在生鲜商超中,一般蔬菜类商品的保鲜期都比较短,且品相随销售时间的增加而变差, 大部分品种如当日未售出,隔日就无法再售。因此, 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬…...

Vue2+Vue3基础入门到实战项目(六)——课程学习笔记
镇贴!!! day07 vuex的基本认知 使用场景 某个状态 在 很多个组件 来使用 (个人信息) 多个组件 共同维护 一份数据 (购物车) 构建多组件共享的数据环境 1.创建项目 vue create vuex-demo 2.创建三个组件, 目录如下 |-components |--Son1.…...

QT—基于http协议的网络文件下载
1.常用到的类 QNetworkAccessManager类用于协调网络操作,负责发送网络请求,创建网络响应 QNetworkReply类表示网络请求的响应。在QNetworkAccessManager发送一个网络请求后创建一个网络响应。它提供了以下信号: finished():完成…...
SpringBoot-配置优先级
配置 SpringBoot项目支持的三种格式的配置文件 application.properties:这是最常用的配置文件类型,使用键值对的形式来配置应用程序的属性。可以在该文件中配置应用程序的端口号、数据库连接信息、日志级别等。 application.yml:这是一种更…...

科普初步了解大模型
目录 一、大模型的简单认知 (一)官方定义 (二)聚焦到大语言模型 (三)大模型的应用举例 二、如何得到大模型 (一)整体的一般步骤 训练自己的模型 使用预训练模型 选择适当的…...
Nginx 和 网关的关系是什么
分析&回答 Nginx也可以实现网关,可以实现对api接口的拦截,负载均衡、反向代理、请求过滤等。网关功能可以进行扩展,比如:安全控制,统一异常处理,XXS,SQL注入等;权限控制,黑白名…...

解决springboot项目中的groupId、package或路径的混淆问题
对于像我一样喜欢跳跃着学习的聪明人来说,肯定要学springboot,什么sevlet、maven、java基础,都太老土了,用不到就不学。所以古代的聪明人有句话叫“书到用时方恨少”,测试开源项目时,编译总是报错ÿ…...

Vmware 网络恢复断网和连接
如果你的 虚拟机无法联网了,比如: vmware 无法将网络更改为桥接状态: 没有未桥接的主机网络适配器 等各种稀奇古怪的问题; 按照下面操作 还远默认设置 包你解决各种问题!...

学生来看!如何白嫖内网穿透?点进来!
文章目录 前言本教程解决的问题是:按照本教程方法操作后,达到的效果是前排提醒: 1 搭建虚拟机1.1 下载文件vmvare虚拟机安装包1.2 安装VMware虚拟机:1.3 解压虚拟机文件1.4 虚拟机初始化1.5 没有搜索到解决方式:1.6 虚…...

C++中的stack和queue
文章目录 1. stack的介绍和使用1.1 stack的介绍1.2 stack的使用 2. queue的介绍和使用2.1 queue的介绍2.2 queue的使用 3 priority_queue的介绍和使用3.1 priority_queue的介绍3.2 priority_queue的使用 4. 容器适配器4.1 什么是适配器4.2 STL标准库中stack和queue的底层结构4.…...

Ubuntu-22.04通过RDP协议连接远程桌面
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、RDP是什么?二、配置1.打开远程桌面功能2.验证服务3.防火墙配置4.测试效果 总结 前言 由于一些特殊需要,我需要通过远程桌面连接到U…...
20230908java面经整理
1.cpp和java的区别 cpp可以多重继承,对表java中的实现多个接口 cpp支持运算符重载、goto、默认函数参数 cpp自动强转,导致不安全;java必须显式强转 java提供垃圾回收机制,自动管理内存分配,当gc要释放无用对象时调用f…...
uniapp 开发App 网络异常如何处理
我对该问题思考的不是很清楚,目前只想到了基本的解决方案 第一、客户端的网络异常(断网) 1. 断网情况 一定要弹出信息提示,目前最好的解决方式就是在uni.request封装的统一方法中写提示 //1. 封装的网络请求 async function se…...

docker安装常用软件
Linux系统安装docker请参考:https://mp.csdn.net/mp_blog/creation/editor/128176825 docker安装mysql 1、拉镜像:docker pull mysql:8.0.26 2、创建数据目录: mkdir -p /mnt/data/mysql/data mkdir -p /mnt/data/mysql/logs mkdir -p /mn…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...

srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...

NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...

如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
服务器--宝塔命令
一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行! sudo su - 1. CentOS 系统: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...