当前位置: 首页 > article >正文

RocketMQ Streams 1.1.0: 轻量级流处理再出发

本文作者倪泽Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer01 背景RocketMQ Streams是一款基于RocketMQ为基础的轻量级流计算引擎具有资源消耗少、部署简单、功能全面的特点目前已经在社区开源。RocketMQ Streams在阿里云内部被使用在对资源比较敏感同时又强烈需要流计算的场景比如在自建机房的云安全场景下。自RocketMQ Streams开源以来吸引了大量用户调研和试用。但是也存在一些问题在RocketMQ Streams 1.1.0中主要针对以下问题做出了改进和优化。1、面向用户API不够友好不能使用泛型不支持自定义序列化/反序列化2、代码冗余在RocketMQ Streams中存在将流处理拓扑序列化反序列化模块RocketMQ Streams作为轻量级流处理SDK构建好流处理节点之后应该可以直接处理数据不存在将流处理拓扑图本地保存或者网络传输需求。3、流处理过程不容易理解含有大量缓存、刷新逻辑4、存在大量支持SQL的代码这部分和SDK方式运行流处理任务的逻辑无关在RocketMQ Streams 1.1.0中对上述问题做出了改进期望能带来更好的使用体验。同时重新设计了流处理拓扑构建过程、去掉冗余代码使得代码更容易被理解。从今天起将推出系列文章介绍RocketMQ Streams 1.1.0版本本次文章主要介绍RocketMQ Streams 1.1.0的API如何使用如何利用RocketMQ Streams快速构建流处理应用。02 典型使用示例本地运行下列示例的步骤1、部署RocketMQ 5.02、使用mqAdmin创建topic3、构建示例工程添加依赖启动示例。RocketMQ Streams 坐标dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-streams/artifactId version1.1.0/version /dependency4、向topic中写入相应数据并观察结果。更详细文档请参考https://github.com/apache/roc...WordCountpublic class WordCount { public static void main(String[] args) { StreamBuilder builder new StreamBuilder(wordCount); builder.source(sourceTopic, total - { String value new String(total, StandardCharsets.UTF_8); return new Pair(null, value); }) .flatMap((ValueMapperActionString, ListString) value - { String[] splits value.toLowerCase().split(\W); return Arrays.asList(splits); }) .keyBy(value - value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder builder.build(); Properties properties new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, 127.0.0.1:9876); RocketMQStream rocketMQStream new RocketMQStream(topologyBuilder, properties); final CountDownLatch latch new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread(wordcount-shutdown-hook) { Override public void run() { rocketMQStream.stop(); latch.countDown(); } }); try { rocketMQStream.start(); latch.await(); } catch (final Throwable e) { System.exit(1); } System.exit(0); } }WordCount示例要点1、JobId wordCount唯一标识流处理任务2、自定义的反序列化3、一对多转化4、lambda形式从数据中指定Key5、支持有状态计算窗口聚合public class WindowCount { public static void main(String[] args) { StreamBuilder builder new StreamBuilder(windowCountUser); AggregateActionString, User, Num aggregateAction (key, value, accumulator) - new Num(value.getName(), 100); builder.source(user, source - { User user1 JSON.parseObject(source, User.class); return new Pair(null, user1); }) .selectTimestamp(User::getTimestamp) .filter(value - value.getAge() 0) .keyBy(value - key) .window(WindowBuilder.tumblingWindow(Time.seconds(15))) .aggregate(aggregateAction) .toRStream() .print(); TopologyBuilder topologyBuilder builder.build(); Properties properties new Properties(); properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, 127.0.0.1:9876); properties.put(Constant.TIME_TYPE, TimeType.EVENT_TIME); properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 2000); RocketMQStream rocketMQStream new RocketMQStream(topologyBuilder, properties); rocketMQStream.start(); } }窗口聚合示例要点1、支持指定时间字段2、支持滑动、滚动、会话多种类型window3、支持自定义UDAF类型聚合4、支持自定义时间类型和数据最大迟到时间双流JOINpublic class JoinWindow { public static void main(String[] args) { StreamBuilder builder new StreamBuilder(joinWindow); //左流 RStreamUser user builder.source(user, total - { User user1 JSON.parseObject(total, User.class); return new Pair(null, user1); }); //右流 RStreamNum num builder.source(num, source - { Num user12 JSON.parseObject(source, Num.class); return new Pair(null, user12); }); //自定义join后的运算 ValueJoinActionUser, Num, Union action new ValueJoinActionUser, Num, Union() { Override public Union apply(User value1, Num value2) { ... } }; user.join(num) .where(User::getName) .equalTo(Num::getName) .window(WindowBuilder.tumblingWindow(Time.seconds(30))) .apply(action) .print(); TopologyBuilder topologyBuilder builder.build(); Properties properties new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, 127.0.0.1:9876); RocketMQStream rocketMQStream new RocketMQStream(topologyBuilder, properties); rocketMQStream.start(); } }双流聚合示例要点1、支持window join和非window join对于非window join只需要在上述及连表达式中去掉window即可2、支持多种窗口类型的window join3、支持对join后数据自定义操作03 参与贡献RocketMQ Streams是Apache RocketMQ的子项目已经在社区开源参与RocketMQ Streams相关工作请参考以下资源1、试用RocketMQ Streams并阅读相关文档以了解更多信息maven仓库坐标dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-streams/artifactId version1.1.0/version /dependencyRocketMQ Streams文档https://rocketmq.apache.org/z...2、参与贡献如果你有任何功能请求或错误报告请随时提交 Pull Request 来分享你的反馈和想法社区仓库https://github.com/apache/roc...3、联系我们可以在 GitHub上创建 Issue向 RocketMQ 邮件列表发送电子邮件或在 RocketMQ Streams SIG 交流群与专家共同探讨RocketMQ Streams SIG加入方式添加“小火箭”微信回复RocketMQ Streams。

