当前位置: 首页 > article >正文

【hadoop】hadoop streaming

API:
https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html(hadoop3)
https://cwiki.apache.org/confluence/display/HADOOP2/HadoopStreaming(hadoop2)
hadoop version查看hadoop版本,下面的测试基于hadoop2环境。

文章目录

  • 1.介绍
  • 2.Demo
  • 3.map任务获取数据来源
  • 4.作业提交主要参数解释
    • 1)输入输出
    • 2)资源分发
    • 3) 作业启动管理
    • 4)分区规则
    • 5)压缩
  • 5.仅map作业

1.介绍

在这里插入图片描述
hadoop streaming可以将任何可执行的文件,如sh脚本,py脚本,嵌入到分布式环境当中执行MR的逻辑,而不必局限于java语言。

hadoop会在每个启动的任务进程中初始化指定的map或reduce脚本并执行,map或reduce任务通过标准输入流读取数据,标准输出流写出数据。

需要注意的是streaming任务默认在map端数据shuffle到reduce端时没有本地合并的过程,也就是MR任务中在map=>reduce过程中,传输的数据格式:key: [value1, value2, value3] ,是key+一个value构成的迭代器。而streaming任务中,map输出的相同key的数据只会被依次相邻的送到同一个reducekey1: value1; key2: value2这样。

2.Demo

对于下面的数据:

(base) map@gzdt-map-poi-yingxiang-offline04 test_streaming$ cat test.dat
1	小王
2	小李
1	张三
3	李四

执行下面命令:

#!/bin/bash
set -e -x -u -o pipefailhadoop streaming \-input afs://xxx/test.dat \-output afs://xxx/test.res \-jobconf hadoop.job.ugi="xxx,xxx" \-jobconf mapred.job.queue.name="xxx" \-jobconf mapred.job.tracker="xxx" \-inputformat org.apache.hadoop.mapred.TextInputFormat \-mapper cat \-reducer wc

产出的结果文件:

(base) map@gzdt-map-poi-yingxiang-offline04 test_streaming$ cat test.res2       4      181       2       91       2       9

wc是Unix/Linux中的一个命令行工具,用于计算给定文件中的字节数、字数和行数。

wc命令的输出有三个或四个数字:

第一个数字是文件中的行数。
第二个数字是文件中的词数。这里的"词"是指由空格、制表符或换行符分隔的字符串。
第三个数字是文件中的字节数。这是文件的大小,不是字符数。如果文件中包含多字节字符(如UTF-8编码的非ASCII字符),字节数会大于字符数。
第四个字段是输入文件的名称。

如果只输入wc命令而不带任何文件名,wc会从标准输入读取数据,此时不会显示文件名。

3.map任务获取数据来源

一个streaming job中可以指定多个不同的输入路径,不同路径的数据可能会需要不同的处理方式,所以map任务中区分当前数据的来源非常重要。

hadoop在任务启动时会预置一些属性作为进程级别的环境变量:
在这里插入图片描述

圈起来的这个属性在map任务中可以用来获取当前map任务处理的文件块的名字,类似于hdfs://namenode:port/file_path/block_name
在这里插入图片描述

注意点:
1)During the execution of a streaming job, the names of the “mapreduce” parameters are transformed. The dots ( . ) become underscores ( _ ),上面截图中的mapreduce参数值在程序中获取时,需要将.转换为_,也就是mapreduce.map.input.file在程序中通过mapreduce_map_input_file获取。
2)python中该参数被简化,mapreduce_map_input_file写为map_input_file

下面是一个参考的demo:

import sys
import os
import io# 强制stdout以UTF-8编码输出(python3默认,但最终编码方式受限系统环境)
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')# 当前数据块的全路径
path = os.environ['map_input_file']# path是block的全路径,所以判断全路径包含哪个表名即可
if 'tb1_name' in path:for line in sys.stdin.buffer:decoded_line = line.decode('utf-8').strip('\n')items = decoded_line.split('\t')key = items[0]value = items[1:]print('\t'.join([key, 'tb1_name', value]))  # reduce中通过第二个标志位标识数据来源于哪个表
elif 'tb2_name' in path:for line in sys.stdin.buffer:decoded_line = line.decode('gb18030').strip('\n')items = decoded_line.split('\t')key = items[0]value = items[1:]print('\t'.join([key, 'tb2_name', value]))
else:for line in sys.stdin.buffer:decoded_line = line.decode('utf-8').strip('\n')items = decoded_line.split('\t')key = items[0]value = items[1:]print('\t'.join([key, 'other', value]))

reduce处理:

import sysdef process(datas):passif __name__ == '__main__':key = ''datas = []for line in sys.stdin:items = line.strip('\n').split('\t')if key == '' or items[0] == key:  # 相同bidkey = items[0]datas.append(items)else:process(datas)  # 处理上一批biddatas.clear()  # 清空key = items[0]datas.append(items)  # 添加当前记录if len(datas) > 0:  # 处理最后一批bidprocess(datas)

4.作业提交主要参数解释

1)输入输出

-input:输入文件路径,可以有多个。
-output:输出文件路径。
-inputformat:负责定义如何读取输入数据,并决定如何将数据分割成多个块,每个块由一个map任务处理。默认org.apache.hadoop.mapred.TextInputFormat普通文本读取。

2)资源分发

-file:将本地文件分发到hadoop集群的所有MR任务的当前工作目录(task级别),使任务脚本能够访问这些文件。当前工作目录表明每启动一个map或reduce任务都会拷贝一份副本。常用来分发map和reduce的任务脚本以及一些资源文件,如模型权重文件。模型资源文件和任务脚本处于同一个目录,任务脚本中可以直接./xxx相对路径读取资源文件。
-cacheArchive:将归档文件分发到所有任务节点(worker级别)并自动解压,节点上的MR任务启动时不会拷贝副本到自己的工作目录,会通过符号链接共享同一个节点上的文件。适合用来分发一些比较大的文件,如python环境包。

eg:-cacheArchive $HADOOP_PYTHON3#python3,如果hdfs上python包解压缩后的结构为python3/bin/python3,则在mapper和reducer参数中指定python环境时-mapper ./python3/python3/bin/python3 mapper.py,关于python解释器层级路径的解释:#python3,会在当前task工作目录中创建一个名为python3的文件夹,并将python环境的tar包拉到这个目录下解压。

3) 作业启动管理

-mapper: 指定mapper脚本。
-reducer: 指定reducer脚本。
-jobconf mapred.map.tasks:map任务数量,不严格依赖指定,跟输入数据的分块有关。
-jobconf mapred.reduce.tasks:reduce任务数量,等于任务最终输出包含的文件个数。
-jobconf hadoop.job.ugi:提交作业用户的身份,主要影响作业对HDFS或YARN的访问权限。
-jobconf mapred.job.queue.name:资源队列。
-jobconf mapred.job.priority:任务优先级。

4)分区规则

-partitioner:分区规则,org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner基于指定序号的字段为分区键。
-jobconf stream.num.map.output.key.fields:指定map输出的key,如-jobconf stream.num.map.output.key.fields=1指定按照map输出的第一个字段作为key。
-jobconf num.key.fields.for.partition:决定如何分区,默认按照完整的key,即上面参数配置。如果上面配置为符合key,该参数可以指定按照key中的部分字段分区。也就是说,如果配置了这个参数,指定的字段决定了map输出送往哪个reduce,而上面配置的key决定了送到同一个reduce的数据的先后顺序。(map在shuffle数据到reduce之前会对数据按照key排序)

5)压缩

-jobconf mapred.output.compress:输出结果是否开启压缩,true、false。
-jobconf mapred.output.compression.codec:指定压缩编解码器,Gzip压缩org.apache.hadoop.io.compress.GzipCodec
-jobconf mapred.output.compression.type:压缩类型,=BLOCK块压缩,适合列式存储,=RECORD记录压缩,适合行式存储。

