当前位置: 首页 > article >正文

KisFlow-Golang流式实时计算案例(四)-KisFlow在消息队列MQ中的应用

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

案例:
KisFlow-Golang流式计算案例(一)快速开始QuickStart
KisFlow-Golang流式计算案例(二)-Flow并流操作
KisFlow-Golang流式计算案例(二)-KisFlow在多协程中的应用


DownLoad kis-flow source

$go get github.com/aceld/kis-flow

KisFlow with Kafka

案例源代码
https://github.com/aceld/kis-flow-usage/tree/main/12-with_kafka

这里以github.com/segmentio/kafka-go 作为第三方Kafka Client SDK(开发者也可以选择其他kafka的go开发工具)。

package mainimport ("context""fmt""github.com/aceld/kis-flow/file""github.com/aceld/kis-flow/kis""github.com/segmentio/kafka-go""sync""time"
)func main() {ctx := context.Background()// Load Configuration from fileif err := file.ConfigImportYaml("conf/"); err != nil {panic(err)}// Get the flowflowOrg := kis.Pool().GetFlow("CalStuAvgScore")if flowOrg == nil {panic("flowOrg is nil")}// Create a new Kafka readerreader := kafka.NewReader(kafka.ReaderConfig{Brokers:     []string{"localhost:9092"},Topic:       "SourceStuScore",GroupID:     "group1",MinBytes:    10e3,                   // 10KBMaxBytes:    10e6,                   // 10MBMaxWait:     500 * time.Millisecond, // 最长等待时间StartOffset: kafka.FirstOffset,})defer reader.Close()var wg sync.WaitGroupfor i := 0; i < 5; i++ { // use 5 consumers to consume in parallelwg.Add(1)go func() {// fork a new flow for each consumerflowCopy := flowOrg.Fork(ctx)defer wg.Done()for {// Read a message from Kafkamessage, err := reader.ReadMessage(ctx)if err != nil {fmt.Printf("error reading message: %v\n", err)break}// Commit the message to the flow_ = flowCopy.CommitRow(string(message.Value))// Run the flowif err := flowCopy.Run(ctx); err != nil {fmt.Println("err: ", err)return}}}()}wg.Wait()return
}func init() {// Register functionskis.Pool().FaaS("VerifyStu", VerifyStu)kis.Pool().FaaS("AvgStuScore", AvgStuScore)kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

KisFlow with Nsq

案例源代码
https://github.com/aceld/kis-flow-usage/tree/main/13-with_nsq

本KisFlow消费者以github.com/nsqio/go-nsq作为第三方SDK。

package mainimport ("context""fmt""github.com/aceld/kis-flow/file""github.com/aceld/kis-flow/kis""github.com/nsqio/go-nsq"
)func main() {ctx := context.Background()// Load Configuration from fileif err := file.ConfigImportYaml("conf/"); err != nil {panic(err)}// Get the flowflowOrg := kis.Pool().GetFlow("CalStuAvgScore")if flowOrg == nil {panic("flowOrg is nil")}// Create a new NSQ consumerconfig := nsq.NewConfig()config.MaxInFlight = 5consumer, err := nsq.NewConsumer("SourceStuScore", "channel1", config)if err != nil {panic(err)}consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {// fork a new flow for each messageflowCopy := flowOrg.Fork(ctx)// Commit the message to the flow_ = flowCopy.CommitRow(string(message.Body))// Run the flowif err := flowCopy.Run(ctx); err != nil {fmt.Println("err: ", err)return err}return nil}))err = consumer.ConnectToNSQLookupd("localhost:4161")if err != nil {panic(err)}defer consumer.Stop()select {}
}func init() {// Register functionskis.Pool().FaaS("VerifyStu", VerifyStu)kis.Pool().FaaS("AvgStuScore", AvgStuScore)kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

KisFlow with RocketMQ

案例源代码
https://github.com/aceld/kis-flow-usage/tree/main/14-with_rocketmq

github.com/apache/rocketmq-client-go 作为RocketMQ消费者SDK。

package mainimport ("context""fmt""github.com/aceld/kis-flow/file""github.com/aceld/kis-flow/kis""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {// Load Configuration from fileif err := file.ConfigImportYaml("conf/"); err != nil {panic(err)}// Get the flowmyFloq := kis.Pool().GetFlow("CalStuAvgScore")if myFloq == nil {panic("myFloq is nil")}// Create a new RocketMQ consumerc, err := rocketmq.NewPushConsumer(consumer.WithGroupName("group1"),consumer.WithNameServer([]string{"localhost:9876"}),)if err != nil {panic(err)}err = c.Subscribe("SourceStuScore", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for _, msg := range msgs {// Commit the message to the flow_ = myFloq.CommitRow(string(msg.Body))}// Run the flowif err := myFloq.Run(ctx); err != nil {fmt.Println("err: ", err)return consumer.ConsumeRetryLater, err}return consumer.ConsumeSuccess, nil})if err != nil {panic(err)}err = c.Start()if err != nil {panic(err)}defer c.Shutdown()select {}
}

作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

案例:
KisFlow-Golang流式计算案例(一)快速开始QuickStart
KisFlow-Golang流式计算案例(二)-Flow并流操作
KisFlow-Golang流式计算案例(三)-KisFlow在多协程中的应用

最后编辑于:2025-03-31 20:49:36


喜欢的朋友记得点赞、收藏、关注哦!!!

相关文章:

KisFlow-Golang流式实时计算案例(四)-KisFlow在消息队列MQ中的应用

Golang框架实战-KisFlow流式计算框架专栏 Golang框架实战-KisFlow流式计算框架(1)-概述 Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上) Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下) Golang框架实战-KisFlow流式计算框架(4)-数据流 Golang框…...