相关文章:

RocketMQ Streams 1.1.0: 轻量级流处理再出发

本文作者:倪泽,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer 01 背景 RocketMQ Streams是一款基于RocketMQ为基础的轻量级流计算引擎,具有资源消耗少、部署简单、功能全面的特点,目前已经在社区开源。Rocket…...

Gemma-4-26B-A4B-it-GGUF部署教程:开源大模型镜像免配置方案——从裸机到7860端口可用仅需8分钟

Gemma-4-26B-A4B-it-GGUF部署教程:开源大模型镜像免配置方案——从裸机到7860端口可用仅需8分钟 1. 项目概述 Gemma-4-26B-A4B-it-GGUF 是 Google Gemma 4 系列中高性能、高效能的 MoE(混合专家)聊天模型,具备256K tokens的超长…...

RocketMQ 运维管控的利器 - RocketMQ Operator

本文主要分为三个部分: 首先简单介绍一下 RocketMQ Operator 的相关知识;然后结合案例详细介绍 RocketMQ Operator 提供的自定义资源及使用方法;最后介绍 Operator 社区目前的情况并展望 RocketMQ Operator 下一步的发展方向。 相关背景知识…...

【Netty高性能网络框架解析系列】系列文章之四大高性能特性之内存池化技术(3)

netty的内存管理和内存池化设计Netty 内存池设计Netty为什么用内存池化设计:Netty管理内存整体架构Jemalloc 内存分片算法和结构内存分配的组件架构图如下:Netty分配器类结构层次关系如下:PooledByteBufAllocator 分配器Netty 内存池设计 Ne…...

05 - AMDGPU中的VRAM管理器

难度: 🟡 进阶级 预计学习时间: 60分钟 前置知识: 04-drm_buddy核心数据结构详解 📋 概述 AMDGPU VRAM Manager是Buddy分配器和TTM框架之间的桥梁: 🔗 集成层: 将Buddy嵌入到TTM资源管理框架📊 统计层: 追踪VRAM使用…...

密封类取代if-else和Visitor模式,性能提升47%?——基于JMH压测的Java 25真实基准报告

更多请点击: https://intelliparadigm.com 第一章:密封类取代if-else和Visitor模式,性能提升47%?——基于JMH压测的Java 25真实基准报告 Java 25 正式引入了对密封类(Sealed Classes)的完整运行时优化支持…...

保姆级教程:ROS2 Humble下用rs_launch.py调通你的RealSense D435i(含点云与配准配置)

ROS2 Humble实战:RealSense D435i点云与配准配置全解析 第一次接触RealSense D435i和ROS2时,我盯着黑漆漆的Rviz界面发呆了半小时——明明按照教程启动了相机,为什么就是看不到点云?如果你也遇到过类似问题,这篇保姆级…...

【绝密】Python配置热加载失效的底层机制:从importlib.reload()缺陷到__pycache__污染链(仅限CI/CD工程师内部解密)

