大数据-玩转数据-Flink 自定义Sink(Mysql)
一、说明
如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考
mysql-玩转数据-centos7下mysql的安装
创建表
CREATE TABLE `sensor` (`id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
二、pom.xml 导入驱动
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>
三、编写程序
package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class MysqlSink extends RichSinkFunction<Integer>{private Connection sunbo;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");}@Overridepublic void close() throws Exception {if (sunbo != null) {sunbo.close();}}@Overridepublic void invoke(Integer value, Context context) throws Exception {String sql = "insert into sensor(id)values(?)";PreparedStatement ps = sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}
四、运行测试

相关文章:
大数据-玩转数据-Flink 自定义Sink(Mysql)
一、说明 如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表 CREATE TABLE sensor (id int(10) ) ENGINEInnoDB DEFAULT CHARSETutf8二…...
linux17 线程安全 线程同步
1、线程安全: 多线程程序无论调度顺序如何,都能保证程序 的正确性,就说该程序处于线程安全的状态 1)、同步 2)、线程安全函数//有的函数不适合多线程使用,是函数自身的原因。 2、线程安全函数 1&#…...
lvs集群与nat模式
一,什么是集群: 集群,群集,Cluster,由多台主机构成,但是对外只表现为一个整体,只提供一个访问入口(域名与ip地址),相当于一台大型计算机。 二,集…...
【开源分享】在线客服系统搭建-基于php和swoole客服系统CRMchat(附源码完整搭建教程)...
CRMChat是一款开源的在线客服系统,后台管理使用thinkphp框架,消息通讯使用swoole扩展,现在我来部署搭建一下。 这是一款不可商用的开源客服系统,如果有商用需求可以访问我的网站:gofly.v1kf.com 域名解析 以阿里云为例…...
Webpact学习笔记记录
Webpact学习笔记记录 一.初始化项目1.生成package.json2.安装webpack3.执行webpack体验 二、webpack的配置文件三、less-loader解析less1.安装loader2.配置 四、eslint-loader语法检查1.安装loader2.配置loader3.在package.json中加入 五、js语法转换1.安装loader2.配置loader …...
Python代码实现解析MULTIPOLYGON几何对象类型数据为嵌套列表
MULTIPOLYGON MULTIPOLYGON是一种地理信息系统(GIS)中的几何对象类型,用于表示由多个多边形组成的复杂地理区域。它是一种多边形的集合,每个多边形可以是简单的凸多边形或复杂的凹多边形。 MULTIPOLYGON类型的几何对象通常用于描…...
SSH连接工具汇总
xshell 这是个熟悉的软件啦,目前我正在使用Xshell_7 链接:https://www.xshell.com/zh/xshell/ FinalShell 国产软件,有windows和MAC版本;使用方便而且免费,但是软件比较占用内存。但是都2021年了,笔记本…...
Java的AQS框架是如何支撑起整个并发库的
如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成…...
一.net core 自动化发布到docker (Jenkins安装)
目录 1.安装Jenkins 参考资料:https://www.jenkins.io/doc/book/installing/docker/#downloading-and-running-jenkins-in-docker 1.Open up a terminal window.(打开一个终端窗口。) 2.Create a bridge network in Docker using the following docker network create comma…...
二刷LeetCode--148. 排序链表(C++版本),必会题,思维题
思路,本题其实考察了两个点:合并链表、链表切分。首先从1开始,将链表切成一段一段,因为需要使用归并,所以下一次的切分长度应该是当前切分长度的二倍,每次切分,我们拿出两段,然后将第…...
css flex 上下结构布局
display: flex; flex-flow: column; justify-content: space-between;...
win下qwidget全屏弹窗后其他窗口鼠标样式无法更新的问题
在win平台下,实现截取选桌面执行推理功能,用一个qwidget(j对象名为m_selectWidget)来显示选取范围的边框,但这个qwidget显示后,其他窗口在他下面可以接受鼠标相应的事件,但原来的鼠标形状功能失效(mac正常&…...
Java【数据结构】二分查找
🌞 题目: 🌏在有序数组A中,查找目标值target 🌏如果找到返回索引 🌏如果找不到返回-1 算法描述解释前提给定一个内含n个元素的有序数组A,满足A0<A1<A2<<An-1,一个待查值target1设…...
数据库技术--数据库引擎,数据访问接口及其关系详解(附加形象的比喻)
目录 背景数据库引擎Jet数据库:ISAM:ODBC(Open Database Connectivity): 数据访问接口ADO(ActiveX Data Objects)DAO(Data Access Objects)RDO(Remote Data O…...
【BASH】回顾与知识点梳理(三十三)
【BASH】回顾与知识点梳理 三十三 三十三. 认识系统服务 (daemons)33.1 什么是 daemon 与服务 (service)早期 System V 的 init 管理行为中 daemon 的主要分类 (Optional)systemd 使用的 unit 分类systemd 的配置文件放置目录systemd 的 unit 类型分类说明 33.2 透过 systemctl…...
同步请求和异步请求
同步请求和异步请求是在网络编程中常用的两种通信模式,它们有以下区别: 同步请求: 在发送一个请求后,程序会一直等待服务器返回响应,期间无法进行其他操作。请求发出后,程序会阻塞在请求处,直…...
Transformer是什么,Transformer应用
目录 Transformer应用 Transformer是什么 Transformer应用:循环神经网络 语言翻译:注重语句前后顺序 RNN看中单个特征; CNN:看中特征之间时序性 模型关注不同位置的能力 Transformer是什么 Transformer是一个利用注意力机制来提高模型训练速度的模型。关于注意力机…...
故障011:dmap服务缺失libnsl.so修复
故障011:dmap服务缺失libnsl.so修复 1. 问题描述2. 解决方法2.1 初步分析2.2 动手实操2.2.1 模糊搜索大法2.2.2 僵桃代李大法 DM技术交流QQ群:940124259 1. 问题描述 今天遇二期XC环境,达梦DM 7.6的DmAPService备份辅助进程服务无法启动&a…...
第十三章 SpringBoot项目(总)
1.创建SpringBoot项目 1.1.设置编码 1.4.导入已有的spring boot项目 2.快速搭建Restfull风格的项目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…...
利用Python隧道爬虫ip轻松构建全局爬虫网络
嘿,爬虫程序员们!你们有没有碰到过需要大规模数据爬取的情况?也许你们之前遇到过网站的反爬措施,卡住你们的进度。别担心,今天我来分享一个利用Python隧道爬虫ip实现的方法,帮助你们轻松搭建全局爬虫ip网络…...
极致精简,功能强大的PDF编辑工具
这是一款功能全面的PDF编辑工具 你只需要导入一份PDF格式文件 就可以快速的对它进行插入 批注编辑保护转换等各种操作 而且无需登录 也可以直接使用 在插入选项中可以进行插入文字图片 页面页眉页脚页码文档背景水印视频音频等 在批注选项中可以管理批注隐藏批注 高亮显示 文本…...
力扣HOT100(30)两两交换链表中的节点
链表的交换要注意 “链表不断链”。前驱和后继都要连着迭代法(必学死磕!O (n) 时间,O (1) 空间)1. 为什么必须用虚拟头节点?因为交换后链表的头节点会变! 比如示例 1 中,原来的头是 1࿰…...
从开题到定稿零焦虑:okbiye AI 论文写作,帮你把毕业季的 “大山” 变成坦途
okbiye-免费查重复率aigc检测/开题报告/毕业论文/智能排版/文献综述/AI PPT毕业论文 - Okbiye智能写作https://www.okbiye.com/ai/bylw 毕业季的深夜,宿舍台灯下的屏幕亮着刺眼的光,文档里的字数停留在三位数,而 deadline 正一天天逼近。你是…...
具身智能:面向新兴交叉学科建设的思考与建议 2026
这份由 CCF YOCSEF 长三角五地学术委员会 2026 年 5 月发布的白皮书,聚焦具身智能作为新兴交叉学科的建设,明确其并非 AI 与机器人学的简单拼接,而是围绕物理交互中的智能行为形成的新问题域,提出 “三大基本问题 一个应用需求”…...
Elden Ring帧率解锁终极指南:从60帧到144+的完整教程
Elden Ring帧率解锁终极指南:从60帧到144的完整教程 【免费下载链接】EldenRingFpsUnlockAndMore A small utility to remove frame rate limit, change FOV, add widescreen support and more for Elden Ring 项目地址: https://gitcode.com/gh_mirrors/el/Elden…...
DIY智能USB充电器:基于电流检测与双稳态继电器的零功耗节能方案
1. 项目概述:打造一款智能、节能的USB手机充电器作为一名电子爱好者,我经常折腾各种电源项目。市面上很多手机充电器,包括一些原装货,都存在一个通病:手机充满电后,充电器依然插在插座上,内部电…...
微信红包助手终极指南:无需ROOT的智能抢红包解决方案
微信红包助手终极指南:无需ROOT的智能抢红包解决方案 【免费下载链接】WeChatLuckyMoney :money_with_wings: WeChats lucky money helper (微信抢红包插件) by Zhongyi Tong. An Android app that helps you snatch red packets in WeChat groups. 项目地址: ht…...
Nacos CVE-2021-29441漏洞深度解析:User-Agent绕过与鉴权失效
1. 这个漏洞不是“改个Header就能登录”,而是Nacos鉴权体系的一道裂缝CVE-2021-29441这个编号在Nacos社区里曾被轻描淡写地归为“低危”,直到我接手一个金融客户线上告警——他们的Nacos集群在凌晨三点被批量创建了37个高权限用户,所有操作日…...
抖音批量下载助手:一键构建你的专属视频素材库
抖音批量下载助手:一键构建你的专属视频素材库 【免费下载链接】douyinhelper 抖音批量下载助手 项目地址: https://gitcode.com/gh_mirrors/do/douyinhelper 还在为手动保存抖音视频而烦恼吗?想要批量获取心仪创作者的精彩内容却无从下手&#x…...
数字合成器d-FORMANT:从模拟经典到数字复刻的工程实践
1. 项目概述:从模拟经典到数字复刻如果你对合成器稍有了解,或者对电子音乐制作背后的硬件感兴趣,那么“FORMANT”这个名字你一定不陌生。它最初是上世纪70年代由《Elektor》杂志发布的一款模拟单音合成器,以其清晰的模块化设计和出…...