leetcode:1582. 二进制矩阵中的特殊位置(python3解法)

难度&#xff1a;简单 给定一个 m x n 的二进制矩阵 mat&#xff0c;返回矩阵 mat 中特殊位置的数量。 如果位置 (i, j) 满足 mat[i][j] 1 并且行 i 与列 j 中的所有其他元素都是 0&#xff08;行和列的下标从 0 开始计数&#xff09;&#xff0c;那么它被称为 特殊 位置。 示…...

大型语言模型的智能本质是什么

大型语言模型的智能本质是什么 基于海量数据的统计模式识别与生成系统,数据驱动的语言模拟系统 ,其价值在于高效处理文本任务(如写作、翻译、代码生成),而非真正的理解与创造 大型语言模型(如GPT-4、Claude等)的智能本质可概括为基于海量数据的统计模式识别与生成系统,…...

linux_sysctl_fs_file_nr监控项

在 Linux 系统中&#xff0c;/proc/sys/fs/file-nr 文件提供了当前系统打开文件句柄的信息。如果监控到文件打开数较高&#xff0c;可能会影响系统性能&#xff0c;甚至导致无法打开新文件&#xff08;达到文件句柄上限&#xff09;。以下是分析和解决该问题的步骤&#xff1a;…...

Cline – OpenRouter 排名第一的CLI 和 编辑器 的 AI 助手

Cline – OpenRouter 排名第一的CLI 和 编辑器 的 AI 助手&#xff0c;Cline 官网&#xff1a;https://github.com/cline/cline Star 37.8k ps&#xff0c;OpenRouter的网址是&#xff1a;OpenRouter &#xff0c;这个排名第一&#xff0c;据我观察&#xff0c;是DeepSeek v3…...

Mock.js虚拟接口

Vue3中使用Mock.js虚拟接口数据 一、创建项目 pnpm创建vite的项目,通过 PNPM来简化依赖管理。若还没有安装 PNPM&#xff0c;可以通过 npm来安装&#xff1a; 安装 PNPM npm install -g pnpm//使用国内镜像加速pnpm add -g pnpmlatestpnpm config set registry http://regis…...

2025年嵌入式大厂春招高频面试真题及解析