更多请点击: https://intelliparadigm.com 第一章:Python配置热加载失效的全局现象与影响面 Python 应用在微服务与云原生场景中广泛依赖配置热加载(Hot Reload)机制实现运行时参数动态更新,但实践中该能力常因环境、…...

Fairseq-Dense-13B-Janeway入门指南:识别模型局限——为何必须用英文提示词

Fairseq-Dense-13B-Janeway入门指南:识别模型局限——为何必须用英文提示词 1. 模型概述 Fairseq-Dense-13B-Janeway 是由 KoboldAI 发布的 130 亿参数创意写作大模型,专注于生成具有经典叙事风格的英文科幻与奇幻内容。该模型基于 2210 本科幻与奇幻题…...

PeachPy未来展望:汇编编程的发展趋势与创新方向

PeachPy未来展望:汇编编程的发展趋势与创新方向 【免费下载链接】PeachPy x86-64 assembler embedded in Python 项目地址: https://gitcode.com/gh_mirrors/pe/PeachPy PeachPy作为一款将x86-64汇编嵌入Python的创新工具,正在重新定义汇编编程的…...

TigerVNC终极指南:如何在3分钟内搭建跨平台远程桌面连接

TigerVNC终极指南:如何在3分钟内搭建跨平台远程桌面连接 【免费下载链接】tigervnc High performance, multi-platform VNC client and server 项目地址: https://gitcode.com/gh_mirrors/ti/tigervnc TigerVNC是一款高性能、跨平台的VNC客户端和服务器软件&…...

ComfyUI-WanVideoWrapper深度解析:企业级AI视频生成架构与性能优化实战指南

ComfyUI-WanVideoWrapper深度解析:企业级AI视频生成架构与性能优化实战指南 【免费下载链接】ComfyUI-WanVideoWrapper 项目地址: https://gitcode.com/GitHub_Trending/co/ComfyUI-WanVideoWrapper ComfyUI-WanVideoWrapper作为ComfyUI生态中的专业级AI视频…...

网盘直链解析助手:八大平台高效下载的完整解决方案

网盘直链解析助手:八大平台高效下载的完整解决方案 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国移动云盘 / 天翼云盘…...

PeachPy社区贡献指南:从用户到开发者的成长路径

PeachPy社区贡献指南:从用户到开发者的成长路径 【免费下载链接】PeachPy x86-64 assembler embedded in Python 项目地址: https://gitcode.com/gh_mirrors/pe/PeachPy PeachPy是一个嵌入Python的x86-64汇编器,它允许开发者直接在Python代码中编…...

Chaplin:本地化实时唇语识别完整指南,5分钟开启无声语音革命

Chaplin:本地化实时唇语识别完整指南,5分钟开启无声语音革命 【免费下载链接】chaplin A real-time silent speech recognition tool. 项目地址: https://gitcode.com/gh_mirrors/chapl/chaplin 在当今隐私至上的数字时代,Chaplin 作为…...

如何永久免费使用Cursor AI Pro功能:终极破解工具完整指南

如何永久免费使用Cursor AI Pro功能:终极破解工具完整指南 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached your…...

为团队统一开发环境使用 TaoToken CLI 一键配置 API 密钥

为团队统一开发环境使用 TaoToken CLI 一键配置 API 密钥 1. 准备工作 在团队协作开发中,确保所有成员使用统一的大模型调用配置至关重要。通过 TaoToken CLI 工具,可以快速为团队成员配置相同的 API 密钥、模型选择和端点地址。开始前需要准备以下内容…...

SensibleSideButtons vs 原生手势:哪个更适合你的工作流?

SensibleSideButtons vs 原生手势:哪个更适合你的工作流? 【免费下载链接】sensible-side-buttons A macOS menu bar app that enables system-wide navigation functionality for the side buttons on third-party mice. 项目地址: https://gitcode.c…...

终极指南:如何在Windows上获得完整的AirPods使用体验

终极指南:如何在Windows上获得完整的AirPods使用体验 【免费下载链接】AirPodsDesktop ☄️ AirPods desktop user experience enhancement program, for Windows and Linux (WIP) 项目地址: https://gitcode.com/gh_mirrors/ai/AirPodsDesktop 你是否在Wind…...

FLUX.1-Krea-Extracted-LoRA效果展示:珠宝反光与金属拉丝质感高清样例

