当前位置: 首页 > news >正文

KafkaStream:基本使用

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--kafkaStream--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

   2、新建流式处理类

          代码如下

package com.heima.kafkademo.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*
* 流式处理
* */
public class KafkaStreamQuickStart {public static void main(String[] args) {/*创建kafka配置中心并配置参数*/Properties prop = new Properties();//连接地址prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key序列化prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//value序列化prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//创建id名称prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream构造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}//流式计算方法private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kafka对象,同时指定从哪个topic获取消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");//处理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<?>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})      //按照value进行聚合.groupBy((key,value)->value)//时间窗口,每隔10秒更新一次.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

       消费者和生产者类代码参考Kafka:安装和配置_Success___的博客-CSDN博客

4、测试

        成功接收到消息

 

 

相关文章:

KafkaStream:基本使用

简介&#xff1a; kafkaStream&#xff1a;提供了对存储在kafka中的数据进行流式处理和分析的功能 特点&#xff1a; KafkasSream提供了一个非常简单轻量的Library&#xff0c;它可以非常方便的嵌入到java程序中&#xff0c;也可以任何方式打包部署 入门案例&#xff1a; 1、…...

【数据结构】二叉树

完全二叉树 是指所有结点度数小于等于2的树 所以这种情况也是&#xff1a; 几条性质 一个具有n个结点的完全二叉树的深度为&#xff1a; log ⁡ 2 ( n 1 ) 的结果向上取整。 \\\log_{2}(n1) \ \ 的结果向上取整。 log2​(n1) 的结果向上取整。设度为0的结点个数是n0&#…...

基于灰狼优化(GWO)、帝国竞争算法(ICA)和粒子群优化(PSO)对梯度下降法训练的神经网络的权值进行了改进(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

jenkins自动化构建保姆级教程(持续更新中)

1.安装 1.1版本说明 访问jenkins官网 https://www.jenkins.io/&#xff0c;进入到首页 点击【Download】按钮进入到jenkins下载界面 左侧显示的是最新的长期支持版本&#xff0c;右侧显示的是最新的可测试版本&#xff08;可能不稳定&#xff09;&#xff0c;建议使用最新的…...

HTTPS 的加密流程

目录 一、HTTPS是什么&#xff1f; 二、为什么要加密 三、"加密" 是什么 四、HTTPS 的工作过程 1.对称加密 2.非对称加密 3.中间人攻击 4.证书 总结 一、HTTPS是什么&#xff1f; HTTPS (Hyper Text Transfer Protocol Secure) 是基于 HTTP 协议之上的安全协议&…...

Jmeter 参数化的几种方法

目录 配置元件-用户自定义变量 前置处理器-用户参数 配置元件-CSV Data Set Config Tools-函数助手 配置元件-用户自定义变量 可在测试计划、线程组、HTTP请求下创建用户定义的变量 全局变量&#xff0c;可以跨线程组调用 jmeter执行的时候&#xff0c;只获取一次&#xff0…...

剑指Offer45.把数组排成最小的数 C++

1、题目描述 输入一个非负整数数组&#xff0c;把数组里所有数字拼接起来排成一个数&#xff0c;打印能拼接出的所有数字中最小的一个。 示例 1: 输入: [10,2] 输出: “102” 示例 2: 输入: [3,30,34,5,9] 输出: “3033459” 2、VS2019上运行 先转换成字符串再组合起来 #in…...

【java毕业设计】基于SSM+MySql的人才公寓管理系统设计与实现(程序源码)--人才公寓管理系统

基于SSMMySql的人才公寓管理系统设计与实现&#xff08;程序源码毕业论文&#xff09; 大家好&#xff0c;今天给大家介绍基于SSMMySql的人才公寓管理系统设计与实现&#xff0c;本论文只截取部分文章重点&#xff0c;文章末尾附有本毕业设计完整源码及论文的获取方式。更多毕业…...

golang操作excel的高性能库——excelize/v2

目录 介绍文档与源码安装快速开始创建 Excel 文档读取 Excel 文档打开数据流流式写入 [相关 Excel 开源类库性能对比](https://xuri.me/excelize/zh-hans/performance.html) 介绍 Excelize是一个纯Go编写的库&#xff0c;提供了一组功能&#xff0c;允许你向XLAM / XLSM / XLS…...

学习51单片机怎么开始?

学习的过程不总是先打好基础&#xff0c;然后再盖上层建筑&#xff0c;尤其是实践性的、工程性很强的东西。如果你一定要先全面打好基础&#xff0c;再学习单片机&#xff0c;我觉得你一定学不好&#xff0c;因为你的基础永远打不好&#xff0c;因为基础太庞大了&#xff0c;基…...

[.NET学习笔记] -.NET6.0项目动态加载netstandard2.0报错但项目添加引用则正常的问题

问题描述 .NET6.0的项目使用netstandard2.0版本的动态链接库。若是在项目中直接添加引用&#xff0c;应用netstandard2.0项目或者netstandard2.0编译后的dll均能正常工作。但如果通过xcopy等方式&#xff0c;额外将对应的dll复制到执行目录&#xff0c;会执行失败。调用方式一…...

山景DSP芯片可烧录AP8224C2音频处理器方案

AP8224C2高性能32位音频应用处理器AP82系列音频处理器是面向音频应用领域设计的新一代SoC平台产品&#xff0c;适用于传统音响系统、新兴的蓝牙或Wifi 无线音频产品、Sound Bar 和调音台等市场。该处理器在总体架构和系统组成上&#xff0c;充分考虑了音频领域的特点&#xff0…...

来聊聊托管服务提供商(MSP)安全

纵观各个中小型企业&#xff0c;由于预算十分有限而且系统环境的满载&#xff0c;如今它们往往需要依赖托管服务提供商&#xff08;managed service providers&#xff0c;MSP&#xff09;来支持其IT服务与流程。而由于MSP提供的解决方案可以与客户端基础设施相集成&#xff0c…...

最新版本的Anaconda环境配置、Cuda、cuDNN以及pytorch环境一键式配置流程

本教程是最新的深度学习入门环境配置教程&#xff0c;跟着本教程可以帮你解决入门深度学习之前的环境配置问题。同时&#xff0c;本教程拒绝琐碎&#xff0c;大部分以图例形式进行教程。这里我们安装的都是最新版本~ 文章目录 一、Anaconda的安装1.1 下载1.2 安装1.3 环境配置…...

【数据结构与算法】十大经典排序算法-选择排序

&#x1f31f;个人博客&#xff1a;www.hellocode.top &#x1f3f0;Java知识导航&#xff1a;Java-Navigate &#x1f525;CSDN&#xff1a;HelloCode. &#x1f31e;知乎&#xff1a;HelloCode &#x1f334;掘金&#xff1a;HelloCode ⚡如有问题&#xff0c;欢迎指正&#…...

【Spring专题】Spring之Bean的生命周期源码解析——阶段一(扫描生成BeanDefinition)

目录 前言阅读准备阅读指引阅读建议 课程内容一、生成BeanDefinition1.1 简单回顾*1.2 概念回顾1.3 核心方法讲解 二、方法讲解2.1 ClassPathBeanDefinitionScanner#scan2.2 ClassPathBeanDefinitionScanner#doScan2.3 ClassPathScanningCandidateComponentProvider#findCandid…...

【C#】判断打印机共享状态

打印机共享状态 /// <summary>/// 打印机共享状态/// </summary>public enum PrinterShareState{/// <summary>/// 无打印机/// </summary>None -1,/// <summary>/// 未共享/// </summary>NotShare 0,/// <summary>/// 已共享/// …...

运维监控学习笔记7

Zabbix的安装&#xff1a; 1、基础环境准备&#xff1a; 安装zabbix的yum源&#xff0c;阿里的yum源提供了zabbix3.0。 rpm -ivh http://mirrors.aliyun.com/zabbix/zabbix/3.0/rhel/7/x86_64/zabbix-release-3.0-1.el7.noarch.rpm 这个文件就是生成了一个zabbix.repo 2、安…...

【业务功能篇64】maven加速 配置settings.xml文件 镜像

maven加速 添加阿里镜像仓 <?xml version"1.0" encoding"UTF-8"?><!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additi…...

Spring Boot(六十四):SpringBoot集成Gzip压缩数据

1 实现思路 2 实现 2.1 创建springboot项目 2.2 编写一个接口,功能很简单就是传入一个Json对象并返回 package com.example.demo.controller;import com.example.demo.entity.Advertising; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframewo…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek

文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama&#xff08;有网络的电脑&#xff09;2.2.3 安装Ollama&#xff08;无网络的电脑&#xff09;2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

嵌入式学习笔记DAY33(网络编程——TCP)

一、网络架构 C/S &#xff08;client/server 客户端/服务器&#xff09;&#xff1a;由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序&#xff0c;负责提供用户界面和交互逻辑 &#xff0c;接收用户输入&#xff0c;向服务器发送请求&#xff0c;并展示服务…...

DingDing机器人群消息推送

文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人&#xff0c;点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置&#xff0c;详见说明文档 成功后&#xff0c;记录Webhook 2 API文档说明 点击设置说明 查看自…...