当前位置: 首页 > news >正文

Flink将数据写入MySQL(JDBC)

一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

        <flink.version>1.14.6</flink.version><spark.version>2.4.3</spark.version><hadoop.version>2.8.5</hadoop.version><hbase.version>1.4.9</hbase.version><hive.version>2.3.5</hive.version><java.version>1.8</java.version><scala.version>2.11.8</scala.version><mysql.version>8.0.22</mysql.version><scala.binary.version>2.11</scala.binary.version>

2.2 导入相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version>
</dependency>
<!--mysql连接器依赖-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.22</version>
</dependency>

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws` ( `id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

package com.flink.POJOs;import java.util.Objects;/*** TODO POJO类的特点* 类是公有(public)的* 有一个无参的构造方法* 所有属性都是公有(public)的* 所有属性的类型都是可以序列化的*/
public class WaterSensor {//类的公共属性public String id;public Long ts;public Integer vc;//无参构造方法public WaterSensor() {//System.out.println("调用了无参数的构造方法");}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}//生成get和set方法public void setId(String id) {this.id = id;}public void setTs(Long ts) {this.ts = ts;}public void setVc(Integer vc) {this.vc = vc;}public String getId() {return id;}public Long getTs() {return ts;}public Integer getVc() {return vc;}//重写toString方法@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}//重写equals和hasCode方法@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WaterSensor that = (WaterSensor) o;return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}
//scala的case类?

2.5 自定义map函数

package com.flink.POJOs;import org.apache.flink.api.common.functions.MapFunction;public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}
}

2.5 Flink2MySQL

package com.flink.DataStream.Sink;import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** Flink 输出到 MySQL(JDBC)*/
public class flinkSinkJdbc {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO SourceDataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO TransferSingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());/**TODO 写入 mysql* 1、只能用老的 sink 写法* 2、JDBCSink 的 4 个参数:*   第一个参数: 执行的 sql,一般就是 insert into*   第二个参数: 预编译 sql, 对占位符填充值*   第三个参数: 执行选项 ---->攒批、重试*   第四个参数: 连接选项---->url、用户名、密码*/SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");}}, JdbcExecutionOptions.builder().withMaxRetries(3)         // 重试次数.withBatchSize(100)        // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("********").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());//TODO 写入到MysqlwaterSensorSingleOutputStreamOperator.addSink(sinkFunction);streamExecutionEnvironment.execute();}
}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

在这里插入图片描述
启动Flink程序
在这里插入图片描述
查看数据库写入是否正常
在这里插入图片描述

相关文章:

Flink将数据写入MySQL(JDBC)

一、写在前面 在实际的生产环境中&#xff0c;我们经常会把Flink处理的数据写入MySQL、Doris等数据库中&#xff0c;下面以MySQL为例&#xff0c;使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 <flink.version>1.14.6</flink.version…...

react-typescript-demo

1.使用 Context 来存储数据...

Alexon:在云原生环境中快速部署应用服务

Alexon是一个旨在快速部署WEB应用服务到分布式系统中的工具&#xff0c;适用于云原生环境。 Alexon由SymeCloud Limited(syme.dev) 发布&#xff0c;使用GNU Guile编写而成&#xff0c;支持函数编程概念。 SymeCloud 公司主要致力于 AI-Infra 方面的研发&#xff0c;从 OpenAI …...

5G技术在职业教育领域的应用:产生巨变的技术

5G技术在职业教育领域的应用&#xff1a;产生巨变的技术 职业教育领域正面临着前所未有的挑战和机遇。随着5G技术的快速发展和普及&#xff0c;其高速度、低延迟、大容量和连接数的特性给职业教育带来了革命性的改变。本文将深入探讨5G技术在职业教育领域的应用场景、技术原理和…...

【触想智能】工控一体机与5G物联网技术结合是未来发展趋势

工控一体机也叫工业电脑一体机&#xff0c;是工业应用非常重要的一种产品。目前&#xff0c;工控一体机在工业领域的应用已经非常普及&#xff0c;在繁忙的生产车间、数字化机床、自助服务终端设备等场景中&#xff0c;我们都有看到它的身影。 工控一体机应用的普及已经潜移默化…...