FLUX.1-Krea-Extracted-LoRA效果展示:珠宝反光与金属拉丝质感高清样例 1. 真实感图像生成新标杆 FLUX.1-Krea-Extracted-LoRA模型为AI图像生成带来了革命性的真实感提升。这个从FLUX.1-Krea-dev基础模型中提取的LoRA风格权重,专门针对FLUX.1-dev模型进…...

别再手动合并单元格了!用EasyExcel模板填充,5分钟搞定带固定表头的复杂Excel导出

告别Excel手工排版:用EasyExcel模板引擎实现智能报表生成 每次财务季度会前,技术团队总会收到业务部门发来的Excel格式调整需求——"这个表头能不能加粗显示?""合并单元格后打印预览总是错位怎么办?"。作为后…...

Face Analysis WebUI实战教程:结合Pillow实现检测结果图自动裁剪保存

Face Analysis WebUI实战教程:结合Pillow实现检测结果图自动裁剪保存 你是不是也遇到过这样的烦恼?用Face Analysis WebUI分析了一堆照片,得到了带有人脸框和关键点的结果图,但每次想单独保存某个人脸时,都得手动截图…...

Fairseq-Dense-13B-Janeway保姆级教学:从显存监控(nvidia-smi)到生成质量评估全流程

Fairseq-Dense-13B-Janeway保姆级教学:从显存监控(nvidia-smi)到生成质量评估全流程 1. 模型概述与快速体验 Fairseq-Dense-13B-Janeway是KoboldAI发布的130亿参数创意写作大模型,专门针对科幻与奇幻题材进行优化。该模型使用22…...

构建多 Agent 协作系统时如何通过 Taotoken 统一管理模型调用

构建多 Agent 协作系统时如何通过 Taotoken 统一管理模型调用 1. 多 Agent 系统的模型调用挑战 在由多个专用 Agent 组成的复杂系统中,每个 Agent 往往需要不同的模型能力。例如,一个对话 Agent 可能需要 Claude 系列模型的流畅性,而一个数据…...

软件评测师基础知识专项刷题:网络安全技术(一)

前言软考软件评测师备考之路,基础刷题必不可少。本文围绕【网络安全技术】模块整理经典习题 核心考点梳理,系列内容长期连载更新,慢慢积累、逐个突破,轻松夯实应试功底。考点防火墙防火墙是在内部网络和外部因特网之间增加的一道…...

鼠标连点器:游戏玩家的得力助手

在玩某些游戏的时候,我们经常需要反复点击鼠标,时间长了手指会很酸痛。 而且有些场景需要非常快速的连点,手动很难达到理想的速度。 这时候鼠标连点器就派上用场了,能帮我们完成这些重复性的点击工作。 今天我们要介绍的这款鼠标连…...

别再死记硬背‘枚举’和‘哈希’了!通过‘奶牛拼图’这道趣题,真正理解它们的应用场景与配合

从奶牛拼图到算法思维:枚举与哈希的趣味实践 想象一下,一群奶牛围坐在谷仓里,不是在咀嚼干草,而是在玩单词拼图游戏。它们对"MOO"这个词情有独钟,甚至发明了一套加密系统来保护自己的拼图不被农夫约翰轻易破…...

各有所长:连点器软件对比分析

连点器软件有很多,每款的功能都会有不同的侧重。 有的侧重连点速度,有的侧重稳定性,有的侧重功能丰富程度。 用户在选择的时候,往往不知道哪款最适合自己。 今天我们就来分析一下不同连点器软件的特点,帮助用户做出选择…...

企业如何利用多模型聚合平台构建内部智能问答助手

企业如何利用多模型聚合平台构建内部智能问答助手 1. 企业内部智能问答的需求背景 现代企业知识库通常包含产品文档、技术手册、客户案例等结构化与非结构化内容。传统关键词检索难以理解自然语言查询意图,而单一模型在应对不同复杂度问题时可能面临效果或成本瓶颈…...

别再只用单片机点灯了!用Multisim仿真4017+运放,体验纯硬件流水灯的乐趣

从单片机到纯硬件:用Multisim仿真4017运放打造复古流水灯 在嵌入式开发领域,点灯实验几乎是每个工程师和学生的入门必修课。从Arduino的digitalWrite()到STM32的HAL库,我们习惯了用几行代码控制LED的亮灭。但你是否思考过,在微控制…...