【hadoop】hadoop streaming
API:
https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html(hadoop3)
https://cwiki.apache.org/confluence/display/HADOOP2/HadoopStreaming(hadoop2)
hadoop version查看hadoop版本,下面的测试基于hadoop2环境。
文章目录
- 1.介绍
- 2.Demo
- 3.map任务获取数据来源
- 4.作业提交主要参数解释
- 1)输入输出
- 2)资源分发
- 3) 作业启动管理
- 4)分区规则
- 5)压缩
- 5.仅map作业
1.介绍

hadoop streaming可以将任何可执行的文件,如sh脚本,py脚本,嵌入到分布式环境当中执行MR的逻辑,而不必局限于java语言。
hadoop会在每个启动的任务进程中初始化指定的map或reduce脚本并执行,map或reduce任务通过标准输入流读取数据,标准输出流写出数据。
需要注意的是streaming任务默认在map端数据shuffle到reduce端时没有本地合并的过程,也就是MR任务中在map=>reduce过程中,传输的数据格式:key: [value1, value2, value3] ,是key+一个value构成的迭代器。而streaming任务中,map输出的相同key的数据只会被依次相邻的送到同一个reduce,key1: value1; key2: value2这样。
2.Demo
对于下面的数据:
(base) map@gzdt-map-poi-yingxiang-offline04 test_streaming$ cat test.dat
1 小王
2 小李
1 张三
3 李四
执行下面命令:
#!/bin/bash
set -e -x -u -o pipefailhadoop streaming \-input afs://xxx/test.dat \-output afs://xxx/test.res \-jobconf hadoop.job.ugi="xxx,xxx" \-jobconf mapred.job.queue.name="xxx" \-jobconf mapred.job.tracker="xxx" \-inputformat org.apache.hadoop.mapred.TextInputFormat \-mapper cat \-reducer wc
产出的结果文件:
(base) map@gzdt-map-poi-yingxiang-offline04 test_streaming$ cat test.res2 4 181 2 91 2 9
wc是Unix/Linux中的一个命令行工具,用于计算给定文件中的字节数、字数和行数。
wc命令的输出有三个或四个数字:
第一个数字是文件中的行数。
第二个数字是文件中的词数。这里的"词"是指由空格、制表符或换行符分隔的字符串。
第三个数字是文件中的字节数。这是文件的大小,不是字符数。如果文件中包含多字节字符(如UTF-8编码的非ASCII字符),字节数会大于字符数。
第四个字段是输入文件的名称。
如果只输入wc命令而不带任何文件名,wc会从标准输入读取数据,此时不会显示文件名。
3.map任务获取数据来源
一个streaming job中可以指定多个不同的输入路径,不同路径的数据可能会需要不同的处理方式,所以map任务中区分当前数据的来源非常重要。
hadoop在任务启动时会预置一些属性作为进程级别的环境变量:

圈起来的这个属性在map任务中可以用来获取当前map任务处理的文件块的名字,类似于hdfs://namenode:port/file_path/block_name。

