当前位置: 首页 > news >正文

Spring Cloud Stream实现数据流处理

1.什么是Spring Cloud Stream?

Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。
在这里插入图片描述
我们很容易总结出每个模块的流程:

1、从上一个模块拉取数据
2、处理数据
3、将处理完成的数据发给下一个模块

其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。

2.环境准备

采用docker-compose搭建kafaka环境

version: '3'networks:kafka:ipam:driver: defaultconfig:- subnet: "172.22.6.0/24"services:zookepper:image: /zookeeper:latestcontainer_name: zookeeper-serverrestart: unless-stoppedvolumes:- "/etc/localtime:/etc/localtime"environment:ALLOW_ANONYMOUS_LOGIN: yesports:- "2181:2181"networks:kafka:ipv4_address: 172.22.6.11kafka:image: /kafka:3.4.1container_name: kafkarestart: unless-stoppedvolumes:- "/etc/localtime:/etc/localtime"environment:ALLOW_PLAINTEXT_LISTENER: yesKAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092ports:- "9092:9092"depends_on:- zookeppernetworks:kafka:ipv4_address: 172.22.6.12kafka-map:image: /kafka-mapcontainer_name: kafka-maprestart: unless-stoppedvolumes:- "./kafka/kafka-map/data:/usr/local/kafka-map/data"environment:DEFAULT_USERNAME: adminDEFAULT_PASSWORD: 123456ports:- "9080:8080"depends_on:                         - kafkanetworks:kafka:ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

3.代码工程

实验目标

1、生成UUID并将其发送到Kafka主题batch-in。
2、从batch-in主题接收UUID的批量消息,移除其中的数字,并将结果发送到batch-out主题。
3、监听batch-out主题并打印接收到的消息。

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springcloud-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>spring-cloud-stream-kafaka</artifactId><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target></properties><dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies></project>

处理流

