当前位置: 首页 > news >正文

MapReduce 读写数据库

MapReduce 读写数据库

经常听到小伙伴吐槽 MapReduce 计算的结果无法直接写入数据库,
实际上 MapReduce 是有操作数据库实现的
本案例代码将实现 MapReduce 数据库读写操作和将数据表中数据复制到另外一张数据表中

准备数据表

create database htu;
use htu;
create table word(name varchar(255) comment '单词',count int comment '数量'
);
create table new_word(name varchar(255) comment '单词',count int comment '数量'
);

数据库持久化类

package com.lihaozhe.db;import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;/*** @author 李昊哲* @version 1.0.0* @create 2023/11/7*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Word implements DBWritable {/*** 单词*/private String name;/*** 单词数量*/private int count;@Overridepublic String toString() {return this.name + "\t" + this.count;}@Overridepublic void write(PreparedStatement pst) throws SQLException {pst.setString(1, this.name);pst.setInt(2, this.count);}@Overridepublic void readFields(ResultSet rs) throws SQLException {this.name = rs.getString(1);this.count = rs.getInt(2);}
}

MapReduce 将数据写入数据库

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Write {public static class WordMapper extends Mapper<LongWritable, Text, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Word, NullWritable>.Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");Word word = new Word();word.setName(split[0]);word.setCount(Integer.parseInt(split[1]));context.write(word, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "write db");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(Write.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(WordMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用// job.setCombinerClass(WordCountReduce.class);// 指定当前Job的 Reducer// job.setReducerClass(WordCountReduce.class);// 设置 reduce 数量为 零job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(WordMapper.class);// 设置 map 输出 value 的数据类型job.setMapOutputValueClass(NullWritable.class);// 设置最终输出 key 的数据类型// job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型// job.setOutputValueClass(NullWritable.class);// 定义 map 输入的路径 注意:该路径默认为hdfs路径FileInputFormat.addInputPath(job, new Path("/wordcount/result/part-r-00000"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
//        Path dst = new Path("/video/ods");
//        // 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录
//        DistributedFileSystem dfs = new DistributedFileSystem();
//        String nameService = conf.get("dfs.nameservices");
//        String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;
//        dfs.initialize(URI.create(hdfsRPCUrl), conf);
//        if (dfs.exists(dst)) {
//            dfs.delete(dst, true);
//        }//        FileSystem fs = FileSystem.get(conf);
//        if (fs.exists(dst)) {
//            fs.delete(dst, true);
//        }
//        FileOutputFormat.setOutputPath(job, dst);// 设置输出类job.setOutputFormatClass(DBOutputFormat.class);// 配置将数据写入表DBOutputFormat.setOutput(job, "word", "name", "count");// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapReduce 从数据库读取数据

注意:
由于集群环境 导致 MapTask数量不可控可导致最终输出文件可能不止一个,
可以在代码使用 conf.set(“mapreduce.job.maps”, “1”) 设置 MapTask 数量

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Read {public static class WordMapper extends Mapper<LongWritable, Word, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Word value, Mapper<LongWritable, Word, Word, NullWritable>.Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 设置环境变量 hadoop 用户名 为 rootSystem.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 设置 MapTask 数量conf.set("mapreduce.job.maps", "1");// 本地运行// conf.set("mapreduce.framework.name", "local");// 设置默认文件系统为 本地文件系统// conf.set("fs.defaultFS", "file:///");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "read db");// 指定当前Job的驱动类// 本地提交 注释该行job.setJarByClass(Read.class);// 本地提交启用该行// job.setJar("D:\\work\\河南师范大学\\2023\\bigdata2023\\Hadoop\\code\\hadoop\\target\\hadoop.jar");// 指定当前Job的 Mapperjob.setMapperClass(WordMapper.class);// 指定当前Job的 Combiner 注意:一定不能影响最终计算结果 否则 不使用// job.setCombinerClass(WordCountReduce.class);// 指定当前Job的 Reducer// job.setReducerClass(WordCountReduce.class);// 设置 reduce 数量为 零job.setNumReduceTasks(0);// 设置 map 输出 key 的数据类型job.setMapOutputValueClass(Word.class);// 设置 map 输出 value 的数据类型job.setMapOutputValueClass(NullWritable.class);// 设置最终输出 key 的数据类型// job.setOutputKeyClass(Text.class);// 设置最终输出 value 的数据类型// job.setOutputValueClass(NullWritable.class);// 设置输入类job.setInputFormatClass(DBInputFormat.class);// 配置将数据写入表DBInputFormat.setInput(job, Word.class,"select name,count from word","select count(*) from word");// 定义 map 输入的路径 注意:该路径默认为hdfs路径// FileInputFormat.addInputPath(job, new Path("/wordcount/result/part-r-00000"));// 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径Path dst = new Path("/wordcount/db");// 保护性代码 如果 reduce 输出目录已经存在则删除 输出目录DistributedFileSystem dfs = new DistributedFileSystem();String nameService = conf.get("dfs.nameservices");String hdfsRPCUrl = "hdfs://" + nameService + ":" + 8020;dfs.initialize(URI.create(hdfsRPCUrl), conf);if (dfs.exists(dst)) {dfs.delete(dst, true);}//        FileSystem fs = FileSystem.get(conf);
//        if (fs.exists(dst)) {
//            fs.delete(dst, true);
//        }FileOutputFormat.setOutputPath(job, dst);// 提交 job// job.submit();System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapReduce 实现数据库表复制

MapReduce 实现将数据库一张数据表的数据复制到另外一张数据表中

package com.lihaozhe.db;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import java.io.IOException;/*** @author 李昊哲* @version 1.0* @create 2023-11-7*/
public class Copy {public static class RWMapper extends Mapper<LongWritable, Word, Word, NullWritable> {@Overrideprotected void map(LongWritable key, Word value, Mapper<LongWritable, Word, Word, NullWritable>.Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {System.setProperty("HADOOP_USER_NAME", "root");// 参数配置对象Configuration conf = new Configuration();// 配置JDBC 参数DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://spark03:3306/htu?useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=UTF8&useSSL=false&serverTimeZone=Asia/Shanghai","root", "Lihaozhe!!@@1122");// 跨平台提交conf.set("mapreduce.app-submission.cross-platform", "true");// 设置 MapTask 数量// conf.set("mapreduce.job.maps", "1");// 声明Job对象 就是一个应用Job job = Job.getInstance(conf, "read db");job.setJarByClass(Read.class);job.setMapperClass(Read.WordMapper.class);job.setNumReduceTasks(0);job.setMapOutputValueClass(Word.class);job.setMapOutputValueClass(NullWritable.class);// 设置输入类job.setInputFormatClass(DBInputFormat.class);// 配置将数据写入表DBInputFormat.setInput(job, Word.class,"select name,count from word","select count(*) from word");// 设置输出类job.setOutputFormatClass(DBOutputFormat.class);// 配置将数据写入表DBOutputFormat.setOutput(job, "new_word", "name", "count");System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

相关文章:

MapReduce 读写数据库

MapReduce 读写数据库 经常听到小伙伴吐槽 MapReduce 计算的结果无法直接写入数据库&#xff0c; 实际上 MapReduce 是有操作数据库实现的 本案例代码将实现 MapReduce 数据库读写操作和将数据表中数据复制到另外一张数据表中 准备数据表 create database htu; use htu; creat…...

设计模式 -- 状态模式(State Pattern)

状态模式&#xff1a;类的行为基于它的状态改变 属于行为型模式&#xff0c;创建表示各种状态的对象和一个行为随着状态对象改变而改变的 context 对象。在代码中包含大量与对象状态有关的条件语句可以通过此模式将各种具体的状态类抽象出来 介绍 意图&#xff1a;允许对象在…...

qt quick发布程序启动失败

qt quick/qml 程序发布之后&#xff0c;程序启动不了 经过探究测试&#xff0c;程序启动的不了的情况下是因为有dll没有添加。在release文件夹下进行发布操作&#xff08;不单独复制xx.exe拿出来&#xff09;&#xff0c;再次点击IDE的RUN按钮&#xff0c;则会提示有Moudle没有…...

nginx反向代理报错合集

本文汇集了最近在使用nginx反向代理过程中遇到的一系列错误及其解决办法。 1缺乏支持项导致nginx配置错误 在利用sudo ./configure --with-http_ssl_module --with-http_stub_status_module进行配置时&#xff0c;往往会遇到以下类型的错误 error: the HTTP rewrite module …...

【Linux精讲系列】——vim详解

​作者主页 &#x1f4da;lovewold少个r博客主页 ⚠️本文重点&#xff1a;c入门第一个程序和基本知识讲解 &#x1f449;【C-C入门系列专栏】&#xff1a;博客文章专栏传送门 &#x1f604;每日一言&#xff1a;宁静是一片强大而治愈的神奇海洋&#xff01; 目录 目录 ​作者…...

微信小程序自动化采集方案

本文仅供学习交流&#xff0c;只提供关键思路不会给出完整代码&#xff0c;严禁用于非法用途&#xff0c;拒绝转载&#xff0c;若有侵权请联系我删除&#xff01; 一、引言 1、对于一些破解难度大&#xff0c;花费时间长的目标&#xff0c;我们可以先采用自动化点击触发请求&…...

操作系统第三章王道习题_内存管理_总结易错知识点

1. 静态重定位和动态重定位 静态重定位(可重定位装入):作业在装入内存的时候,就修改它的物理地址. 静态重定位进程数据一旦确定位置&#xff0c;就不能再移动 动态重定位(动态运行时装入):作业装入内存的时候,不修改物理地址,直到运行的时候,根据重定位寄存器再修改地址. 对…...

uniapp刻度尺的实现(swiper)滑动打分器

实现图&#xff08;百分制&#xff09;&#xff1a;滑动swiper进行打分&#xff0c;分数加减 <view class"scoring"><view class"toggle"><view class"score"><text>{{0}}</text><view class"scoreId&quo…...

cordova Xcode打包ios以及发布流程(ionic3适用)

第一步 1、申请iOS证书 2、导入证书到钥匙串 第二步 1、xcode配置iOS证书 1.1用Xcode打开你的项目&#xff08;我的Xcode版本是新版&#xff09; 修改如下图 回到基本信息设置界面&#xff0c;Bundie 这项填写&#xff0c;最先创建的那个appid&#xff0c;跟创建iOS描述文件时选…...

idea中的.idea文件夹以及*.iml文件(新版idea没有*.iml文件了),新旧版idea打开同一个项目会不会出现不兼容

一、背景 我们有可能会在同一台电脑上安装2个 intellj idea。比如一个community edition一个ultimate edition&#xff08;一个安装板一个绿色解压版&#xff09; 当然了&#xff0c;两个idea之间可能版本号也会有差。 这篇文章就来讨论两个问题&#xff0c;一是关于idea产生…...

高性能网络编程 - The C10K problem 以及 网络编程技术角度的解决思路

文章目录 C10KC10K的由来C10K问题在技术层面的典型体现C10K问题的本质C10K解决思路思路一&#xff1a;每个进程/线程处理一个连接思路二&#xff1a;每个进程/线程同时处理多个连接&#xff08;IO多路复用&#xff09;● 实现方式1&#xff1a;直接循环处理多个连接● 实现方式…...

uniapp u-tabs表单如何默认选中

首先先了解该组件&#xff1b;该组件&#xff0c;是一个tabs标签组件&#xff0c;在标签多的时候&#xff0c;可以配置为左右滑动&#xff0c;标签少的时候&#xff0c;可以禁止滑动。 该组件的一个特点是配置为滚动模式时&#xff0c;激活的tab会自动移动到组件的中间位置。 …...

2023年腾讯云双11活动入口在哪里?

2023年双11腾讯云推出了11.11大促优惠活动&#xff0c;下面给大家分享腾讯云双11活动入口、活动时间、活动详情&#xff0c;希望可以助力大家轻松上云&#xff01; 一、腾讯云双11活动入口 活动地址&#xff1a;点此直达 二、腾讯云双11活动时间 腾讯云双11活动时间跨度很长…...

Windows 下编译 TensorFlow 2.12.0 CC库

大体参考 Windows 下编译 TensorFlow 2.9.1 CC库-CSDN博客 这个版本不完整&#xff0c;需要从 TensorFlow 2.14.0 根目录复制 WORKSPACE 覆盖原同名文件&#xff0c;还需要复制TensorFlow 2.14.0 的 tensorflow\tools\toolchains\python 到相同目录。...

Spring Boot 中自动装配机制的原理

&#xff08;摘自mic老师面试题&#xff09; 最近一个粉丝说&#xff0c;他面试了 4 个公司&#xff0c;有三个公司问他&#xff1a;“Spring Boot 中自动装配 机制的原理” 他回答了&#xff0c;感觉没回答错误&#xff0c;但是怎么就没给 offer 呢&#xff1f; 对于这个问题…...

如何安装Wnmp并结合内网穿透实现外网访问内网Wnmp服务

文章目录 前言1.Wnmp下载安装2.Wnmp设置3.安装cpolar内网穿透3.1 注册账号3.2 下载cpolar客户端3.3 登录cpolar web ui管理界面3.4 创建公网地址 4.固定公网地址访问 前言 WNMP是Windows系统下的绿色NginxMysqlPHP环境集成套件包&#xff0c;安装完成后即可得到一个Nginx MyS…...

网工内推 | 上市公司,云平台运维,IP认证优先,13薪

01 上海新炬网络信息技术股份有限公司 招聘岗位&#xff1a;云平台运维工程师 职责描述&#xff1a; 1、负责云平台运维&#xff0c;包括例行巡检、版本发布、问题及故障处理、平台重保等&#xff0c;保障平台全年稳定运行&#xff1b; 2、参与制定运维标准规范与流程&#x…...

Linux安装DMETL4

Linux安装DMETL4 产品与环境介绍1 规划安装路径2 DM8安装路径2.1 达梦数据库程序安装路径2.2 初始化达梦数据库2.3 创建数据库用户名 DMETL 3 安装DMETL3.1 查看安装包与授权3.2 安装DMETL程序3.3 DMETL安装日志 4 启动DMETL5 DMETL连接数据库后会自动创建相关资源表6 达梦数据…...

Python中编码声明的方法

编码声明主要是为了解决编码问题。由于Python 3的默认编码是UTF-8&#xff0c;因此在使用Python 3编写源代码时&#xff0c;通常不需要在文件开头添加编码声明。但是&#xff0c;如果您使用的编码不是UTF-8&#xff0c;则需要在文件开头添加编码声明&#xff0c;以确保Python解…...

css设置浏览器表单自动填充时的背景

浏览器自动填充表单内容&#xff0c;会自动设置背景色。对于一般的用户&#xff0c;也许不会觉得有什么&#xff0c;但对于要求比较严格的用户&#xff0c;就会“指手画脚”。这里&#xff0c;我们通过css属性来设置浏览器填充背景的过渡时间&#xff0c;使用户看不到过渡后的背…...

设计模式和设计原则回顾

设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

简易版抽奖活动的设计技术方案

1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...

【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】

1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件&#xff08;System Property Definition File&#xff09;&#xff0c;用于声明和管理 Bluetooth 模块相…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

python执行测试用例,allure报乱码且未成功生成报告

allure执行测试用例时显示乱码&#xff1a;‘allure’ &#xfffd;&#xfffd;&#xfffd;&#xfffd;&#xfffd;ڲ&#xfffd;&#xfffd;&#xfffd;&#xfffd;ⲿ&#xfffd;&#xfffd;&#xfffd;Ҳ&#xfffd;&#xfffd;&#xfffd;ǿ&#xfffd;&am…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

MFC 抛体运动模拟:常见问题解决与界面美化

在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...

微服务通信安全:深入解析mTLS的原理与实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言&#xff1a;微服务时代的通信安全挑战 随着云原生和微服务架构的普及&#xff0c;服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...