注意点:
1)During the execution of a streaming job, the names of the “mapreduce” parameters are transformed. The dots ( . ) become underscores ( _ ),上面截图中的mapreduce参数值在程序中获取时,需要将.转换为_,也就是mapreduce.map.input.file在程序中通过mapreduce_map_input_file获取。
2)python中该参数被简化,mapreduce_map_input_file写为map_input_file。
下面是一个参考的demo:
import sys
import os
import io# 强制stdout以UTF-8编码输出(python3默认,但最终编码方式受限系统环境)
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')# 当前数据块的全路径
path = os.environ['map_input_file']# path是block的全路径,所以判断全路径包含哪个表名即可
if 'tb1_name' in path:for line in sys.stdin.buffer:decoded_line = line.decode('utf-8').strip('\n')items = decoded_line.split('\t')key = items[0]value = items[1:]print('\t'.join([key, 'tb1_name', value])) # reduce中通过第二个标志位标识数据来源于哪个表
elif 'tb2_name' in path:for line in sys.stdin.buffer:decoded_line = line.decode('gb18030').strip('\n')items = decoded_line.split('\t')key = items[0]value = items[1:]print('\t'.join([key, 'tb2_name', value]))
else:for line in sys.stdin.buffer:decoded_line = line.decode('utf-8').strip('\n')items = decoded_line.split('\t')key = items[0]value = items[1:]print('\t'.join([key, 'other', value]))
reduce处理:
import sysdef process(datas):passif __name__ == '__main__':key = ''datas = []for line in sys.stdin:items = line.strip('\n').split('\t')if key == '' or items[0] == key: # 相同bidkey = items[0]datas.append(items)else:process(datas) # 处理上一批biddatas.clear() # 清空key = items[0]datas.append(items) # 添加当前记录if len(datas) > 0: # 处理最后一批bidprocess(datas)
4.作业提交主要参数解释
1)输入输出
-input:输入文件路径,可以有多个。
-output:输出文件路径。
-inputformat:负责定义如何读取输入数据,并决定如何将数据分割成多个块,每个块由一个map任务处理。默认org.apache.hadoop.mapred.TextInputFormat普通文本读取。
2)资源分发
-file:将本地文件分发到hadoop集群的所有MR任务的当前工作目录(task级别),使任务脚本能够访问这些文件。当前工作目录表明每启动一个map或reduce任务都会拷贝一份副本。常用来分发map和reduce的任务脚本以及一些资源文件,如模型权重文件。模型资源文件和任务脚本处于同一个目录,任务脚本中可以直接./xxx相对路径读取资源文件。
-cacheArchive:将归档文件分发到所有任务节点(worker级别)并自动解压,节点上的MR任务启动时不会拷贝副本到自己的工作目录,会通过符号链接共享同一个节点上的文件。适合用来分发一些比较大的文件,如python环境包。
eg:
-cacheArchive $HADOOP_PYTHON3#python3,如果hdfs上python包解压缩后的结构为python3/bin/python3,则在mapper和reducer参数中指定python环境时-mapper ./python3/python3/bin/python3 mapper.py,关于python解释器层级路径的解释:#python3,会在当前task工作目录中创建一个名为python3的文件夹,并将python环境的tar包拉到这个目录下解压。
3) 作业启动管理
-mapper: 指定mapper脚本。
-reducer: 指定reducer脚本。
-jobconf mapred.map.tasks:map任务数量,不严格依赖指定,跟输入数据的分块有关。
-jobconf mapred.reduce.tasks:reduce任务数量,等于任务最终输出包含的文件个数。
-jobconf hadoop.job.ugi:提交作业用户的身份,主要影响作业对HDFS或YARN的访问权限。
-jobconf mapred.job.queue.name:资源队列。
-jobconf mapred.job.priority:任务优先级。
4)分区规则
-partitioner:分区规则,org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner基于指定序号的字段为分区键。
-jobconf stream.num.map.output.key.fields:指定map输出的key,如-jobconf stream.num.map.output.key.fields=1指定按照map输出的第一个字段作为key。
-jobconf num.key.fields.for.partition:决定如何分区,默认按照完整的key,即上面参数配置。如果上面配置为符合key,该参数可以指定按照key中的部分字段分区。也就是说,如果配置了这个参数,指定的字段决定了map输出送往哪个reduce,而上面配置的key决定了送到同一个reduce的数据的先后顺序。(map在shuffle数据到reduce之前会对数据按照key排序)
5)压缩
-jobconf mapred.output.compress:输出结果是否开启压缩,true、false。
-jobconf mapred.output.compression.codec:指定压缩编解码器,Gzip压缩org.apache.hadoop.io.compress.GzipCodec。
-jobconf mapred.output.compression.type:压缩类型,=BLOCK块压缩,适合列式存储,=RECORD记录压缩,适合行式存储。
5.仅map作业
https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html#Specifying_Map-Only_Jobs
在hadoop3版本中指定参数-D mapreduce.job.reduces=0,hadoop2版本中指定-jobconf mapred.reduce.tasks=0。
-reducer参数不可省,即使不需要reduce任务,否则实际测试中会报错如下。可以随意指定,例如-reducer cat。