5.仅map作业

https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html#Specifying_Map-Only_Jobs
在hadoop3版本中指定参数-D mapreduce.job.reduces=0,hadoop2版本中指定-jobconf mapred.reduce.tasks=0

-reducer参数不可省,即使不需要reduce任务,否则实际测试中会报错如下。可以随意指定,例如-reducer cat

在这里插入图片描述

相关文章:

【hadoop】hadoop streaming

API: https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html(hadoop3) https://cwiki.apache.org/confluence/display/HADOOP2/HadoopStreaming(hadoop2) hadoop version查看hadoop版本&#…...

Unity-RectTransform设置UI width

不知道有没人需要这样的代码,就是.sizeDelta //不确定是不是英文翻译的原因,基本很难理解,sizeDeltaSize,//未必完全正确,但这么写好像总没错过 //image 在一个UnityEngine.UI.Image 的数组内foreach (var image in l…...

开发中后端返回下划线数据,要不要统一转驼峰?

先说结论。看情况!!!! 前端 主要用 JS/TS 建议后端返回 camelCase,减少前端转换成本。后端 主要是 Python/Go 建议保持 snake_case,前端做转换。但是团队统一风格最重要!如果统一返回驼峰就驼峰…...

【现代深度学习技术】现代卷积神经网络04:含并行连接的网络(GoogLeNet)

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上,结合当代大数据和大算力的发展而发展出来的。深度学习最重…...

链表-LeetCode

这里写目录标题 1 排序链表1.1 插入法 O(n)1.2 归并排序 1 排序链表 1.1 插入法 O(n) /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullpt…...

TypeScript 与 JavaScript 对比

核心概念对比 JavaScript 语言类型:动态类型脚本语言诞生时间:1995年(ES1标准)类型系统:运行时类型检查文件扩展名:.js编译需求:无需编译,直接执行 TypeScript 语言类型&#xf…...

Selenium之Web Driver常用属性