LuatOS-SOC接口文档(air780E)--lvgl - LVGL图像库

lvgl.draw_mask_radius_param_t() 创建一个lv_draw_mask_radius_param_t 参数 无 返回值 返回值类型 解释 userdata lv_draw_mask_radius_param_t指针 例子 local radius lvgl.draw_mask_radius_param_t()lvgl.draw_mask_radius_param_t_free(radius) 释放一个lv_d…...

LuatOS-SOC接口文档(air780E)--lora2 - lora2驱动模块(支持多挂)

常量 常量 类型 解释 lora2.SLEEP number SLEEP模式 lora2.STANDBY number STANDBY模式 lora2.init(ic, loraconfig,spiconfig) lora初始化 参数 传入值类型 解释 string lora 型号&#xff0c;当前支持&#xff1a; llcc68 sx1268 table lora配置参数,与具体…...

WKWebView iOS17设置UserAgent

WKWebView 设置 user-agent 参考文档 之前设置 user-agent 都是通过设置NSUserDefaults来实现的&#xff0c;不过升级到了iOS17之后这个方式不好用了。 老的设置方式&#xff1a; [[NSUserDefaults standardUserDefaults] registerDefaults:dictionnary];目前看通过设置 we…...

持续集成部署-k8s-服务发现-Service

持续集成部署-k8s-服务发现-Service:配置讲解及基础命令 1. Service 简介2. 基础命令3. 基于 Service 访问外部服务4. 代理外部域名5. Endpoints 常用类型1. Service 简介 在K8s中,Service 是一种可以暴露一个或多个Pod的稳定的网络终点,从而形成逻辑上的应用服务单元,为服…...

RocksDB基本架构与原理详解

Rocksdb Flink提供基于流的有状态计算&#xff0c;除了提供实时数据流的处理能力&#xff0c;还需要将计算产生的状态存储起来。 为了满足状态存取需求&#xff0c;提供了memory、flie system、rocksdb三种类型的状态存储机制。 memory存取高效单空间有限&#xff0c;且可用…...

ArcGIS笔记12_ArcGIS搜索工具没法用?ArcGIS运行很慢很卡?

本文目录 前言Step 1 ArcGIS搜索工具没法用Step 2 ArcGIS运行很慢很卡 前言 这是笔者最近遇到的两个小问题&#xff0c;新换了台式机&#xff0c;安装上ArcGIS后发现搜索工具没法用&#xff0c;而且感觉还不如原来笔记本运行的流畅&#xff0c;加载图层很慢&#xff0c;编辑要…...

【VictoriaMetrics】单机版配置

为方便查看,释义都已翻译成中文,本文配置基于VictoriaMetrics 1.87.1版本 bigMergeConcurrencyint用于大合并的最大 CPU 核数。设置为 0 时使用默认值cacheExpireDuration30m0s...

【C语言】strcpy()函数

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:C语言 ⚙️操作环境:Visual Studio 2022 目录 一.strcpy()函数简介 1.函数功能 2.函数参数 1>.char * destination 2>.const char * source 3.函数返回值 4.函数头文件 二.strcpy()函数的具体使用 1.使用s…...

C++基础算法⑦——信奥一本通递归算法(放苹果、求最大公约数问题、2的幂次方表示、分数求和、因子分解、判断元素是否存在)

递归算法 1206&#xff1a;放苹果1207&#xff1a;求最大公约数问题1208&#xff1a;2的幂次方表示1209&#xff1a;分数求和1210&#xff1a;因子分解1211&#xff1a;判断元素是否存在 1206&#xff1a;放苹果 这道题还是有些难度的&#xff0c;我们要考虑几种放苹果的情况。…...

uni-app医院智能导诊系统源码

随着科技的迅速发展&#xff0c;人工智能已经逐渐渗透到我们生活的各个领域。在医疗行业中&#xff0c;智能导诊系统成为了一个备受关注的应用。本文将详细介绍智能导诊系统的概念、技术原理以及在医疗领域中的应用&#xff0c;分析其优势和未来发展趋势。 智能导诊系统通过人工…...

