当前位置: 首页 > news >正文

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明

如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考
mysql-玩转数据-centos7下mysql的安装
创建表

CREATE TABLE `sensor` (`id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

二、pom.xml 导入驱动

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>

三、编写程序

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class  MysqlSink extends RichSinkFunction<Integer>{private Connection sunbo;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");}@Overridepublic void close() throws Exception {if (sunbo != null) {sunbo.close();}}@Overridepublic void invoke(Integer value, Context context) throws Exception {String sql = "insert into sensor(id)values(?)";PreparedStatement ps = sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}

四、运行测试

在这里插入图片描述

相关文章:

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明 如果Flink没有提供给我们可以直接使用的连接器&#xff0c;那我们如果想将数据存储到我们自己的存储设备中&#xff0c;mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表 CREATE TABLE sensor (id int(10) ) ENGINEInnoDB DEFAULT CHARSETutf8二…...

linux17 线程安全 线程同步

1、线程安全&#xff1a; 多线程程序无论调度顺序如何&#xff0c;都能保证程序 的正确性&#xff0c;就说该程序处于线程安全的状态 1&#xff09;、同步 2&#xff09;、线程安全函数//有的函数不适合多线程使用&#xff0c;是函数自身的原因。 2、线程安全函数 1&#…...

lvs集群与nat模式

一&#xff0c;什么是集群&#xff1a; 集群&#xff0c;群集&#xff0c;Cluster&#xff0c;由多台主机构成&#xff0c;但是对外只表现为一个整体&#xff0c;只提供一个访问入口&#xff08;域名与ip地址&#xff09;&#xff0c;相当于一台大型计算机。 二&#xff0c;集…...

【开源分享】在线客服系统搭建-基于php和swoole客服系统CRMchat(附源码完整搭建教程)...

CRMChat是一款开源的在线客服系统&#xff0c;后台管理使用thinkphp框架&#xff0c;消息通讯使用swoole扩展&#xff0c;现在我来部署搭建一下。 这是一款不可商用的开源客服系统&#xff0c;如果有商用需求可以访问我的网站&#xff1a;gofly.v1kf.com 域名解析 以阿里云为例…...

Webpact学习笔记记录

Webpact学习笔记记录 一.初始化项目1.生成package.json2.安装webpack3.执行webpack体验 二、webpack的配置文件三、less-loader解析less1.安装loader2.配置 四、eslint-loader语法检查1.安装loader2.配置loader3.在package.json中加入 五、js语法转换1.安装loader2.配置loader …...

Python代码实现解析MULTIPOLYGON几何对象类型数据为嵌套列表

MULTIPOLYGON MULTIPOLYGON是一种地理信息系统&#xff08;GIS&#xff09;中的几何对象类型&#xff0c;用于表示由多个多边形组成的复杂地理区域。它是一种多边形的集合&#xff0c;每个多边形可以是简单的凸多边形或复杂的凹多边形。 MULTIPOLYGON类型的几何对象通常用于描…...

SSH连接工具汇总

xshell 这是个熟悉的软件啦&#xff0c;目前我正在使用Xshell_7 链接&#xff1a;https://www.xshell.com/zh/xshell/ FinalShell 国产软件&#xff0c;有windows和MAC版本&#xff1b;使用方便而且免费&#xff0c;但是软件比较占用内存。但是都2021年了&#xff0c;笔记本…...

Java的AQS框架是如何支撑起整个并发库的

如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成…...

一.net core 自动化发布到docker (Jenkins安装)

目录 1.安装Jenkins 参考资料:https://www.jenkins.io/doc/book/installing/docker/#downloading-and-running-jenkins-in-docker 1.Open up a terminal window.(打开一个终端窗口。) 2.Create a bridge network in Docker using the following docker network create comma…...

二刷LeetCode--148. 排序链表(C++版本),必会题,思维题

思路&#xff0c;本题其实考察了两个点&#xff1a;合并链表、链表切分。首先从1开始&#xff0c;将链表切成一段一段&#xff0c;因为需要使用归并&#xff0c;所以下一次的切分长度应该是当前切分长度的二倍&#xff0c;每次切分&#xff0c;我们拿出两段&#xff0c;然后将第…...

css flex 上下结构布局

display: flex; flex-flow: column; justify-content: space-between;...

win下qwidget全屏弹窗后其他窗口鼠标样式无法更新的问题

在win平台下&#xff0c;实现截取选桌面执行推理功能&#xff0c;用一个qwidget(j对象名为m_selectWidget)来显示选取范围的边框&#xff0c;但这个qwidget显示后&#xff0c;其他窗口在他下面可以接受鼠标相应的事件&#xff0c;但原来的鼠标形状功能失效&#xff08;mac正常&…...

Java【数据结构】二分查找

&#x1f31e; 题目&#xff1a; &#x1f30f;在有序数组A中&#xff0c;查找目标值target &#x1f30f;如果找到返回索引 &#x1f30f;如果找不到返回-1 算法描述解释前提给定一个内含n个元素的有序数组A&#xff0c;满足A0<A1<A2<<An-1,一个待查值target1设…...

数据库技术--数据库引擎,数据访问接口及其关系详解(附加形象的比喻)

目录 背景数据库引擎Jet数据库&#xff1a;ISAM&#xff1a;ODBC&#xff08;Open Database Connectivity&#xff09;&#xff1a; 数据访问接口ADO&#xff08;ActiveX Data Objects&#xff09;DAO&#xff08;Data Access Objects&#xff09;RDO&#xff08;Remote Data O…...

【BASH】回顾与知识点梳理(三十三)

【BASH】回顾与知识点梳理 三十三 三十三. 认识系统服务 (daemons)33.1 什么是 daemon 与服务 (service)早期 System V 的 init 管理行为中 daemon 的主要分类 (Optional)systemd 使用的 unit 分类systemd 的配置文件放置目录systemd 的 unit 类型分类说明 33.2 透过 systemctl…...

同步请求和异步请求

同步请求和异步请求是在网络编程中常用的两种通信模式&#xff0c;它们有以下区别&#xff1a; 同步请求&#xff1a; 在发送一个请求后&#xff0c;程序会一直等待服务器返回响应&#xff0c;期间无法进行其他操作。请求发出后&#xff0c;程序会阻塞在请求处&#xff0c;直…...

Transformer是什么,Transformer应用

目录 Transformer应用 Transformer是什么 Transformer应用:循环神经网络 语言翻译:注重语句前后顺序 RNN看中单个特征; CNN:看中特征之间时序性 模型关注不同位置的能力 Transformer是什么 Transformer是一个利用注意力机制来提高模型训练速度的模型。关于注意力机…...

故障011:dmap服务缺失libnsl.so修复

故障011&#xff1a;dmap服务缺失libnsl.so修复 1. 问题描述2. 解决方法2.1 初步分析2.2 动手实操2.2.1 模糊搜索大法2.2.2 僵桃代李大法 DM技术交流QQ群&#xff1a;940124259 1. 问题描述 今天遇二期XC环境&#xff0c;达梦DM 7.6的DmAPService备份辅助进程服务无法启动&a…...

第十三章 SpringBoot项目(总)

1.创建SpringBoot项目 1.1.设置编码 1.4.导入已有的spring boot项目 2.快速搭建Restfull风格的项目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…...

利用Python隧道爬虫ip轻松构建全局爬虫网络

嘿&#xff0c;爬虫程序员们&#xff01;你们有没有碰到过需要大规模数据爬取的情况&#xff1f;也许你们之前遇到过网站的反爬措施&#xff0c;卡住你们的进度。别担心&#xff0c;今天我来分享一个利用Python隧道爬虫ip实现的方法&#xff0c;帮助你们轻松搭建全局爬虫ip网络…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地

借阿里云中企出海大会的东风&#xff0c;以**「云启出海&#xff0c;智联未来&#xff5c;打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办&#xff0c;现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

搭建DNS域名解析服务器(正向解析资源文件)

正向解析资源文件 1&#xff09;准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2&#xff09;服务端安装软件&#xff1a;bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...

CSS | transition 和 transform的用处和区别

省流总结&#xff1a; transform用于变换/变形&#xff0c;transition是动画控制器 transform 用来对元素进行变形&#xff0c;常见的操作如下&#xff0c;它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...