Web Driver常用属性 在上一篇文章里我们安装并且使用了selenium来操控浏览器;这一节我们来看一下Driver的一些常用属性;可以方便和浏览器进行交互 废话不多说,下面以实践为主 获取浏览器名称 browser_name browser.name print(browser_n…...

EF Core 执行原生SQL语句

文章目录 前言一、执行查询&#xff08;返回数据&#xff09;1&#xff09; 使用 FromSqlRaw或 FromSqlInterpolated 方法&#xff0c;适用于 DbSet<T>&#xff0c;返回实体集合。2&#xff09;结合 LINQ 查询 二、执行非查询操作&#xff08;增删改&#xff09;1&#x…...

新版 eslintrc 文件弃用 .eslintignore已弃用 替代方案

1.进入eslint.config.mjs文件 2.import { defineConfig, globalIgnores } from "eslint/config"; 引入globalIgnores 3.配置 defineConfig([ ... globalIgnores([ "config/*", ".husky", ".local", "public/*", ".…...

Python二分查找【清晰易懂】

1. 二分查找是什么&#xff1f; 想象你在玩“猜数字”游戏&#xff1a; 对方心里想一个 1~100 的数字&#xff0c;你每次猜一个数&#xff0c;对方会告诉你是“大了”还是“小了”。 最快的方法&#xff1a;每次都猜中间的数&#xff01;比如第一次猜50&#xff0c;如果大了&…...

Azure SDK 使用指南

​Azure SDK&#xff08;软件开发工具包&#xff09;是一组由微软提供的工具和库&#xff0c;旨在帮助开发者以多种编程语言&#xff08;如 .NET、Java、Python、JavaScript 等&#xff09;与 Azure 服务进行交互。 ​通过使用 Azure SDK&#xff0c;开发者可以更高效地构建、部…...

【STL】vector介绍(附部分接口模拟实现)

文章目录 1.介绍2.使用2.1 vector的构造2.2 vector空间相关接口2.2.1 size()2.2.2 capacity()2.2.3 empty()2.2.4 resize()2.2.5 reserve() 2.3 vector的增删查改2.3.1 push_back()2.3.2 insert()2.3.3 pop_back()2.3.4 erase()2.3.5 swap()2.3.6 operator[]注&#xff1a;关于…...

一周掌握Flutter开发--8. 调试与性能优化(上)

文章目录 8. 调试与性能优化核心技能8.1 使用 Flutter DevTools 分析性能8.2 检查 Widget 重绘&#xff08;debugPaintSizeEnabled&#xff09;8.3 解决 ListView 卡顿&#xff08;ListView.builder itemExtent&#xff09; 其他性能优化技巧8.4 减少 build 方法的调用8.5 使用…...

游戏引擎学习第182天

回顾和今天的计划 昨天的进展令人惊喜&#xff0c;原本的调试系统已经被一个新的系统完全替换&#xff0c;新系统不仅能完成原有的所有功能&#xff0c;还能捕获完整的调试信息&#xff0c;包括时间戳等关键数据。这次的替换非常顺利&#xff0c;效果很好。 今天的重点是在此基…...

2025计算机毕设全流程实战指南:Java/Python+协同过滤+小程序开发避坑手册​

技术框架的选择是项目开发的关键起点&#xff0c;直接影响开发效率和最终成果质量。然而&#xff0c;许多开发者在选择技术框架时面临困难&#xff1a;现有知识储备不足以支撑复杂项目需求&#xff0c;团队经验有限&#xff0c;框架选择缺乏前瞻性常导致后期问题。尽管技术框架…...

C语言_数据结构_二叉树

【本节目标】 树的概念及结构 二叉树的概念及结构 二叉树的顺序结构及实现 二叉树的链式结构及实现 1. 树的概念及结构 1.1 树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为…...

Compare全目录文件比较内容(项目中用到过)

第一步&#xff1a;找到“会话”——“会话设置” 会话设置弹框信息 第二步&#xff1a;选择“比较”tab标签 比较内容&#xff1a;选中二进制比较 第三步&#xff1a;选中所有文件 第四步&#xff1a;右键选中“比较内容” 第五步&#xff1a;选中“基于规则的比较”...

3.26[a]paracompute homework

5555 负载不平衡指多个线程的计算量差异显著&#xff0c;导致部分线程空转或等待&#xff0c;降低并行效率。其核心矛盾在于任务划分的静态性与计算动态性不匹配&#xff0c;尤其在处理不规则数据或动态任务时尤为突出。以稀疏矩阵的向量乘法为例&#xff0c;假设其非零元素分…...

视觉大模型CLIP论文精读

论文&#xff1a;Learning Transferable Visual Models From Natural Language Supervision 代码&#xff1a;https://github.com/openai/CLIP 摘要 最先进的计算机视觉系统是针对预测一组固定的、预先确定的对象类别进行训练的。这种受限的监督形式限制了它们的通用性和可用…...

【AI】Orin NX+ubuntu22.04上移植YoloV11,并使用DeepStream测试成功

【AI】郭老二博文之:AI学习目录汇总 1、烧写系统 新到的开发板,已经烧写好Ubuntu系统,版本为22.04。 如果没有升级到Ubuntu22.04,可以在电脑Ubuntu系统中使用SDKManager来烧写Ubuntu系统,网络情况好的话,也可以直接将CUDA、cuDNN、TensorRT、Deepstream等也安装上。 2…...

HTML文档流

1. 基础定义 “文档流(Normal Flow)是指HTML元素在页面中默认的排列方式。在标准文档流中&#xff0c;块级元素会从上到下垂直排列&#xff0c;每个元素占据一整行&#xff1b;而行内元素则从左到右水平排列&#xff0c;直到空间不足才会换行。” 2. 详细解释 可以进一步展开…...

链表的创建:头插法与尾插法详解(数据结构)

C 链表的创建&#xff1a;头插法与尾插法详解 链表&#xff08;Linked List&#xff09;是一种重要的数据结构&#xff0c;适用于插入和删除操作频繁的场景。本文介绍 两种常见的链表构建方法&#xff1a; 尾插法&#xff08;Append / Tail Insertion&#xff09;&#xff1a;…...

MyBatis中mapper.xml 的sql映射规则

一、SQL 映射文件核心元素 MyBatis 映射文件的顶级元素&#xff08;按定义顺序&#xff09;&#xff1a; cache&#xff1a;命名空间的缓存配置。cache-ref&#xff1a;引用其他命名空间的缓存。resultMap&#xff1a;自定义结果集映射。sql&#xff1a;可重用的 SQL 片段。i…...

深入解析 Java 类加载机制及双亲委派模型

&#x1f50d; Java的类加载机制是确保应用程序正确运行的基础&#xff0c;特别是双亲委派模型&#xff0c;它通过父类加载器逐层加载类&#xff0c;避免冲突和重复加载。但在某些特殊场景下&#xff0c;破坏双亲委派模型会带来意想不到的效果。本文将深入解析Java类加载机制、…...

糖尿病大模型预测及临床应用研究智能管理系统技术文档

目录 1. 数据工程规范1.1 多源数据集成1.2 特征工程架构 2. 核心模型架构2.1 分层预测网络2.2 动态血糖预测模块 3. 实时决策系统3.1 术中预警协议3.2 麻醉方案优化器 4. 验证体系实现4.1 数字孪生验证平台4.2 临床验证流程 5. 系统部署方案5.1 边缘计算架构5.2 性能指标 6. 安…...

MySQL数据库精研之旅第四期:解锁库操作高阶技能

专栏&#xff1a;MySQL数据库成长记 个人主页&#xff1a;手握风云 目录 一、查看所有表 1.1. 语法 二、创建表 2.1. 语法 2.2. 示例 2.3. 表在磁盘上对应的⽂件 三、查看表结构 3.1. 语法 3.2. 示例 四、修改表 4.1. 语法 4.2. 示例 五、删除表 5.1. 语法 5.2.…...

【DevOps】DevOps and CI/CD Pipelines

DevOps 是一种将开发与运维实践相结合的模式&#xff0c;旨在缩短软件开发周期并交付高质量软件。 DevOps 是什么&#xff1f; 开发团队与运维团队之间的协作 • 持续集成与持续交付&#xff08;CI/CD&#xff09; • 流程自动化 • 基础设施即代码&#xff08;IaC&#xff09;…...

Oracle详解

Oracle 数据库是一款由 Oracle 公司开发和维护的关系数据库管理系统&#xff08;RDBMS&#xff09;。Oracle 数据库广泛应用于企业级应用中&#xff0c;尤其是在需要高可用性、高性能和安全性的场景。以下是对 Oracle 数据库的详细介绍&#xff0c;包括它的各个方面。 一、Ora…...

VS自定义静态库并在其他项目中使用

1、VS创建一个空项目或者静态库项目 2、右键项目 属性 修改生成文件类型 3、生成解决方案 4、复制.h文件和.lib文件作为静态库 5、创建一个新项目 测试使用新生成的静态库 在新项目UseStaticLib中加一个新文件夹lib&#xff0c;lib中放入上面的.h和.lib文件。 6、vs中右…...

5G AAU(Active Antenna Unit)详细介绍

5G AAU&#xff08;Active Antenna Unit&#xff09;详细介绍 1. 定义与架构 5G AAU&#xff08;Active Antenna Unit&#xff0c;有源天线单元&#xff09;是5G无线基站系统中的核心组件&#xff0c;它集成了射频&#xff08;RF&#xff09;和天线功能&#xff0c;是4G时代R…...