[每周一更]-(第133期):Go中MapReduce架构思想的使用场景

文章目录
- **MapReduce 工作流程**
- Go 中使用 MapReduce 的实现方式:
- **Go MapReduce 的特点**
- **哪些场景适合使用 MapReduce?**
- 使用场景
- 1. 数据聚合
- 2. 数据过滤
- 3. 数据排序
- 4. 数据转换
- 5. 数据去重
- 6. 数据分组
- 7. 数据统计
- 8.**统计文本中单词出现次数**
- **代码实现**
- MapReduce vs. 扇入/扇出
- 示例1:爬取多个网页
- 示例2:多个 goroutine 计算结果,并聚合
- 参考
- 注意事项
新年开工,2025重新出发
为什么需要 MapReduce
在 Go 中,虽然没有内置的 MapReduce 框架,但我们可以利用 Go 的并发特性(如 goroutines 和 channels)来实现 MapReduce。
在 Go 语言中,MapReduce 是一种编程模型,用于处理和生成大规模数据集。它将任务分解为两个主要阶段:Map(映射)和 Reduce(归约),并通过并行处理提高效率。MapReduce 模型最初由 Google 提出,广泛应用于大数据处理、分布式计算等领域。
它的核心思想是将问题分解成多个较小的子问题并行处理,然后将结果合并。MapReduce 分为两个主要步骤:
- Map 阶段:将输入数据映射到中间结果。这个阶段将输入数据拆分成小块,分配给不同的处理单元,并对每个数据项应用一个映射函数。
- Reduce 阶段:将 Map 阶段的中间结果进行合并。通常是通过聚合或汇总中间结果,生成最终输出。
MapReduce 工作流程
- 输入数据:将大规模数据分成多个小块。
- Map(映射):对数据进行并行处理,并生成中间结果。
- Shuffle(洗牌,可选):对中间结果进行归类,按 key 组织数据。
- Reduce(归约):合并和处理 Map 阶段的中间结果,得出最终结果。
Go 中使用 MapReduce 的实现方式:
Go 提供了 goroutine 和 channel,这使得它非常适合实现并行计算的场景。一个简单的 Go 实现通常会使用以下步骤:
- Map:通过 goroutine 处理每个数据块。
- Shuffle(可选):将中间结果通过 channel 或其他方式传递到 Reduce 阶段。
- Reduce:聚合结果,得到最终输出。
通过 Go 的并发模型,可以利用多个 CPU 核心实现 MapReduce 的并行计算。
Go MapReduce 的特点
- 高并发:
- 通过
goroutine并行执行 Map 和 Reduce 操作,提升计算效率。 - Go 的
goroutine轻量级,支持大规模并发执行 Map 任务,不会像 Java 线程那样占用大量内存。
- 通过
- 无锁数据传输:
channel作为数据流通管道,避免手动加锁,提高代码可读性和安全性。- Go 提供了
sync.WaitGroup、sync.Map等并发工具,可以更简单地管理 MapReduce 任务。
- 适用于大规模数据处理:
- 适合处理日志分析、数据聚合、分布式计算等任务。
哪些场景适合使用 MapReduce?
| 场景 | Map 阶段 | Reduce 阶段 |
|---|---|---|
| 日志分析 | 读取大量日志,提取关键字段 | 统计访问次数、错误率等 |
| 搜索引擎索引 | 解析网页,提取关键词 | 统计关键词出现次数 |
| 基因数据分析 | 解析 DNA 序列,计算某个基因的出现频率 | 归并统计结果,得出全局基因分布 |
| 机器学习 | 计算训练数据的特征 | 训练模型,计算最终的回归参数 |
| 推荐系统 | 计算用户的浏览、点击数据 | 归并计算得到推荐结果 |
| 并行图像处理 | 处理图像的每个区域 | 合并所有区域结果,生成完整图像 |
常见使用场景:
-
大规模数据处理: MapReduce 适用于批量处理大量数据,例如日志分析。
-
并发数据处理: 在需要并发处理的场景中,例如查询数据库,MapReduce 可以将任务拆分成并发请求,从而减少处理时间并提高性能。处理结果可以被聚合起来。
-
分布式数据处理和合并: MapReduce 用于以分布式方式处理和合并数据。大型数据集被分成较小的部分,由不同的机器或线程处理,然后合并。
使用场景
1. 数据聚合
场景:统计日志文件中不同状态码的出现次数。
拆解:
- Map阶段:读取日志文件,提取状态码,生成键值对(状态码, 1)。
- Reduce阶段:汇总相同状态码的计数,生成最终结果(状态码, 总次数)。
func mapFunc(line string) map[string]int {parts := strings.Split(line, " ")statusCode := parts[8] // 假设状态码在第9个字段return map[string]int{statusCode: 1}
}func reduceFunc(statusCode string, counts []int) int {return sum(counts)
}
2. 数据过滤
场景:从大量数据中筛选出符合特定条件的记录。
拆解:
- Map阶段:检查每条记录是否满足条件,满足则输出(记录, 1)。
- Reduce阶段:汇总符合条件的记录。
func mapFunc(record Record) map[Record]int {if record.Age > 30 {return map[Record]int{record: 1}}return nil
}func reduceFunc(record Record, counts []int) Record {return record
}
3. 数据排序
场景:对大规模数据集进行排序。
拆解:
- Map阶段:将数据分片并局部排序。
- Reduce阶段:合并各分片的排序结果。
func mapFunc(data []int) []int {sort.Ints(data)return data
}func reduceFunc(sortedSlices [][]int) []int {return mergeSortedSlices(sortedSlices)
}
4. 数据转换
场景:将数据从一种格式转换为另一种格式。
拆解:
- Map阶段:将原始数据转换为目标格式。
- Reduce阶段:合并转换后的数据。
func mapFunc(input InputType) OutputType {return transform(input)
}func reduceFunc(outputs []OutputType) OutputType {return combine(outputs)
}
5. 数据去重
场景:去除数据集中的重复记录。
拆解:
- Map阶段:将每条记录作为键输出(记录, 1)。
- Reduce阶段:合并相同记录,输出唯一记录。
func mapFunc(record Record) map[Record]int {return map[Record]int{record: 1}
}func reduceFunc(record Record, counts []int) Record {return record
}
6. 数据分组
场景:按某个字段对数据进行分组。
拆解:
- Map阶段:根据分组字段生成键值对(分组字段, 记录)。
- Reduce阶段:将相同分组字段的记录合并。
func mapFunc(record Record) map[string]Record {return map[string]Record{record.GroupField: record}
}func reduceFunc(groupField string, records []Record) []Record {return records
}
7. 数据统计
场景:计算数据集的平均值、最大值、最小值等统计信息。
拆解:
- Map阶段:计算局部统计信息。
- Reduce阶段:合并局部统计信息,生成全局统计结果。
func mapFunc(data []int) Stat {return calculateLocalStat(data)
}func reduceFunc(stats []Stat) Stat {return combineStats(stats)
}
8.统计文本中单词出现次数
- 同步 Map 阶段:
- 通过
sync.WaitGroup确保所有mapFunction任务完成后才关闭mapChannel,避免 Reduce 过早读取导致数据丢失。
- 通过
- 使用
go func()异步关闭 channel:mapWG.Wait()结束后,关闭mapChannel,确保 Reduce 读取完整数据。
- Reduce 处理改进:
reduceFunction直接从 channel 读取数据,并合并为最终的map[string]int结果。
代码实现
package mainimport ("fmt""strings""sync"
)// Map 阶段:统计部分数据中的单词频率
func mapFunction(text string, out chan<- map[string]int, wg *sync.WaitGroup) {defer wg.Done()wordCount := make(map[string]int)words := strings.Fields(text)for _, word := range words {wordCount[word]++}out <- wordCount
}// Reduce 阶段:合并多个 map 结果
func reduceFunction(in <-chan map[string]int) map[string]int {result := make(map[string]int)for partialMap := range in {for word, count := range partialMap {result[word] += count}}return result
}func main() {// 输入数据texts := []string{"hello world","go is great","hello go","map reduce in go","go go go",}// 创建 channel 传输 map 结果mapChannel := make(chan map[string]int, len(texts))var mapWG sync.WaitGroup// 启动多个 Map 任务for _, text := range texts {mapWG.Add(1)go mapFunction(text, mapChannel, &mapWG)}// 确保所有 map 任务完成后再关闭 channelgo func() {mapWG.Wait()close(mapChannel)}()// Reduce 阶段:合并 map 结果result := reduceFunction(mapChannel)// 输出最终结果fmt.Println("Word Count Result:", result)
}
MapReduce vs. 扇入/扇出
历史文章:[每周一更]-(第24期):Go的并发模型,提到过Go 并发模式:扇入、扇出,这里简单对比一下
MapReduce 和 Go 的 扇入(Fan-in)/扇出(Fan-out) 在并发模型上是类似的,但它们的侧重点和应用场景有所不同。
-
如果只是单机并发任务(如 API 调用、爬虫),用 扇入/扇出
-
如果要处理大数据(如日志分析、搜索索引),用 MapReduce
| 特性 | MapReduce | 扇入(Fan-in)/扇出(Fan-out) |
|---|---|---|
| 核心思想 | 拆分任务并行计算,再归并结果 | 并行处理任务,聚合结果到一个 channel |
| Map 阶段 / 扇出 | 并发执行多个子任务 | 启动多个 goroutine 处理任务 |
| Reduce 阶段 / 扇入 | 归并多个子任务的结果 | 读取多个 goroutine 结果并处理 |
| 数据流动方式 | Map → Reduce | 多个 goroutine → 单个 channel |
| 适用场景 | 大规模数据计算(如日志分析、搜索引擎索引) | 并发任务管理(如爬虫、API 并发请求) |
| 是否涉及分布式 | 适用于分布式计算 | 主要用于单机并发任务 |
示例1:爬取多个网页
package mainimport ("fmt""net/http""sync"
)var urls = []string{"https://golang.org","https://go.dev","https://gophercises.com",
}// 扇出:启动多个 goroutine 并发爬取网页
func fetch(url string, wg *sync.WaitGroup) {defer wg.Done()resp, err := http.Get(url)if err != nil {fmt.Println("Error:", err)return}fmt.Println("Fetched:", url, "Status:", resp.Status)
}func main() {var wg sync.WaitGroupfor _, url := range urls {wg.Add(1)go fetch(url, &wg)}wg.Wait()fmt.Println("All requests finished!")
}
示例2:多个 goroutine 计算结果,并聚合
package mainimport ("fmt""sync"
)func worker(id int, out chan<- int, wg *sync.WaitGroup) {defer wg.Done()out <- id * id // 计算平方并发送
}func main() {out := make(chan int, 5)var wg sync.WaitGroup// 扇出:启动多个 goroutinefor i := 1; i <= 5; i++ {wg.Add(1)go worker(i, out, &wg)}// 等待所有任务完成后关闭 channelgo func() {wg.Wait()close(out)}()// 扇入:聚合所有 goroutine 的结果sum := 0for result := range out {sum += result}fmt.Println("Total Sum:", sum) // 计算最终结果
}
参考
- go-zero中介绍MapReduce使用场景:
- 介绍原理:go-zero/core/mr/readme-cn.md at master · zeromicro/go-zero
- 示例:zero-examples/mapreduce at main · zeromicro/zero-examples
注意事项
- 数据并行性: MapReduce适合数据并行处理的任务,即任务可以分解为多个独立的子任务。
- 数据规模: 对于小规模数据,MapReduce可能引入不必要的开销,应根据数据规模选择合适的处理方式。
- 实时性要求: MapReduce不适合实时处理要求很高的任务,因为它通常用于批处理。
相关文章:
[每周一更]-(第133期):Go中MapReduce架构思想的使用场景
文章目录 **MapReduce 工作流程**Go 中使用 MapReduce 的实现方式:**Go MapReduce 的特点****哪些场景适合使用 MapReduce?**使用场景1. 数据聚合2. 数据过滤3. 数据排序4. 数据转换5. 数据去重6. 数据分组7. 数据统计8.**统计文本中单词出现次数****代码…...
QML初识
目录 一、关于QML 二、布局定位和锚点 1.布局定位 2.锚点详解 三、数据绑定 1.基本概念 2.绑定方法 3.数据模型绑定 四、附加属性及信号 1.附加属性 2.信号 一、关于QML QML是Qt框架中的一种声明式编程语言,用于描述用户界面的外观和行为;Qu…...
查询已经运行的 Docker 容器启动命令
一、导语 使用 get_command_4_run_container 查询 docker 容器的启动命令 获取镜像 docker pull cucker/get_command_4_run_container 查看容器命令 docker run --rm -v /var/run/docker.sock:/var/run/docker.sock cucker/get_command_4_run_container 容器id或容器名 …...
项目管理中的13个数据分析思维
01 信度与效度思维 信度:是指一个数据或指标自身的可靠程度,包括准确性和稳定性。 效度:是指一个数据或指标的生成,需贴合它所要衡量的事物,即指标的变化能够代表该事物的变化。 在项目管理中,信度和效度的…...
快速查看ROS节点的CPU和内存占用情况
他们可能是在排查资源泄漏的问题,所以需要监控节点的CPU和内存使用情况。可能他们遇到了节点占用过多资源导致服务器崩溃的情况,需要快速定位问题节点。现有的Linux命令方面,top和htop可以实时查看进程资源使用,但用户想要的是针对ROS节点的,可能需要更针对性的工具。ROS本…...
Centos Stream 10 根目录下的文件夹结构
/ ├── bin -> usr/bin ├── boot ├── dev ├── etc ├── home ├── lib -> usr/lib ├── lib64 -> usr/lib64 ├── lostfound ├── media ├── mnt ├── opt ├── proc ├── root ├── run ├── sbin -> usr/sbin ├── srv ├─…...
协议_CAN协议
物理层特征 信号传输原理: CAN控制器根据CAN_L和CAN_H上的电位差来判断总线电平,总线电平分为显性电平(CAN_H与CAN_L压差 2v)、隐性电平(CAN_H与CAN_L压差 0v),发送方通过总线电平的变化&am…...
nuxt3中报错: `setInterval` should not be used on the server.
那是因为在后端渲染没有浏览器的执行环境,一些浏览器环境提供的对象和方法都无法使用,代码判断下就行。 if (import.meta.client) {setInterval(() > {}, 1000) }Import meta Nuxt API...
leetcode_深度搜索和广度搜索 101. 对称二叉树
101. 对称二叉树 给你一个二叉树的根节点 root , 检查它是否轴对称思路: 1.判断根节点的左右子树是否为空, 若都为空则返回True2.根节点的左右子树其中之一为空或子树的根节点的值不同则返回False3.分别判断根节点左右子树是否相同, 判断时, 左边子树的左节点要对应…...
QT修仙之路2-2 对话框 尚欠火候
警告对话框 相关代码 错误对话框 相关代码 消息对话框 相关代码 询问对话框 相关代码 相关代码 警告对话框 QMessageBox::warning(this,"错误","账号密码不能为空",QMessageBox::Ok);错误对话框 QMessageBox msgBox(QMessageBox::Critical,"错误…...
NFT Insider #168:The Sandbox 推出新春{金蛇礼服}套装;胖企鹅合作 LINE Minini
引言:NFT Insider 由 NFT 收藏组织 WHALE Members、BeepCrypto 联合出品, 浓缩每周 NFT 新闻,为大家带来关于 NFT 最全面、最新鲜、最有价值的讯息。每期周报将从 NFT 市场数据,艺术新闻类,游戏新闻类,虚拟…...
ZooKeeper 技术全解:概念、功能、文件系统与主从同步
引言 随着分布式系统变得越来越复杂,对协调服务的需求也在不断增长。ZooKeeper 作为一个由 Apache 维护的开源分布式协调服务框架,广泛用于 Hadoop 生态系统和其他需要协调的分布式环境中。这一系统旨在解决分布式应用中常见的挑战,如配置管…...
什么是deepseek?
AI国产免费开源强大 DeepSeek 是由国内团队开发的一款开源人工智能工具库,专注于提供高效易用的 AI 模型训练与推理能力。它既包含预训练大语言模型(如 DeepSeek-R1 系列),也提供配套工具链,助力开发者快速实现 AI 应用…...
容器服务基础
1.腾讯云容器服务 使用该服务,开发者将无需安装、运维、扩展您的集群管理基础设施,只需进行简单的API调用,便可启动和停止 Docker 应用程序,查询集群的完整状态,以及使用各种云服务。 创建集群--创建工作负载/创建ingr…...
C++基础知识(二)之数据类型、指针和内存、数组
六、C数据类型 1、sizeof运算符 sizeof运算符用于求数据类型或变量占用的内存空间。 用于数据类型:sizeof(数据类型) 用于变量:sizeof(变量名) 或 sizeof 变量名 注意: 在32位和64位操作系统中,同一种数据类型占用的内存空间…...
LLMs之DeepSeek r1:Logic-RL的简介、安装和使用方法、案例应用之详细攻略
LLMs之DeepSeek r1:Logic-RL的简介、安装和使用方法、案例应用之详细攻略 目录 Logic-RL的简介 1、Logic-RL的特点 2、性能 Logic-RL 的安装和使用方法 1、安装 2、使用方法 数据准备 基础模型 指令模型 训练执行 实现细节 Logic-RL的案例应用 Logic-RL…...
AUTOSAR汽车电子嵌入式编程精讲300篇-基于FPGA的CAN FD汽车总线数据交互系统设计
目录 前言 汽车总线以及发展趋势 汽车总线技术 汽车总线发展趋势 CAN FD总线国内外研究现状 2 系统方案及CAN FD协议分析 2.1系统控制方案设计 2.2 CAN FD总线帧结构分析 2.2.1数据帧分析 2.2.2远程帧分析 2.2.3过载帧分析 2.2.4错误帧分析 2.2.5帧间隔分析 2.3位…...
【神经网络框架】非局部神经网络
一、非局部操作的数学定义与理论框架 1.1 非局部操作的通用公式 非局部操作(Non-local Operation)是该研究的核心创新点,其数学定义源自经典计算机视觉中的非局部均值算法(Non-local Means)。在深度神经网络中,非局部操作被形式化为: 其中: 1.2 与传统操作的对比分析…...
22.[前端开发]Day22-CSS单位-CSS预处理器-移动端视口
1 CSS常见单位详解 CSS中的单位 CSS中的绝对单位( Absolute length units ) CSS中的相对单位( Relative length units ) 1.em: 相对自己的font-size;如果自己没有设置, 那么会继承父元素的font-size 2.如果font-size中…...
深入讲解MyBatis
1. MyBatis 的背景和优势 背景:在 Java 开发中,传统的 JDBC 操作数据库代码繁琐,需要手动管理数据库连接、编写 SQL 语句、处理结果集等,开发效率低且容易出错。MyBatis 应运而生,它通过将 SQL 语句与 Java 代码分离&a…...
URL调用本地Ollama模型
curl http://192.168.2.247:11434/api/generate -d "{ \"model\": \"deepseek-r1:8b\", \"prompt\": \"Who r u?\" ,\"stream\":false}" 连续对话...
DeepSeek和ChatGPT的优劣或者区别(答案来DeepSeek和ChatGPT)
DeepSeek的答案 DeepSeek与ChatGPT作为当前两大主流AI模型,在架构设计、性能表现、应用场景等方面存在显著差异,以下从多个维度进行对比分析: 一、架构与训练效率 架构设计 DeepSeek:采用混合专家(MoE)框架…...
【python】matplotlib(animation)
文章目录 1、matplotlib.animation1.1、FuncAnimation1.2、修改 matplotlib 背景 2、matplotlib imageio2.1、折线图2.2、条形图2.3、散点图 3、参考 1、matplotlib.animation 1.1、FuncAnimation matplotlib.animation.FuncAnimation 是 Matplotlib 库中用于创建动画的一个…...
ubuntu24.04安装布置ros
最近换电脑布置机器人环境,下了24.04,但是网上的都不太合适,于是自己试着布置好了,留作有需要的人一起看看。 文章目录 目录 前言 一、确认 ROS 发行版名称 二、检查你的 Ubuntu 版本 三、安装正确的 ROS 发行版 四、对于Ubuntu24…...
Vue Router 导航方式详解:声明式导航与编程式导航
Vue Router 是 Vue.js 官方推荐的路由管理器,提供了两种主要的导航方式:声明式导航和编程式导航。这两种方式各有特点,适用于不同的场景。本文将详细介绍它们的用法、区别以及底层实现原理。 1. 声明式导航 1.1 什么是声明式导航?…...
【RabbitMQ重试】重试三次转入死信队列
以下是基于RabbitMQ死信队列实现消息重试三次后转存的技术方案: 方案设计要点 队列定义改造(核心参数配置) Bean public Queue auditQueue() {Map<String, Object> args new HashMap<>();args.put("x-dead-letter-exchan…...
接入 deepseek 实现AI智能问诊
1. 准备工作 注册 DeepSeek 账号 前往 DeepSeek 官网 注册账号并获取 API Key。 创建 UniApp 项目 使用 HBuilderX 创建一个新的 UniApp 项目(选择 Vue3 或 Vue2 模板)。 安装依赖 如果需要在 UniApp 中使用 HTTP 请求,推荐使用 uni.requ…...
网络爬虫js逆向之异步栈跟栈案例
【注意!!!】 前言: 1. 本章主要讲解js逆向之异步栈跟栈的知识(通过单步执行调试) 2. 使用关键字搜定位加密入口 3. 本专栏通过多篇文章【文字案例】的形式系统化进行描述 4. 本文章全文进行了脱敏处理 5. 详…...
机器学习 - 需要了解的条件概率、高斯分布、似然函数
似然函数是连接数据与参数的桥梁,通过“数据反推参数”的逆向思维,成为统计推断的核心工具。理解它的关键在于区分“参数固定时数据的概率”与“数据固定时参数的合理性”,这种视角转换是掌握现代统计学和机器学习的基础。 一、在学习似然函…...
string 与 wstring 的字符编码
测试代码: #include<stdio.h> #include<stdlib.h> #include<windows.h> #include <locale.h> #include <string> #include <iostream>// 函数用于计算UTF-8字符串中的字符数 int utf8_strlen(const char* str) {int len = 0;for (; *s…...
