当前位置: 首页 > news >正文

KafkaStream:基本使用

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--kafkaStream--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

   2、新建流式处理类

          代码如下

package com.heima.kafkademo.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*
* 流式处理
* */
public class KafkaStreamQuickStart {public static void main(String[] args) {/*创建kafka配置中心并配置参数*/Properties prop = new Properties();//连接地址prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key序列化prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//value序列化prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//创建id名称prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream构造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}//流式计算方法private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kafka对象,同时指定从哪个topic获取消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");//处理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<?>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})      //按照value进行聚合.groupBy((key,value)->value)//时间窗口,每隔10秒更新一次.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

       消费者和生产者类代码参考Kafka:安装和配置_Success___的博客-CSDN博客

4、测试

        成功接收到消息

 

 

相关文章:

KafkaStream:基本使用

简介&#xff1a; kafkaStream&#xff1a;提供了对存储在kafka中的数据进行流式处理和分析的功能 特点&#xff1a; KafkasSream提供了一个非常简单轻量的Library&#xff0c;它可以非常方便的嵌入到java程序中&#xff0c;也可以任何方式打包部署 入门案例&#xff1a; 1、…...

【数据结构】二叉树

完全二叉树 是指所有结点度数小于等于2的树 所以这种情况也是&#xff1a; 几条性质 一个具有n个结点的完全二叉树的深度为&#xff1a; log ⁡ 2 ( n 1 ) 的结果向上取整。 \\\log_{2}(n1) \ \ 的结果向上取整。 log2​(n1) 的结果向上取整。设度为0的结点个数是n0&#…...

基于灰狼优化(GWO)、帝国竞争算法(ICA)和粒子群优化(PSO)对梯度下降法训练的神经网络的权值进行了改进(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

jenkins自动化构建保姆级教程(持续更新中)

1.安装 1.1版本说明 访问jenkins官网 https://www.jenkins.io/&#xff0c;进入到首页 点击【Download】按钮进入到jenkins下载界面 左侧显示的是最新的长期支持版本&#xff0c;右侧显示的是最新的可测试版本&#xff08;可能不稳定&#xff09;&#xff0c;建议使用最新的…...

HTTPS 的加密流程

目录 一、HTTPS是什么&#xff1f; 二、为什么要加密 三、"加密" 是什么 四、HTTPS 的工作过程 1.对称加密 2.非对称加密 3.中间人攻击 4.证书 总结 一、HTTPS是什么&#xff1f; HTTPS (Hyper Text Transfer Protocol Secure) 是基于 HTTP 协议之上的安全协议&…...

Jmeter 参数化的几种方法

目录 配置元件-用户自定义变量 前置处理器-用户参数 配置元件-CSV Data Set Config Tools-函数助手 配置元件-用户自定义变量 可在测试计划、线程组、HTTP请求下创建用户定义的变量 全局变量&#xff0c;可以跨线程组调用 jmeter执行的时候&#xff0c;只获取一次&#xff0…...

剑指Offer45.把数组排成最小的数 C++

1、题目描述 输入一个非负整数数组&#xff0c;把数组里所有数字拼接起来排成一个数&#xff0c;打印能拼接出的所有数字中最小的一个。 示例 1: 输入: [10,2] 输出: “102” 示例 2: 输入: [3,30,34,5,9] 输出: “3033459” 2、VS2019上运行 先转换成字符串再组合起来 #in…...

【java毕业设计】基于SSM+MySql的人才公寓管理系统设计与实现(程序源码)--人才公寓管理系统

基于SSMMySql的人才公寓管理系统设计与实现&#xff08;程序源码毕业论文&#xff09; 大家好&#xff0c;今天给大家介绍基于SSMMySql的人才公寓管理系统设计与实现&#xff0c;本论文只截取部分文章重点&#xff0c;文章末尾附有本毕业设计完整源码及论文的获取方式。更多毕业…...

golang操作excel的高性能库——excelize/v2

目录 介绍文档与源码安装快速开始创建 Excel 文档读取 Excel 文档打开数据流流式写入 [相关 Excel 开源类库性能对比](https://xuri.me/excelize/zh-hans/performance.html) 介绍 Excelize是一个纯Go编写的库&#xff0c;提供了一组功能&#xff0c;允许你向XLAM / XLSM / XLS…...

学习51单片机怎么开始?

学习的过程不总是先打好基础&#xff0c;然后再盖上层建筑&#xff0c;尤其是实践性的、工程性很强的东西。如果你一定要先全面打好基础&#xff0c;再学习单片机&#xff0c;我觉得你一定学不好&#xff0c;因为你的基础永远打不好&#xff0c;因为基础太庞大了&#xff0c;基…...

[.NET学习笔记] -.NET6.0项目动态加载netstandard2.0报错但项目添加引用则正常的问题

问题描述 .NET6.0的项目使用netstandard2.0版本的动态链接库。若是在项目中直接添加引用&#xff0c;应用netstandard2.0项目或者netstandard2.0编译后的dll均能正常工作。但如果通过xcopy等方式&#xff0c;额外将对应的dll复制到执行目录&#xff0c;会执行失败。调用方式一…...

山景DSP芯片可烧录AP8224C2音频处理器方案

AP8224C2高性能32位音频应用处理器AP82系列音频处理器是面向音频应用领域设计的新一代SoC平台产品&#xff0c;适用于传统音响系统、新兴的蓝牙或Wifi 无线音频产品、Sound Bar 和调音台等市场。该处理器在总体架构和系统组成上&#xff0c;充分考虑了音频领域的特点&#xff0…...

来聊聊托管服务提供商(MSP)安全

纵观各个中小型企业&#xff0c;由于预算十分有限而且系统环境的满载&#xff0c;如今它们往往需要依赖托管服务提供商&#xff08;managed service providers&#xff0c;MSP&#xff09;来支持其IT服务与流程。而由于MSP提供的解决方案可以与客户端基础设施相集成&#xff0c…...

最新版本的Anaconda环境配置、Cuda、cuDNN以及pytorch环境一键式配置流程

本教程是最新的深度学习入门环境配置教程&#xff0c;跟着本教程可以帮你解决入门深度学习之前的环境配置问题。同时&#xff0c;本教程拒绝琐碎&#xff0c;大部分以图例形式进行教程。这里我们安装的都是最新版本~ 文章目录 一、Anaconda的安装1.1 下载1.2 安装1.3 环境配置…...

【数据结构与算法】十大经典排序算法-选择排序

&#x1f31f;个人博客&#xff1a;www.hellocode.top &#x1f3f0;Java知识导航&#xff1a;Java-Navigate &#x1f525;CSDN&#xff1a;HelloCode. &#x1f31e;知乎&#xff1a;HelloCode &#x1f334;掘金&#xff1a;HelloCode ⚡如有问题&#xff0c;欢迎指正&#…...

【Spring专题】Spring之Bean的生命周期源码解析——阶段一(扫描生成BeanDefinition)

目录 前言阅读准备阅读指引阅读建议 课程内容一、生成BeanDefinition1.1 简单回顾*1.2 概念回顾1.3 核心方法讲解 二、方法讲解2.1 ClassPathBeanDefinitionScanner#scan2.2 ClassPathBeanDefinitionScanner#doScan2.3 ClassPathScanningCandidateComponentProvider#findCandid…...

【C#】判断打印机共享状态

打印机共享状态 /// <summary>/// 打印机共享状态/// </summary>public enum PrinterShareState{/// <summary>/// 无打印机/// </summary>None -1,/// <summary>/// 未共享/// </summary>NotShare 0,/// <summary>/// 已共享/// …...

运维监控学习笔记7

Zabbix的安装&#xff1a; 1、基础环境准备&#xff1a; 安装zabbix的yum源&#xff0c;阿里的yum源提供了zabbix3.0。 rpm -ivh http://mirrors.aliyun.com/zabbix/zabbix/3.0/rhel/7/x86_64/zabbix-release-3.0-1.el7.noarch.rpm 这个文件就是生成了一个zabbix.repo 2、安…...

【业务功能篇64】maven加速 配置settings.xml文件 镜像

maven加速 添加阿里镜像仓 <?xml version"1.0" encoding"UTF-8"?><!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additi…...

Spring Boot(六十四):SpringBoot集成Gzip压缩数据

1 实现思路 2 实现 2.1 创建springboot项目 2.2 编写一个接口,功能很简单就是传入一个Json对象并返回 package com.example.demo.controller;import com.example.demo.entity.Advertising; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframewo…...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用&#xff1a;实现组件通用属性的渐变过渡效果&#xff0c;提升用户体验。支持属性&#xff1a;width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项&#xff1a; 布局类属性&#xff08;如宽高&#xff09;变化时&#…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek

文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama&#xff08;有网络的电脑&#xff09;2.2.3 安装Ollama&#xff08;无网络的电脑&#xff09;2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...