相关文章:
【hadoop】hadoop streaming
API: https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html(hadoop3) https://cwiki.apache.org/confluence/display/HADOOP2/HadoopStreaming(hadoop2) hadoop version查看hadoop版本&#…...
Unity-RectTransform设置UI width
不知道有没人需要这样的代码,就是.sizeDelta //不确定是不是英文翻译的原因,基本很难理解,sizeDeltaSize,//未必完全正确,但这么写好像总没错过 //image 在一个UnityEngine.UI.Image 的数组内foreach (var image in l…...
开发中后端返回下划线数据,要不要统一转驼峰?
先说结论。看情况!!!! 前端 主要用 JS/TS 建议后端返回 camelCase,减少前端转换成本。后端 主要是 Python/Go 建议保持 snake_case,前端做转换。但是团队统一风格最重要!如果统一返回驼峰就驼峰…...
【现代深度学习技术】现代卷积神经网络04:含并行连接的网络(GoogLeNet)
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上,结合当代大数据和大算力的发展而发展出来的。深度学习最重…...
链表-LeetCode
这里写目录标题 1 排序链表1.1 插入法 O(n)1.2 归并排序 1 排序链表 1.1 插入法 O(n) /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullpt…...
TypeScript 与 JavaScript 对比
核心概念对比 JavaScript 语言类型:动态类型脚本语言诞生时间:1995年(ES1标准)类型系统:运行时类型检查文件扩展名:.js编译需求:无需编译,直接执行 TypeScript 语言类型…...
Selenium之Web Driver常用属性
Web Driver常用属性 在上一篇文章里我们安装并且使用了selenium来操控浏览器;这一节我们来看一下Driver的一些常用属性;可以方便和浏览器进行交互 废话不多说,下面以实践为主 获取浏览器名称 browser_name browser.name print(browser_n…...
EF Core 执行原生SQL语句
文章目录 前言一、执行查询(返回数据)1) 使用 FromSqlRaw或 FromSqlInterpolated 方法,适用于 DbSet<T>,返回实体集合。2)结合 LINQ 查询 二、执行非查询操作(增删改)1&#x…...
新版 eslintrc 文件弃用 .eslintignore已弃用 替代方案
1.进入eslint.config.mjs文件 2.import { defineConfig, globalIgnores } from "eslint/config"; 引入globalIgnores 3.配置 defineConfig([ ... globalIgnores([ "config/*", ".husky", ".local", "public/*", ".…...
Python二分查找【清晰易懂】
1. 二分查找是什么? 想象你在玩“猜数字”游戏: 对方心里想一个 1~100 的数字,你每次猜一个数,对方会告诉你是“大了”还是“小了”。 最快的方法:每次都猜中间的数!比如第一次猜50,如果大了&…...
Azure SDK 使用指南
Azure SDK(软件开发工具包)是一组由微软提供的工具和库,旨在帮助开发者以多种编程语言(如 .NET、Java、Python、JavaScript 等)与 Azure 服务进行交互。 通过使用 Azure SDK,开发者可以更高效地构建、部…...
【STL】vector介绍(附部分接口模拟实现)
文章目录 1.介绍2.使用2.1 vector的构造2.2 vector空间相关接口2.2.1 size()2.2.2 capacity()2.2.3 empty()2.2.4 resize()2.2.5 reserve() 2.3 vector的增删查改2.3.1 push_back()2.3.2 insert()2.3.3 pop_back()2.3.4 erase()2.3.5 swap()2.3.6 operator[]注:关于…...
一周掌握Flutter开发--8. 调试与性能优化(上)
文章目录 8. 调试与性能优化核心技能8.1 使用 Flutter DevTools 分析性能8.2 检查 Widget 重绘(debugPaintSizeEnabled)8.3 解决 ListView 卡顿(ListView.builder itemExtent) 其他性能优化技巧8.4 减少 build 方法的调用8.5 使用…...
游戏引擎学习第182天
回顾和今天的计划 昨天的进展令人惊喜,原本的调试系统已经被一个新的系统完全替换,新系统不仅能完成原有的所有功能,还能捕获完整的调试信息,包括时间戳等关键数据。这次的替换非常顺利,效果很好。 今天的重点是在此基…...
2025计算机毕设全流程实战指南:Java/Python+协同过滤+小程序开发避坑手册
技术框架的选择是项目开发的关键起点,直接影响开发效率和最终成果质量。然而,许多开发者在选择技术框架时面临困难:现有知识储备不足以支撑复杂项目需求,团队经验有限,框架选择缺乏前瞻性常导致后期问题。尽管技术框架…...
C语言_数据结构_二叉树
【本节目标】 树的概念及结构 二叉树的概念及结构 二叉树的顺序结构及实现 二叉树的链式结构及实现 1. 树的概念及结构 1.1 树的概念 树是一种非线性的数据结构,它是由n(n>0)个有限结点组成一个具有层次关系的集合。把它叫做树是因为…...
Compare全目录文件比较内容(项目中用到过)
第一步:找到“会话”——“会话设置” 会话设置弹框信息 第二步:选择“比较”tab标签 比较内容:选中二进制比较 第三步:选中所有文件 第四步:右键选中“比较内容” 第五步:选中“基于规则的比较”...
3.26[a]paracompute homework
5555 负载不平衡指多个线程的计算量差异显著,导致部分线程空转或等待,降低并行效率。其核心矛盾在于任务划分的静态性与计算动态性不匹配,尤其在处理不规则数据或动态任务时尤为突出。以稀疏矩阵的向量乘法为例,假设其非零元素分…...
视觉大模型CLIP论文精读
论文:Learning Transferable Visual Models From Natural Language Supervision 代码:https://github.com/openai/CLIP 摘要 最先进的计算机视觉系统是针对预测一组固定的、预先确定的对象类别进行训练的。这种受限的监督形式限制了它们的通用性和可用…...
【AI】Orin NX+ubuntu22.04上移植YoloV11,并使用DeepStream测试成功
【AI】郭老二博文之:AI学习目录汇总 1、烧写系统 新到的开发板,已经烧写好Ubuntu系统,版本为22.04。 如果没有升级到Ubuntu22.04,可以在电脑Ubuntu系统中使用SDKManager来烧写Ubuntu系统,网络情况好的话,也可以直接将CUDA、cuDNN、TensorRT、Deepstream等也安装上。 2…...
HTML文档流
1. 基础定义 “文档流(Normal Flow)是指HTML元素在页面中默认的排列方式。在标准文档流中,块级元素会从上到下垂直排列,每个元素占据一整行;而行内元素则从左到右水平排列,直到空间不足才会换行。” 2. 详细解释 可以进一步展开…...
链表的创建:头插法与尾插法详解(数据结构)
C 链表的创建:头插法与尾插法详解 链表(Linked List)是一种重要的数据结构,适用于插入和删除操作频繁的场景。本文介绍 两种常见的链表构建方法: 尾插法(Append / Tail Insertion):…...
MyBatis中mapper.xml 的sql映射规则
一、SQL 映射文件核心元素 MyBatis 映射文件的顶级元素(按定义顺序): cache:命名空间的缓存配置。cache-ref:引用其他命名空间的缓存。resultMap:自定义结果集映射。sql:可重用的 SQL 片段。i…...
深入解析 Java 类加载机制及双亲委派模型
🔍 Java的类加载机制是确保应用程序正确运行的基础,特别是双亲委派模型,它通过父类加载器逐层加载类,避免冲突和重复加载。但在某些特殊场景下,破坏双亲委派模型会带来意想不到的效果。本文将深入解析Java类加载机制、…...
糖尿病大模型预测及临床应用研究智能管理系统技术文档
目录 1. 数据工程规范1.1 多源数据集成1.2 特征工程架构 2. 核心模型架构2.1 分层预测网络2.2 动态血糖预测模块 3. 实时决策系统3.1 术中预警协议3.2 麻醉方案优化器 4. 验证体系实现4.1 数字孪生验证平台4.2 临床验证流程 5. 系统部署方案5.1 边缘计算架构5.2 性能指标 6. 安…...
MySQL数据库精研之旅第四期:解锁库操作高阶技能
专栏:MySQL数据库成长记 个人主页:手握风云 目录 一、查看所有表 1.1. 语法 二、创建表 2.1. 语法 2.2. 示例 2.3. 表在磁盘上对应的⽂件 三、查看表结构 3.1. 语法 3.2. 示例 四、修改表 4.1. 语法 4.2. 示例 五、删除表 5.1. 语法 5.2.…...
【DevOps】DevOps and CI/CD Pipelines
DevOps 是一种将开发与运维实践相结合的模式,旨在缩短软件开发周期并交付高质量软件。 DevOps 是什么? 开发团队与运维团队之间的协作 • 持续集成与持续交付(CI/CD) • 流程自动化 • 基础设施即代码(IaC)…...
Oracle详解
Oracle 数据库是一款由 Oracle 公司开发和维护的关系数据库管理系统(RDBMS)。Oracle 数据库广泛应用于企业级应用中,尤其是在需要高可用性、高性能和安全性的场景。以下是对 Oracle 数据库的详细介绍,包括它的各个方面。 一、Ora…...
VS自定义静态库并在其他项目中使用
1、VS创建一个空项目或者静态库项目 2、右键项目 属性 修改生成文件类型 3、生成解决方案 4、复制.h文件和.lib文件作为静态库 5、创建一个新项目 测试使用新生成的静态库 在新项目UseStaticLib中加一个新文件夹lib,lib中放入上面的.h和.lib文件。 6、vs中右…...
5G AAU(Active Antenna Unit)详细介绍
5G AAU(Active Antenna Unit)详细介绍 1. 定义与架构 5G AAU(Active Antenna Unit,有源天线单元)是5G无线基站系统中的核心组件,它集成了射频(RF)和天线功能,是4G时代R…...