package com.et;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;@SpringBootApplication
public class CloudStreamsFunctionBatch {public static void main(String[] args) {SpringApplication.run(CloudStreamsFunctionBatch.class, args);}@Beanpublic Supplier<UUID> stringSupplier() {return () -> {var uuid = UUID.randomUUID();System.out.println(uuid + " -> batch-in");return uuid;};}@Beanpublic Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {return idBatch -> {System.out.println("Removed digits from batch of " + idBatch.size());return idBatch.stream().map(UUID::toString)// Remove all digits from the UUID.map(uuid -> uuid.replaceAll("\\d","")).map(noDigitString -> MessageBuilder.withPayload(noDigitString).build()).toList();};}@KafkaListener(id = "batch-out", topics = "batch-out")public void listen(String in) {System.out.println("batch-out -> " + in);}}
  • 定义一个名为stringSupplier的Bean,它实现了Supplier接口。这个方法生成一个随机的UUID,并打印到控制台,表示这个UUID将被发送到batch-in主题。
  • 定义一个名为digitRemovingConsumer的Bean,它实现了Function<List, List<Message>>接口。这个方法接受一个UUID的列表,打印出处理的UUID数量,然后将每个UUID转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。
  • 使用@KafkaListener注解定义一个Kafka监听器,监听batch-out主题。当接收到消息时,调用listen方法并打印接收到的消息内容。

配置文件

spring:cloud:function:definition: stringSupplier;digitRemovingConsumerstream:bindings:stringSupplier-out-0:destination: batch-indigitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: truedigitRemovingConsumer-out-0:destination: batch-outkafka:binder:brokers: localhost:9092bindings:digitRemovingConsumer-in-0:consumer:configuration:# Forces consumer to wait 5 seconds before polling for messagesfetch.max.wait.ms: 5000fetch.min.bytes: 1000000000max.poll.records: 10000000

参数解释
1、spring.cloud.function.definition:定义了两个函数,stringSupplier和digitRemovingConsumer。这两个函数将在应用程序中被使用。

spring:cloud:function:definition: stringSupplier;digitRemovingCon

2、stream.bindings.stringSupplier-out-0.destination:将stringSupplier函数的输出绑定到Kafka主题batch-in。

stream:bindings:stringSupplier-out-0:destination: batch-in

3、stream.bindings.digitRemovingConsumer-in-0.destination:将digitRemovingConsumer函数的输入绑定到Kafka主题batch-in。

digitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: true

4、group: batch-in:指定消费者组为batch-in,这意味着多个实例可以共享这个组来处理消息。

5、consumer.batch-mode: true:启用批处理模式,允许消费者一次处理多条消息。

6、stream.bindings.digitRemovingConsumer-out-0.destination:将digitRemovingConsumer函数的输出绑定到Kafka主题batch-out。

digitRemovingConsumer-out-0:destination: batch-out

4.测试

启动弄Spring Boot应用,可以看到控制台输出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
Removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd—adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb–d-ad-bbbb

相关文章:

Spring Cloud Stream实现数据流处理

1.什么是Spring Cloud Stream&#xff1f; Spring Cloud Stream的核心是Stream&#xff0c;准确来讲Spring Cloud Stream提供了一整套数据流走向&#xff08;流向&#xff09;的API&#xff0c; 它的最终目的是使我们不关心数据的流入和写出&#xff0c;而只关心对数据的业务处…...

列表上移下移功能实现

后台管理某列表需实现上移下移功能&#xff0c;并与前端展示列表排序相关。 现将开发完成过程笔记记录下来。 目录 列表增加属性 JQuery脚本 服务端 控制器 服务层 总结 列表增加属性 在循环渲染时&#xff0c;在table表格的tr上增加id和排序的属性值&#xff0c;以便传…...

升级智享 AI 直播三代:领航原生直播驶向自动化运营新航道

在瞬息万变的数字商业世界&#xff0c;直播行业恰似一艘破浪前行的巨轮&#xff0c;原生直播作为初始 “航船”&#xff0c;在历经风雨后&#xff0c;终于迎来智享 AI 直播三代这股强劲 “东风”&#xff0c;校准航向&#xff0c;开启自动化运营的全新航道&#xff0c;驶向一片…...

Llmcad: Fast and scalable on-device large language model inference

题目&#xff1a;Llmcad: Fast and scalable on-device large language model inference 发表于2023.09 链接&#xff1a;https://arxiv.org/pdf/2309.04255 声称是第一篇speculative decoding边缘设备的论文&#xff08;不一定是绝对的第一篇&#xff09;&#xff0c;不开源…...

Hbase2.2.7集群部署

环境说明 准备三台服务器&#xff0c;分别为&#xff1a;bigdata141&#xff08;作为Hbase主节点&#xff09;、bigdata142、bigdata143确保hadoop和zookeeper集群都先启动好我这边的hadoop版本为3.2.0&#xff0c;zookeeper版本为3.5.8 下载安装包 下载链接&#xff1a;In…...

【青牛科技】D1671 75Ω 带4级低通滤波的单通道视频放大电 路芯片介绍

概 述 &#xff1a; D1671是 一 块 带 4级 低 通 滤 波 的 单 通 道 视 频 放 大 电 路 &#xff0c; 可 在3V或5V的 低 电 压 下 工 作 。 该 电 路 用 在 有 TV影 象 输 出 功 能 的 产 品 上 面&#xff0c;比如 机 顶 盒 &#xff0c;监 控 摄 象 头 &#xff0c;DVD&#…...

[NeurIPS 2022] Leveraging Inter-Layer Dependency for Post-Training Quantization

Contents IntroductionMethodExperimentsReferences Introduction 作者提出一种端到端的 PTQ 训练策略 Network-Wise Quantization (NWQ)&#xff0c;并通过 Annealing Softmax (ASoftmax) 和 Annealing Mixup (AMixup) 改进了 AdaRound&#xff0c;降低了训练收敛难度 Metho…...

ubuntu+ROS推视频流至网络

目录 概述 工具 ros_rtsp 接受流 web_video_server 源码安装 二进制安装 ros接收rtsp视频流 总结 概述 ros_rtsp功能包可以将ros视频流以rtsp形式推送 web_video_server功能包可以将ros视频话题推HTTP流 rocon_rtsp_camera_relay可以接受同一网段下的rtsp视频流输出为…...

PHP 去掉特殊不可见字符 “\u200e“

描述 最近在排查网站业务时&#xff0c;发现有数据匹配失败的情况 肉眼上完全看不出问题所在 当把字符串 【M24308/23-14F‎】复制出来发现 末尾有个不可见的字符 使用删除键或左右移动时才会发现 最后测试通过 var_dump 打印 发现这个"空字符"占了三个长度 &#xf…...

深度学习—BP算法梯度下降及优化方法Day37

梯度下降 1.公式 w i j n e w w i j o l d − α ∂ E ∂ w i j w_{ij}^{new} w_{ij}^{old} - \alpha \frac{\partial E}{\partial w_{ij}} wijnew​wijold​−α∂wij​∂E​ α为学习率 当α过小时&#xff0c;训练时间过久增加算力成本&#xff0c;α过大则容易造成越过最…...

elasticsearch8.16 docker-compose 多机器集群安装

在网上找了一圈, 发现要么就是单机版的部署了多个节点, 很少有多台机器部署集群的, 有些就拿官网的例子写一写, 没有实战经验, 下面分享一个教程, 实实在在的多台机器, 每台机器部署2个节点的例子 先上.env , docker-compose.yml文件, 这个文件是核心, 里面掺杂太多坑, 已经帮你…...

Flink--API 之 Source 使用解析

目录 一、Flink Data Sources 分类概览 &#xff08;一&#xff09;预定义 Source &#xff08;二&#xff09;自定义 Source 二、代码实战演示 &#xff08;一&#xff09;预定义 Source 示例 基于本地集合 基于本地文件 基于网络套接字&#xff08;socketTextStream&…...

uniapp在小程序连接webScoket实现余额支付

webScoket文档&#xff1a;uni.connectSocket(OBJECT) | uni-app官网 /plugins/event.js const Dep function() {this.Evens Object.create(null); } class Event {constructor({dep new Dep()} {}) {if (dep.constructor Object && Object.keys(dep).length 0…...

Spring Boot【三】

自动注入 xml中可以在bean元素中通过autowire属性来设置自动注入的方式&#xff1a; <bean id"" class"" autowire"byType|byName|constructor|default" /> byName&#xff1a;按照名称进行注入 byType&#xff1a;按类型进行注入 constr…...

R 因子

R 因子 引言 在金融领域&#xff0c;风险管理和投资策略的优化一直是核心议题。传统的风险度量工具&#xff0c;如波动率、Beta系数等&#xff0c;虽然在一定程度上能够帮助投资者理解市场的波动和资产的相对风险&#xff0c;但它们往往无法全面捕捉到市场动态的复杂性。因此…...

【博主推荐】C# Winform 拼图小游戏源码详解(附源码)

文章目录 前言摘要1.设计来源拼图小游戏讲解1.1 拼图主界面设计1.2 一般难度拼图效果1.3 普通难度拼图效果1.4 困难难度拼图效果1.5 地域难度拼图效果1.6 内置五种拼图效果 2.效果和源码2.1 动态效果2.2 源代码 源码下载结束语 前言 在数字浪潮汹涌澎湃的时代&#xff0c;程序开…...

深入解析 MySQL 启动方式:`systemctl` 与 `mysqld` 的对比与应用

目录 前言1. 使用 systemctl 启动 MySQL1.1 什么是 systemctl1.2 systemctl 启动 MySQL 的方法1.3 应用场景1.4 优缺点优点缺点 2. 使用 mysqld 命令直接启动 MySQL2.1 什么是 mysqld2.2 mysqld 启动 MySQL 的方法2.3 应用场景2.4 优缺点优点缺点 3. 对比分析结语 前言 MySQL …...

【python】windows pip 安装 module 提示 Microsoft Visual C++ 14.0 is required 处理方法

参考链接&#xff1a;https://blog.csdn.net/qzzzxiaosheng/article/details/12511900 1.问题引入 在使用pip 安装一些module经常会出现报错&#xff1a; Microsoft Visual C 14.0 is required. Get it with “Microsoft Visual C Build Tools很明显这是缺少C的编译的相关依…...

python爬虫案例——猫眼电影数据抓取之字体解密,多套字体文件解密方法(20)

文章目录 1、任务目标2、网站分析3、代码编写1、任务目标 目标网站:猫眼电影(https://www.maoyan.com/films?showType=2) 要求:抓取该网站下,所有即将上映电影的预约人数,保证能够获取到实时更新的内容;如下: 2、网站分析 进入目标网站,打开开发者模式,经过分析,我…...

go sync.WaitGroup

1、数据结构 type WaitGroup struct {noCopy noCopystate atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.sema uint32 } 计数器&#xff1a;原子变量&#xff0c;高32位用于为协程计数&#xff0c;低32位为等待计数&#xff08;被Wait阻塞等待&a…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》

在注意力分散、内容高度同质化的时代&#xff0c;情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现&#xff0c;消费者对内容的“有感”程度&#xff0c;正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中&#xff0…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

【配置 YOLOX 用于按目录分类的图片数据集】

现在的图标点选越来越多&#xff0c;如何一步解决&#xff0c;采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集&#xff08;每个目录代表一个类别&#xff0c;目录下是该类别的所有图片&#xff09;&#xff0c;你需要进行以下配置步骤&#x…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

rnn判断string中第一次出现a的下标

# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的

修改bug思路&#xff1a; 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑&#xff1a;async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...