当前位置: 首页 > news >正文

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明

如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考
mysql-玩转数据-centos7下mysql的安装
创建表

CREATE TABLE `sensor` (`id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

二、pom.xml 导入驱动

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>

三、编写程序

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class  MysqlSink extends RichSinkFunction<Integer>{private Connection sunbo;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");}@Overridepublic void close() throws Exception {if (sunbo != null) {sunbo.close();}}@Overridepublic void invoke(Integer value, Context context) throws Exception {String sql = "insert into sensor(id)values(?)";PreparedStatement ps = sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}

四、运行测试

在这里插入图片描述

相关文章:

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明 如果Flink没有提供给我们可以直接使用的连接器&#xff0c;那我们如果想将数据存储到我们自己的存储设备中&#xff0c;mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表 CREATE TABLE sensor (id int(10) ) ENGINEInnoDB DEFAULT CHARSETutf8二…...

linux17 线程安全 线程同步

1、线程安全&#xff1a; 多线程程序无论调度顺序如何&#xff0c;都能保证程序 的正确性&#xff0c;就说该程序处于线程安全的状态 1&#xff09;、同步 2&#xff09;、线程安全函数//有的函数不适合多线程使用&#xff0c;是函数自身的原因。 2、线程安全函数 1&#…...

lvs集群与nat模式

一&#xff0c;什么是集群&#xff1a; 集群&#xff0c;群集&#xff0c;Cluster&#xff0c;由多台主机构成&#xff0c;但是对外只表现为一个整体&#xff0c;只提供一个访问入口&#xff08;域名与ip地址&#xff09;&#xff0c;相当于一台大型计算机。 二&#xff0c;集…...

【开源分享】在线客服系统搭建-基于php和swoole客服系统CRMchat(附源码完整搭建教程)...

CRMChat是一款开源的在线客服系统&#xff0c;后台管理使用thinkphp框架&#xff0c;消息通讯使用swoole扩展&#xff0c;现在我来部署搭建一下。 这是一款不可商用的开源客服系统&#xff0c;如果有商用需求可以访问我的网站&#xff1a;gofly.v1kf.com 域名解析 以阿里云为例…...

Webpact学习笔记记录

Webpact学习笔记记录 一.初始化项目1.生成package.json2.安装webpack3.执行webpack体验 二、webpack的配置文件三、less-loader解析less1.安装loader2.配置 四、eslint-loader语法检查1.安装loader2.配置loader3.在package.json中加入 五、js语法转换1.安装loader2.配置loader …...

Python代码实现解析MULTIPOLYGON几何对象类型数据为嵌套列表

MULTIPOLYGON MULTIPOLYGON是一种地理信息系统&#xff08;GIS&#xff09;中的几何对象类型&#xff0c;用于表示由多个多边形组成的复杂地理区域。它是一种多边形的集合&#xff0c;每个多边形可以是简单的凸多边形或复杂的凹多边形。 MULTIPOLYGON类型的几何对象通常用于描…...

SSH连接工具汇总

xshell 这是个熟悉的软件啦&#xff0c;目前我正在使用Xshell_7 链接&#xff1a;https://www.xshell.com/zh/xshell/ FinalShell 国产软件&#xff0c;有windows和MAC版本&#xff1b;使用方便而且免费&#xff0c;但是软件比较占用内存。但是都2021年了&#xff0c;笔记本…...

Java的AQS框架是如何支撑起整个并发库的

如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成…...

一.net core 自动化发布到docker (Jenkins安装)

目录 1.安装Jenkins 参考资料:https://www.jenkins.io/doc/book/installing/docker/#downloading-and-running-jenkins-in-docker 1.Open up a terminal window.(打开一个终端窗口。) 2.Create a bridge network in Docker using the following docker network create comma…...

二刷LeetCode--148. 排序链表(C++版本),必会题,思维题

思路&#xff0c;本题其实考察了两个点&#xff1a;合并链表、链表切分。首先从1开始&#xff0c;将链表切成一段一段&#xff0c;因为需要使用归并&#xff0c;所以下一次的切分长度应该是当前切分长度的二倍&#xff0c;每次切分&#xff0c;我们拿出两段&#xff0c;然后将第…...

css flex 上下结构布局

display: flex; flex-flow: column; justify-content: space-between;...

win下qwidget全屏弹窗后其他窗口鼠标样式无法更新的问题

在win平台下&#xff0c;实现截取选桌面执行推理功能&#xff0c;用一个qwidget(j对象名为m_selectWidget)来显示选取范围的边框&#xff0c;但这个qwidget显示后&#xff0c;其他窗口在他下面可以接受鼠标相应的事件&#xff0c;但原来的鼠标形状功能失效&#xff08;mac正常&…...

Java【数据结构】二分查找

&#x1f31e; 题目&#xff1a; &#x1f30f;在有序数组A中&#xff0c;查找目标值target &#x1f30f;如果找到返回索引 &#x1f30f;如果找不到返回-1 算法描述解释前提给定一个内含n个元素的有序数组A&#xff0c;满足A0<A1<A2<<An-1,一个待查值target1设…...

数据库技术--数据库引擎,数据访问接口及其关系详解(附加形象的比喻)

目录 背景数据库引擎Jet数据库&#xff1a;ISAM&#xff1a;ODBC&#xff08;Open Database Connectivity&#xff09;&#xff1a; 数据访问接口ADO&#xff08;ActiveX Data Objects&#xff09;DAO&#xff08;Data Access Objects&#xff09;RDO&#xff08;Remote Data O…...

【BASH】回顾与知识点梳理(三十三)

【BASH】回顾与知识点梳理 三十三 三十三. 认识系统服务 (daemons)33.1 什么是 daemon 与服务 (service)早期 System V 的 init 管理行为中 daemon 的主要分类 (Optional)systemd 使用的 unit 分类systemd 的配置文件放置目录systemd 的 unit 类型分类说明 33.2 透过 systemctl…...

同步请求和异步请求

同步请求和异步请求是在网络编程中常用的两种通信模式&#xff0c;它们有以下区别&#xff1a; 同步请求&#xff1a; 在发送一个请求后&#xff0c;程序会一直等待服务器返回响应&#xff0c;期间无法进行其他操作。请求发出后&#xff0c;程序会阻塞在请求处&#xff0c;直…...

Transformer是什么,Transformer应用

目录 Transformer应用 Transformer是什么 Transformer应用:循环神经网络 语言翻译:注重语句前后顺序 RNN看中单个特征; CNN:看中特征之间时序性 模型关注不同位置的能力 Transformer是什么 Transformer是一个利用注意力机制来提高模型训练速度的模型。关于注意力机…...

故障011:dmap服务缺失libnsl.so修复

故障011&#xff1a;dmap服务缺失libnsl.so修复 1. 问题描述2. 解决方法2.1 初步分析2.2 动手实操2.2.1 模糊搜索大法2.2.2 僵桃代李大法 DM技术交流QQ群&#xff1a;940124259 1. 问题描述 今天遇二期XC环境&#xff0c;达梦DM 7.6的DmAPService备份辅助进程服务无法启动&a…...

第十三章 SpringBoot项目(总)

1.创建SpringBoot项目 1.1.设置编码 1.4.导入已有的spring boot项目 2.快速搭建Restfull风格的项目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…...

利用Python隧道爬虫ip轻松构建全局爬虫网络

嘿&#xff0c;爬虫程序员们&#xff01;你们有没有碰到过需要大规模数据爬取的情况&#xff1f;也许你们之前遇到过网站的反爬措施&#xff0c;卡住你们的进度。别担心&#xff0c;今天我来分享一个利用Python隧道爬虫ip实现的方法&#xff0c;帮助你们轻松搭建全局爬虫ip网络…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

【杂谈】-递归进化:人工智能的自我改进与监管挑战

递归进化&#xff1a;人工智能的自我改进与监管挑战 文章目录 递归进化&#xff1a;人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管&#xff1f;3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

css实现圆环展示百分比,根据值动态展示所占比例

代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

【决胜公务员考试】求职OMG——见面课测验1

2025最新版&#xff01;&#xff01;&#xff01;6.8截至答题&#xff0c;大家注意呀&#xff01; 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:&#xff08; B &#xff09; A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)

🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息&#xff0c;对客户进行统一管理&#xff0c;可以把所有客户信息录入系统&#xff0c;进行维护和统计功能。可通过文件的方式保存相关录入数据&#xff0c;对…...

GitHub 趋势日报 (2025年06月06日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

nnUNet V2修改网络——暴力替换网络为UNet++

更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 U-Net存在两个局限,一是网络的最佳深度因应用场景而异,这取决于任务的难度和可用于训练的标注数…...