启动jar时指定nacos配置

背景 由于需要在不同服务上部署应用&#xff0c;避免频繁打包&#xff0c;需要在jar启动时灵活配置naocs配置 启动命令 java -Xms256m -Xmx512m -Dfile.encodingutf-8 -jar mes-gateway-1.0.1.jar --spring.cloud.nacos.discovery.server-addrhttp://127.0.0.1:8848 --spri…...

linux安装vscode vscode使用 创建项目并运行

下载 https://code.visualstudio.com/ 下载.deb文件 安装 假如文件被下载到了 /opt目录下 进入Opt目录&#xff0c;右键从当前目录打开终端。 输入下面的安装命令。 sudo apt-get install ./code_1.83.1-1696982868_amd64.deb 安装成功。 安装插件 使用c&#xff0c;必…...

如何解决数据倾斜

星光下的赶路人star的个人主页 臣书刷字墨淋漓&#xff0c;舒卷烟云势最奇 文章目录 1、数据倾斜的现象2、解决办法2.1 单表聚合&#xff08;group bysum()&#xff09;2.2 多表关联&#xff08;join&#xff09; 3、倾斜原因 1、数据倾斜的现象 部分Reduce一直运行&#xff0…...

宏定义实现offsetof

在C语言中&#xff0c;有这样一个特殊的宏&#xff0c;叫offsetof&#xff0c;它的功能是啥呢&#xff1f; 我们来看看它的介绍 它的功能是&#xff1a;返回一个结构体的成员的大小&#xff08;相较于起始地址的偏移量&#xff09; 引用代码&#xff1a;http://t.csdnimg.cn…...

YOLOv5— Fruit Detection

&#x1f368; 本文为[&#x1f517;365天深度学习训练营学习记录博客 &#x1f366; 参考文章&#xff1a;365天深度学习训练营-第7周&#xff1a;咖啡豆识别&#xff08;训练营内部成员可读&#xff09; &#x1f356; 原作者&#xff1a;[K同学啊 | 接辅导、项目定制](https…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?

在建筑行业&#xff0c;项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升&#xff0c;传统的管理模式已经难以满足现代工程的需求。过去&#xff0c;许多企业依赖手工记录、口头沟通和分散的信息管理&#xff0c;导致效率低下、成本失控、风险频发。例如&#…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案&#xff0c;如果正确地操作&#xff0c;重启Eureka集群中的节点&#xff0c;对已经注册的服务影响非常小&#xff0c;甚至可以做到无感知。 但如果操作不当&#xff0c;可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

中医有效性探讨

文章目录 西医是如何发展到以生物化学为药理基础的现代医学&#xff1f;传统医学奠基期&#xff08;远古 - 17 世纪&#xff09;近代医学转型期&#xff08;17 世纪 - 19 世纪末&#xff09;​现代医学成熟期&#xff08;20世纪至今&#xff09; 中医的源远流长和一脉相承远古至…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...

微服务通信安全:深入解析mTLS的原理与实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言&#xff1a;微服务时代的通信安全挑战 随着云原生和微服务架构的普及&#xff0c;服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...

以太网PHY布局布线指南

1. 简介 对于以太网布局布线遵循以下准则很重要&#xff0c;因为这将有助于减少信号发射&#xff0c;最大程度地减少噪声&#xff0c;确保器件作用&#xff0c;最大程度地减少泄漏并提高信号质量。 2. PHY设计准则 2.1 DRC错误检查 首先检查DRC规则是否设置正确&#xff0c;然…...

World-writable config file /etc/mysql/mysql.conf.d/my.cnf is ignored

https://stackoverflow.com/questions/53741107/mysql-in-docker-on-ubuntu-warning-world-writable-config-file-is-ignored 修改权限 -> 重启mysql # 检查字符集配置 SHOW VARIABLES WHERE Variable_name IN (character_set_server, character_set_database ); --------…...