以下是 2025 年嵌入式大厂春招高频面试真题及解析,结合真题分类和核心知识点整理: 一、‌C/C++编程基础‌ ‌1.1 指针与内存‌ ‌野指针的成因及避免方法‌(未初始化、释放后未置空)‌ malloc与calloc的区别(后者自动初始化为0)‌ ‌指针与数组的区别‌(内存分配方…...

LoRa模块通信距离优化:如何实现低功耗覆盖30公里无线传输要求

在物联网&#xff08;IoT&#xff09;快速发展的今天&#xff0c;LoRa&#xff08;Long Range&#xff09;技术作为一种基于扩频调制的远距离无线通信技术&#xff0c;因其远距离通信、低功耗和强抗干扰能力等优势&#xff0c;在农业监测、城市智能管理、环境监测等多个领域得到…...

OpenCV 从入门到精通(day_05)

1. 模板匹配 1.1 什么是模板匹配 模板匹配就是用模板图&#xff08;通常是一个小图&#xff09;在目标图像&#xff08;通常是一个比模板图大的图片&#xff09;中不断的滑动比较&#xff0c;通过某种比较方法来判断是否匹配成功。 1.2 匹配方法 rescv2.matchTemplate(image, …...

Linux操作系统与冯·诺依曼体系结构详解

一、冯诺依曼体系结构 1. 基本概念与历史背景 冯诺依曼体系结构是由数学家约翰冯诺依曼于1945年提出的计算机设计方案&#xff0c;也称为"存储程序计算机"。这一设计奠定了现代计算机的基础架构&#xff0c;至今仍是大多数计算机系统的核心设计理念。 2. 冯诺依曼体…...

OpenRouter开源的AI大模型路由工具,统一API调用

简介 ‌OpenRouter是一个开源的路由工具‌&#xff0c;它可以绕过限制调用GPT、Claude等国外模型。以下是对它的详细介绍&#xff1a; 一、主要功能 OpenRouter专注于将用户请求智能路由到不同的AI模型&#xff0c;并提供统一的访问接口。它就像一个“路由器”&#xff0c;能…...

qt tcpsocket编程遇到的并发问题

1. 单个socket中接收消息的方法要使用局部变量而非全局&#xff0c;避免消息频发时产生脏数据 优化后的关键代码 recieveInfo() 方法通过返回内部处理后的 msg 进行传递if (data.indexOf("0103") -1) { 这里增加了判断, 对数据&#xff08;非注册和心跳&#xff0…...

zabbix监控网站(nginx、redis、mysql)

目录 前提准备&#xff1a; zabbix-server主机配置&#xff1a; 1. 安装数据库 nginx主机配置&#xff1a; 1. 安装nginx redis主机配置&#xff1a; 1. 安装redis mysql主机配置&#xff1a; 1. 安装数据库 zabbix-server&#xff1a; 1. 安装zabbix 2. 编辑配置文…...

蓝桥杯冲刺

例题1&#xff1a;握手问题 方法1&#xff1a;数学推理(简单粗暴&#xff09; 方法2&#xff1a;用代码实现方法1 #include<iostream> using namespace std; int main() {int result 0;for (int i 1; i < 49; i){for (int j i 1; j < 50; j){//第i个人与第j个…...

文心一言与 DeepSeek 的竞争分析:技术先发优势为何未能转化为市场主导地位?

目录 引言 第一部分&#xff1a;技术路径的差异——算法创新与工程优化的博弈 1.1 文心一言的技术积累与局限性 1.1.1 早期技术优势 1.1.2 技术瓶颈与局限性 1.2 DeepSeek 的技术突破 1.2.1 算法革命与工程创新 1.2.2 工程成本与效率优势 第二部分&#xff1a;生态策略…...

Spring Security(maven项目) 3.1.0

前言&#xff1a; 通过实践而发现真理&#xff0c;又通过实践而证实真理和发展真理。从感性认识而能动地发展到理性认识&#xff0c;又从理性认识而能动地指导革命实践&#xff0c;改造主观世界和客观世界。实践、认识、再实践、再认识&#xff0c;这种形式&#xff0c;循环往…...

合并两个有序数组(Java实现)

给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺序 排列。 注意&#xff1a;最终&#xff0c;合并后数组…...

Tree - Shaking

Vue 3 的 Tree - Shaking 技术详解 Tree - Shaking 是一种在打包时移除未使用代码的优化技术&#xff0c;在 Vue 3 中&#xff0c;Tree - Shaking 发挥了重要作用&#xff0c;有效减少了打包后的代码体积&#xff0c;提高了应用的加载性能。以下是对 Vue 3 中 Tree - Shaking …...

C# 从代码创建选型卡+表格

private int tabNum 1; private int sensorNum 5; private void InitializeUI() {// 创建右侧容器面板Panel rightPanel new Panel{Dock DockStyle.Right,Width 300,BackColor SystemColors.ControlDark,Parent this};// 根据防区数量创建内容if (tabNum &g…...

OpenCV 从入门到精通(day_02)

1. 边缘填充 为什么要填充边缘呢&#xff1f;我们以下图为例&#xff1a; 可以看到&#xff0c;左图在逆时针旋转45度之后原图的四个顶点在右图中已经看不到了&#xff0c;同时&#xff0c;右图的四个顶点区域其实是什么都没有的&#xff0c;因此我们需要对空出来的区域进行一个…...

VTK的两种显示刷新方式

在类中先声明vtk的显示对象 vtkRenderer out_render; vtkVertexGlyphFilter glyphFilter; vtkPolyDataMapper mapper; // 新建制图器 vtkActor actor; // 新建角色 然后在init中先初始化一下&#xff1a; out_rend…...

Ceph异地数据同步之-RBD异地同步复制(上)

#作者&#xff1a;闫乾苓 文章目录 前言基于快照的模式&#xff08;Snapshot-based Mode&#xff09;工作原理单向同步配置步骤单向同步复制测试双向同步配置步骤双向同步复制测试 前言 Ceph的RBD&#xff08;RADOS Block Device&#xff09;支持在两个Ceph集群之间进行异步镜…...

【C++】STL库_stack_queue 的模拟实现

栈&#xff08;Stack&#xff09;、队列&#xff08;Queue&#xff09;是C STL中的经典容器适配器 容器适配器特性 不是独立容器&#xff0c;依赖底层容器&#xff08;deque/vector/list&#xff09;通过限制基础容器接口实现特定访问模式不支持迭代器操作&#xff08;无法遍历…...

前端对接下载文件接口、对接dart app

嵌套在dart app里面的前端项目 1.前端调下载接口 ->后端返回 application/pdf格式的文件 ->前端将pdf处理为blob ->blob转base64 ->调用dart app的 sdk saveFile ->保存成功 async download() {try {// 调用封装的 downloadEContract 方法获取 Blob 数据const …...

一周学会Pandas2 Python数据处理与分析-编写Pandas2 HelloWord项目

锋哥原创的Pandas2 Python数据处理与分析 视频教程&#xff1a; 2025版 Pandas2 Python数据处理与分析 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 我们首先准备一个excel文件&#xff0c;用来演示pandas操作数据集(数据的集合)。excel文件属于数据集的一种&#xf…...

【易订货-注册/登录安全分析报告】

前言 由于网站注册入口容易被机器执行自动化程序攻击&#xff0c;存在如下风险&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露&#xff0c;不符合国家等级保护的要求。短信盗刷带来的拒绝服务风险 &#xff0c;造成用户无法登陆、注册&#xff0c;大量收到垃圾短信的…...

AI赋能股票:流通股本与总股本:定义、区别及投资意义解析

一、基本定义 总股本&#xff08;Total Shares Outstanding&#xff09; 指一家公司已发行的所有股票数量&#xff0c;包括流通股和非流通股&#xff08;如限售股、员工持股计划股票等&#xff09;。总股本反映公司的整体股权结构&#xff0c;是计算市值&#xff08;总股本 股…...

如何在Windows上找到Python安装路径?两种方法快速定位

原文&#xff1a;如何在Windows上找到Python安装路径&#xff1f;两种方法快速定位 | w3cschool笔记 在 Windows 系统上找到 Python 的安装路径对于设置环境变量或排查问题非常重要。本文将介绍两种方法&#xff0c;帮助你找到 Python 的安装路径&#xff1a;一种是通过命令提…...

第五课:高清修复和放大算法

文章目录 Part.01 高清修复(Hi-Res Fix)Part.02 SD放大(SD Upscale)Part.03 附加功能放大Part.01 高清修复(Hi-Res Fix) 文生图中的高清修复/高分辨率修复/超分辨率修复先低分辨率抽卡,再高分辨率修复。不能突破显存限制放大重绘幅度安全范围是0.3-0.5,如果想让AI更有想象力0…...

lvgl避坑记录

一、log调试 #if LV_USE_LOG && LV_LOG_LEVEL > LV_LOG_LEVEL_INFOswitch(src_type) {case LV_IMG_SRC_FILE:LV_LOG_TRACE("lv_img_set_src: LV_IMG_SRC_FILE type found");break;case LV_IMG_SRC_VARIABLE:LV_LOG_TRACE("lv_img_set_src: LV_IMG_S…...