大数据-玩转数据-Flink 自定义Sink(Mysql)
一、说明
如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考
mysql-玩转数据-centos7下mysql的安装
创建表
CREATE TABLE `sensor` (`id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
二、pom.xml 导入驱动
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>
三、编写程序
package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class MysqlSink extends RichSinkFunction<Integer>{private Connection sunbo;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");}@Overridepublic void close() throws Exception {if (sunbo != null) {sunbo.close();}}@Overridepublic void invoke(Integer value, Context context) throws Exception {String sql = "insert into sensor(id)values(?)";PreparedStatement ps = sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}
四、运行测试
相关文章:

大数据-玩转数据-Flink 自定义Sink(Mysql)
一、说明 如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表 CREATE TABLE sensor (id int(10) ) ENGINEInnoDB DEFAULT CHARSETutf8二…...

linux17 线程安全 线程同步
1、线程安全: 多线程程序无论调度顺序如何,都能保证程序 的正确性,就说该程序处于线程安全的状态 1)、同步 2)、线程安全函数//有的函数不适合多线程使用,是函数自身的原因。 2、线程安全函数 1&#…...

lvs集群与nat模式
一,什么是集群: 集群,群集,Cluster,由多台主机构成,但是对外只表现为一个整体,只提供一个访问入口(域名与ip地址),相当于一台大型计算机。 二,集…...

【开源分享】在线客服系统搭建-基于php和swoole客服系统CRMchat(附源码完整搭建教程)...
CRMChat是一款开源的在线客服系统,后台管理使用thinkphp框架,消息通讯使用swoole扩展,现在我来部署搭建一下。 这是一款不可商用的开源客服系统,如果有商用需求可以访问我的网站:gofly.v1kf.com 域名解析 以阿里云为例…...

Webpact学习笔记记录
Webpact学习笔记记录 一.初始化项目1.生成package.json2.安装webpack3.执行webpack体验 二、webpack的配置文件三、less-loader解析less1.安装loader2.配置 四、eslint-loader语法检查1.安装loader2.配置loader3.在package.json中加入 五、js语法转换1.安装loader2.配置loader …...

Python代码实现解析MULTIPOLYGON几何对象类型数据为嵌套列表
MULTIPOLYGON MULTIPOLYGON是一种地理信息系统(GIS)中的几何对象类型,用于表示由多个多边形组成的复杂地理区域。它是一种多边形的集合,每个多边形可以是简单的凸多边形或复杂的凹多边形。 MULTIPOLYGON类型的几何对象通常用于描…...

SSH连接工具汇总
xshell 这是个熟悉的软件啦,目前我正在使用Xshell_7 链接:https://www.xshell.com/zh/xshell/ FinalShell 国产软件,有windows和MAC版本;使用方便而且免费,但是软件比较占用内存。但是都2021年了,笔记本…...

Java的AQS框架是如何支撑起整个并发库的
如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成…...

一.net core 自动化发布到docker (Jenkins安装)
目录 1.安装Jenkins 参考资料:https://www.jenkins.io/doc/book/installing/docker/#downloading-and-running-jenkins-in-docker 1.Open up a terminal window.(打开一个终端窗口。) 2.Create a bridge network in Docker using the following docker network create comma…...

二刷LeetCode--148. 排序链表(C++版本),必会题,思维题
思路,本题其实考察了两个点:合并链表、链表切分。首先从1开始,将链表切成一段一段,因为需要使用归并,所以下一次的切分长度应该是当前切分长度的二倍,每次切分,我们拿出两段,然后将第…...

css flex 上下结构布局
display: flex; flex-flow: column; justify-content: space-between;...

win下qwidget全屏弹窗后其他窗口鼠标样式无法更新的问题
在win平台下,实现截取选桌面执行推理功能,用一个qwidget(j对象名为m_selectWidget)来显示选取范围的边框,但这个qwidget显示后,其他窗口在他下面可以接受鼠标相应的事件,但原来的鼠标形状功能失效(mac正常&…...

Java【数据结构】二分查找
🌞 题目: 🌏在有序数组A中,查找目标值target 🌏如果找到返回索引 🌏如果找不到返回-1 算法描述解释前提给定一个内含n个元素的有序数组A,满足A0<A1<A2<<An-1,一个待查值target1设…...

数据库技术--数据库引擎,数据访问接口及其关系详解(附加形象的比喻)
目录 背景数据库引擎Jet数据库:ISAM:ODBC(Open Database Connectivity): 数据访问接口ADO(ActiveX Data Objects)DAO(Data Access Objects)RDO(Remote Data O…...

【BASH】回顾与知识点梳理(三十三)
【BASH】回顾与知识点梳理 三十三 三十三. 认识系统服务 (daemons)33.1 什么是 daemon 与服务 (service)早期 System V 的 init 管理行为中 daemon 的主要分类 (Optional)systemd 使用的 unit 分类systemd 的配置文件放置目录systemd 的 unit 类型分类说明 33.2 透过 systemctl…...

同步请求和异步请求
同步请求和异步请求是在网络编程中常用的两种通信模式,它们有以下区别: 同步请求: 在发送一个请求后,程序会一直等待服务器返回响应,期间无法进行其他操作。请求发出后,程序会阻塞在请求处,直…...

Transformer是什么,Transformer应用
目录 Transformer应用 Transformer是什么 Transformer应用:循环神经网络 语言翻译:注重语句前后顺序 RNN看中单个特征; CNN:看中特征之间时序性 模型关注不同位置的能力 Transformer是什么 Transformer是一个利用注意力机制来提高模型训练速度的模型。关于注意力机…...

故障011:dmap服务缺失libnsl.so修复
故障011:dmap服务缺失libnsl.so修复 1. 问题描述2. 解决方法2.1 初步分析2.2 动手实操2.2.1 模糊搜索大法2.2.2 僵桃代李大法 DM技术交流QQ群:940124259 1. 问题描述 今天遇二期XC环境,达梦DM 7.6的DmAPService备份辅助进程服务无法启动&a…...

第十三章 SpringBoot项目(总)
1.创建SpringBoot项目 1.1.设置编码 1.4.导入已有的spring boot项目 2.快速搭建Restfull风格的项目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…...

利用Python隧道爬虫ip轻松构建全局爬虫网络
嘿,爬虫程序员们!你们有没有碰到过需要大规模数据爬取的情况?也许你们之前遇到过网站的反爬措施,卡住你们的进度。别担心,今天我来分享一个利用Python隧道爬虫ip实现的方法,帮助你们轻松搭建全局爬虫ip网络…...

Spring Clould 网关 - Gateway
视频地址:微服务(SpringCloudRabbitMQDockerRedis搜索分布式) Gateway网关-网关作用介绍(P35) Spring Cloud Gateway 是 Spring Cloud 的一个全新项目,该项目是基于 Spring 5.0,Spring Boot 2…...

PHP使用phpmailer及SMTP服务实现邮件发送
博客升级中,把之前没有想到的功能一点点的完善。 这篇日志记录一下,使用phpmailer实现邮件发送的这样一个操作。 博客偶尔会有留言和评论,我也会及时回复,但是有一个问题,我回复了,给我留言的人如果不再次…...

交换实验一
题目 交换机上接口配置 SW1 interface GigabitEthernet0/0/1 port hybrid tagged vlan 2 port hybrid untagged vlan 3 to 6 interface Ethernet0/0/2 port hybrid pvid vlan 3 port hybrid untagged vlan 2 to 6 interface Ethernet0/0/3 port link-type access port d…...

计算机中丢失MSVCR120.dll,找不到MSVCR120.dll是什么意思?
当计算机中缺少MSVCR120.dll文件时,意味着缺少了Microsoft Visual C Redistributable文件的一个组件。MSVCR120.dll是Visual C Redistributable 2013的动态链接库文件,它是应用程序依赖的重要文件之一。缺少MSVCR120.dll文件可能会导致一些应用程序无法正…...

avue多选列表根据后端返回的某个值去判断是否选中;avue-curd多选回显
效果如上: getSiteList().then(res > {//列表数据this.siteData res.data.datathis.$nextTick(()>{this.siteData.forEach(item>{//业务条件if(item.configid&&item.configid!0&&item.configid>0){//符合条件时调用选中的方法this.$…...

Vue2中根据权限添加动态路由
Vue2中根据权限添加动态路由 大概记录一下主要代码 1.根据后端返回的路由列表生成左侧菜单(后端返回的数据结构中用id和pid来区别包含关系) 大概结构如下: 2.前端需要处理成包含children的树形结构 //动态生成菜单 export const gener…...

搭建 Python 环境 | Python、PyCharm
计算机 计算机能完成的工作: 算术运算逻辑判断数据存储网络通信…更多的更复杂的任务 以下这些都可以称为 “计算机”: 一台计算机主要由以下这几个重要的组件构成 CPU 中央处理器:大脑,算术运算,逻辑判断 存储器&…...

NPOI 读取和写入Excel
在C#中使用NPOI库读取和写入Excel文件,你需要先下载并安装NPOI库。你可以在NuGet管理器中搜索NPOI并进行安装。 以下是一个使用NPOI库进行Excel文件读取和写入的示例: 读取Excel文件: using NPOI.SS.UserModel; using NPOI.XSSF.UserModel…...

Linux工具【2】(调试器gdb、项目自动化构建工具make/Makefile)
gdb、make/Makefile 引言调试器gdb介绍常用指令 自动化构建工具make/Makefile介绍使用依赖关系与依赖方法编辑Makefile伪目标 总结 引言 在上一篇文章中介绍了Linux中的编辑器vim与编译器gcc与g: 戳我看vim与gcc详解哦 在本篇文章中将继续来介绍Linux中的工具&…...

C++ 网络编程项目fastDFS分布式文件系统(三)-Nginx部分
目录 1. 一些基本概念 1.1 Nginx初步认识 1.2 正向/反向代理 1.3 域名和IP 2. Nginx 安装和配置 2.1 安装 2.2 配置 3. Nginx的使用 3.1 部署静态网页 3.2 反向代理和负载均衡 4 课外知识导读 1. URL和URI 编辑 2. DNS解析过程 1. 一些基本概念 1.1 Nginx初步认…...