当前位置: 首页 > news >正文

MapReduce 读写数据库

MapReduce 读写数据库

经常听到小伙伴吐槽 MapReduce 计算的结果无法直接写入数据库,
实际上 MapReduce 是有操作数据库实现的
本案例代码将实现 MapReduce 数据库读写操作和将数据表中数据复制到另外一张数据表中

准备数据表

create database htu;
use htu;
create table word(name varchar(255) comment '单词',count int comment '数量'
);
create table new_word(name varchar(255) comment '单词',count int comment '数量'
);

数据库持久化类

package com.lihaozhe.db;import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;/*** @author 李昊哲* @version 1.0.0* @create 2023/11/7*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Word implements DBWritable {/*** 单词*/private String name;/*** 单词数量*/private int count;@Overridepublic String toString() {return this.name + "\t" + this.count;}@Overridepublic void write(PreparedStatement pst) throws SQLException {pst.setString(1, this.name);pst.setInt(2, this.count);}@Overridepublic void readFields(ResultSet rs) throws SQLException {this.name = rs.getString(1);this.count = rs.getInt(2);}
}

MapReduce 将数据写入数据库

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Write {public static class WordMapper extends Mapper<LongWritable, Text, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Word, NullWritable>.Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");Word word = new Word();word.setName(split[0]);word.setCount(Integer.parseInt(split[1]));context.write(word, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "write db");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(Write.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(WordMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用// job.setCombinerClass(WordCountReduce.class);// 指定当前Job的 Reducer// job.setReducerClass(WordCountReduce.class);// 设置 reduce 数量为 零job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(WordMapper.class);// 设置 map 输出 value 的数据类型job.setMapOutputValueClass(NullWritable.class);// 设置最终输出 key 的数据类型// job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型// job.setOutputValueClass(NullWritable.class);// 定义 map 输入的路径 注意:该路径默认为hdfs路径FileInputFormat.addInputPath(job, new Path("/wordcount/result/part-r-00000"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
//        Path dst = new Path("/video/ods");
//        // 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录
//        DistributedFileSystem dfs = new DistributedFileSystem();
//        String nameService = conf.get("dfs.nameservices");
//        String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;
//        dfs.initialize(URI.create(hdfsRPCUrl), conf);
//        if (dfs.exists(dst)) {
//            dfs.delete(dst, true);
//        }//        FileSystem fs = FileSystem.get(conf);
//        if (fs.exists(dst)) {
//            fs.delete(dst, true);
//        }
//        FileOutputFormat.setOutputPath(job, dst);// 设置输出类job.setOutputFormatClass(DBOutputFormat.class);// 配置将数据写入表DBOutputFormat.setOutput(job, "word", "name", "count");// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapReduce 从数据库读取数据

注意:
由于集群环境 导致 MapTask数量不可控可导致最终输出文件可能不止一个,
可以在代码使用 conf.set(“mapreduce.job.maps”, “1”) 设置 MapTask 数量

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Read {public static class WordMapper extends Mapper<LongWritable, Word, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Word value, Mapper<LongWritable, Word, Word, NullWritable>.Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 设置 MapTask 数量conf.set("mapreduce.job.maps", "1");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "read db");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(Read.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(WordMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用// job.setCombinerClass(WordCountReduce.class);// 指定当前Job的 Reducer// job.setReducerClass(WordCountReduce.class);// 设置 reduce 数量为 零job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(Word.class);// 设置 map 输出 value 的数据类型job.setMapOutputValueClass(NullWritable.class);// 设置最终输出 key 的数据类型// job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型// job.setOutputValueClass(NullWritable.class);// 设置输入类job.setInputFormatClass(DBInputFormat.class);// 配置将数据写入表DBInputFormat.setInput(job, Word.class,"select name,count from word","select count(*) from word");// 定义 map 输入的路径 注意:该路径默认为hdfs路径// FileInputFormat.addInputPath(job, new Path("/wordcount/result/part-r-00000"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径Path dst = new Path("/wordcount/db");// 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录DistributedFileSystem dfs = new DistributedFileSystem();String nameService = conf.get("dfs.nameservices");String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;dfs.initialize(URI.create(hdfsRPCUrl), conf);if (dfs.exists(dst)) {dfs.delete(dst, true);}//        FileSystem fs = FileSystem.get(conf);
//        if (fs.exists(dst)) {
//            fs.delete(dst, true);
//        }FileOutputFormat.setOutputPath(job, dst);// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapReduce 实现数据库表复制

MapReduce 实现将数据库一张数据表的数据复制到另外一张数据表中

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import java.io.IOException;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Copy {public static class RWMapper extends Mapper<LongWritable, Word, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Word value, Mapper<LongWritable, Word, Word, NullWritable>.Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {System.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 设置 MapTask 数量// conf.set("mapreduce.job.maps", "1");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "read db");job.setJarByClass(Read.class);job.setMapperClass(Read.WordMapper.class);job.setNumReduceTasks(0);job.setMapOutputValueClass(Word.class);job.setMapOutputValueClass(NullWritable.class);// 设置输入类job.setInputFormatClass(DBInputFormat.class);// 配置将数据写入表DBInputFormat.setInput(job, Word.class,"select name,count from word","select count(*) from word");// 设置输出类job.setOutputFormatClass(DBOutputFormat.class);// 配置将数据写入表DBOutputFormat.setOutput(job, "new_word", "name", "count");System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

相关文章:

MapReduce 读写数据库

MapReduce 读写数据库 经常听到小伙伴吐槽 MapReduce 计算的结果无法直接写入数据库&#xff0c; 实际上 MapReduce 是有操作数据库实现的 本案例代码将实现 MapReduce 数据库读写操作和将数据表中数据复制到另外一张数据表中 准备数据表 create database htu; use htu; creat…...

设计模式 -- 状态模式(State Pattern)

状态模式&#xff1a;类的行为基于它的状态改变 属于行为型模式&#xff0c;创建表示各种状态的对象和一个行为随着状态对象改变而改变的 context 对象。在代码中包含大量与对象状态有关的条件语句可以通过此模式将各种具体的状态类抽象出来 介绍 意图&#xff1a;允许对象在…...

qt quick发布程序启动失败

qt quick/qml 程序发布之后&#xff0c;程序启动不了 经过探究测试&#xff0c;程序启动的不了的情况下是因为有dll没有添加。在release文件夹下进行发布操作&#xff08;不单独复制xx.exe拿出来&#xff09;&#xff0c;再次点击IDE的RUN按钮&#xff0c;则会提示有Moudle没有…...

nginx反向代理报错合集

本文汇集了最近在使用nginx反向代理过程中遇到的一系列错误及其解决办法。 1缺乏支持项导致nginx配置错误 在利用sudo ./configure --with-http_ssl_module --with-http_stub_status_module进行配置时&#xff0c;往往会遇到以下类型的错误 error: the HTTP rewrite module …...

【Linux精讲系列】——vim详解

​作者主页 &#x1f4da;lovewold少个r博客主页 ⚠️本文重点&#xff1a;c入门第一个程序和基本知识讲解 &#x1f449;【C-C入门系列专栏】&#xff1a;博客文章专栏传送门 &#x1f604;每日一言&#xff1a;宁静是一片强大而治愈的神奇海洋&#xff01; 目录 目录 ​作者…...

微信小程序自动化采集方案

本文仅供学习交流&#xff0c;只提供关键思路不会给出完整代码&#xff0c;严禁用于非法用途&#xff0c;拒绝转载&#xff0c;若有侵权请联系我删除&#xff01; 一、引言 1、对于一些破解难度大&#xff0c;花费时间长的目标&#xff0c;我们可以先采用自动化点击触发请求&…...

操作系统第三章王道习题_内存管理_总结易错知识点

1. 静态重定位和动态重定位 静态重定位(可重定位装入):作业在装入内存的时候,就修改它的物理地址. 静态重定位进程数据一旦确定位置&#xff0c;就不能再移动 动态重定位(动态运行时装入):作业装入内存的时候,不修改物理地址,直到运行的时候,根据重定位寄存器再修改地址. 对…...

uniapp刻度尺的实现(swiper)滑动打分器

实现图&#xff08;百分制&#xff09;&#xff1a;滑动swiper进行打分&#xff0c;分数加减 <view class"scoring"><view class"toggle"><view class"score"><text>{{0}}</text><view class"scoreId&quo…...

cordova Xcode打包ios以及发布流程(ionic3适用)

第一步 1、申请iOS证书 2、导入证书到钥匙串 第二步 1、xcode配置iOS证书 1.1用Xcode打开你的项目&#xff08;我的Xcode版本是新版&#xff09; 修改如下图 回到基本信息设置界面&#xff0c;Bundie 这项填写&#xff0c;最先创建的那个appid&#xff0c;跟创建iOS描述文件时选…...

idea中的.idea文件夹以及*.iml文件(新版idea没有*.iml文件了),新旧版idea打开同一个项目会不会出现不兼容

一、背景 我们有可能会在同一台电脑上安装2个 intellj idea。比如一个community edition一个ultimate edition&#xff08;一个安装板一个绿色解压版&#xff09; 当然了&#xff0c;两个idea之间可能版本号也会有差。 这篇文章就来讨论两个问题&#xff0c;一是关于idea产生…...

高性能网络编程 - The C10K problem 以及 网络编程技术角度的解决思路

文章目录 C10KC10K的由来C10K问题在技术层面的典型体现C10K问题的本质C10K解决思路思路一&#xff1a;每个进程/线程处理一个连接思路二&#xff1a;每个进程/线程同时处理多个连接&#xff08;IO多路复用&#xff09;● 实现方式1&#xff1a;直接循环处理多个连接● 实现方式…...

uniapp u-tabs表单如何默认选中

首先先了解该组件&#xff1b;该组件&#xff0c;是一个tabs标签组件&#xff0c;在标签多的时候&#xff0c;可以配置为左右滑动&#xff0c;标签少的时候&#xff0c;可以禁止滑动。 该组件的一个特点是配置为滚动模式时&#xff0c;激活的tab会自动移动到组件的中间位置。 …...

2023年腾讯云双11活动入口在哪里?

2023年双11腾讯云推出了11.11大促优惠活动&#xff0c;下面给大家分享腾讯云双11活动入口、活动时间、活动详情&#xff0c;希望可以助力大家轻松上云&#xff01; 一、腾讯云双11活动入口 活动地址&#xff1a;点此直达 二、腾讯云双11活动时间 腾讯云双11活动时间跨度很长…...

Windows 下编译 TensorFlow 2.12.0 CC库

大体参考 Windows 下编译 TensorFlow 2.9.1 CC库-CSDN博客 这个版本不完整&#xff0c;需要从 TensorFlow 2.14.0 根目录复制 WORKSPACE 覆盖原同名文件&#xff0c;还需要复制TensorFlow 2.14.0 的 tensorflow\tools\toolchains\python 到相同目录。...

Spring Boot 中自动装配机制的原理

&#xff08;摘自mic老师面试题&#xff09; 最近一个粉丝说&#xff0c;他面试了 4 个公司&#xff0c;有三个公司问他&#xff1a;“Spring Boot 中自动装配 机制的原理” 他回答了&#xff0c;感觉没回答错误&#xff0c;但是怎么就没给 offer 呢&#xff1f; 对于这个问题…...

如何安装Wnmp并结合内网穿透实现外网访问内网Wnmp服务

文章目录 前言1.Wnmp下载安装2.Wnmp设置3.安装cpolar内网穿透3.1 注册账号3.2 下载cpolar客户端3.3 登录cpolar web ui管理界面3.4 创建公网地址 4.固定公网地址访问 前言 WNMP是Windows系统下的绿色NginxMysqlPHP环境集成套件包&#xff0c;安装完成后即可得到一个Nginx MyS…...

网工内推 | 上市公司,云平台运维,IP认证优先,13薪

01 上海新炬网络信息技术股份有限公司 招聘岗位&#xff1a;云平台运维工程师 职责描述&#xff1a; 1、负责云平台运维&#xff0c;包括例行巡检、版本发布、问题及故障处理、平台重保等&#xff0c;保障平台全年稳定运行&#xff1b; 2、参与制定运维标准规范与流程&#x…...

Linux安装DMETL4

Linux安装DMETL4 产品与环境介绍1 规划安装路径2 DM8安装路径2.1 达梦数据库程序安装路径2.2 初始化达梦数据库2.3 创建数据库用户名 DMETL 3 安装DMETL3.1 查看安装包与授权3.2 安装DMETL程序3.3 DMETL安装日志 4 启动DMETL5 DMETL连接数据库后会自动创建相关资源表6 达梦数据…...

Python中编码声明的方法

编码声明主要是为了解决编码问题。由于Python 3的默认编码是UTF-8&#xff0c;因此在使用Python 3编写源代码时&#xff0c;通常不需要在文件开头添加编码声明。但是&#xff0c;如果您使用的编码不是UTF-8&#xff0c;则需要在文件开头添加编码声明&#xff0c;以确保Python解…...

css设置浏览器表单自动填充时的背景

浏览器自动填充表单内容&#xff0c;会自动设置背景色。对于一般的用户&#xff0c;也许不会觉得有什么&#xff0c;但对于要求比较严格的用户&#xff0c;就会“指手画脚”。这里&#xff0c;我们通过css属性来设置浏览器填充背景的过渡时间&#xff0c;使用户看不到过渡后的背…...

STM32+MPU6050传感器

#创作灵感## 在嵌入式系统开发中&#xff0c;STM32F103C8T6单片机与MPU6050传感器的组合因其高性能、低功耗以及丰富的功能而备受青睐。本文将简单介绍如何在Keil 5开发环境中实现STM32F103C8T6与MPU6050的连接和基本数据采集&#xff0c;带你快速入门智能硬件开发。 一、硬件…...

MySQL 索引优化(Explain执行计划) 详细讲解

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 MySQL 索引优化&#xff08;Explain执行计划…...

【Vmwrae】快速安装windows虚拟机

前言 虚拟机是我们在使用电脑进行开发或者平常工作时经常使用到的工具 它可以自定义各种硬件&#xff0c;运行各种不同的系统&#xff0c;且无论发生什么都不会影响到实体机。 教程主要讲了如何在零基础的情况下快速安装一台虚拟机。 下载安装 VMware Workstation Pro17 …...

企业私有化部署DeepSeek实战指南:从硬件选型到安全运维——基于国产大模型的安全可控落地实践

一、部署前的战略评估与规划 私有化部署不仅是技术工程&#xff0c;更是企业数据战略的核心环节。需重点评估三方面&#xff1a; 1、业务场景适配性​ 适用场景&#xff1a;金融风控&#xff08;需实时数据处理&#xff09;、医疗诊断&#xff08;敏感病历保护&#xff09;、政…...

MySQL中的部分问题(2)

索引失效 运算或函数影响列的使用 当查询条件中对索引列用了函数或运算&#xff0c;索引会失效。 例&#xff1a;假设有索引&#xff1a;index idx_name (name) select * from users where upper(name) ALICE; -- 索引失效因为upper(name)会对列内容进行函数处理&#xf…...

BeanFactory 和 FactoryBean 有何区别与联系?

导语&#xff1a; Spring 是后端面试中的“常青树”&#xff0c;而 BeanFactory 与 FactoryBean 的关系更是高频卡人点。很多候选人混淆两者概念&#xff0c;答非所问&#xff0c;轻则失分&#xff0c;重则直接被“pass”。本文将从面试官视角&#xff0c;深入剖析这一经典问题…...

Linux--命令行参数和环境变量

1.命令行参数 Linux 命令行参数基础 1.1参数格式 位置参数&#xff1a;无符号&#xff0c;按顺序传递&#xff08;如 ls /home/user 中 /home/user 是位置参数&#xff09; 选项参数&#xff1a; 短选项&#xff1a;以 - 开头&#xff0c;单个字母&#xff08;如 -l 表示长格…...

WEB3技术重要吗,还是可有可无?

我从几个角度给你一个全面、理性、技术导向的回答&#xff1a; ✅ 一、Web3 技术的重要性&#xff1a;“有意义&#xff0c;但不是万能” Web3 技术并不是可有可无的噱头&#xff0c;而是一种在特定场景下提供独特价值的技术体系。 它重要的原因包括&#xff1a; 1. 重构数字…...

嵌入式学习--江协stm32day4

只能说拖延没有什么好结果&#xff0c;欠下的债总是要还的。 ADC 模拟信号转化为数字信号&#xff0c;例如温度传感器将外部温度的变化&#xff08;模拟信号&#xff09;&#xff0c;转换为内部电压的变化&#xff08;数字信号&#xff09; IN是八路输入&#xff0c;下方是选择…...

强化学习入门:交叉熵方法数学推导

前言 最近想开一个关于强化学习专栏&#xff0c;因为DeepSeek-R1很火&#xff0c;但本人对于LLM连门都没入。因此&#xff0c;只是记录一些类似的读书笔记&#xff0c;内容不深&#xff0c;大多数只是一些概念的东西&#xff0c;数学公式也不会太多&#xff0c;还望读者多多指教…...