大数据-玩转数据-Flink 自定义Sink(Mysql)
一、说明
如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考
mysql-玩转数据-centos7下mysql的安装
创建表
CREATE TABLE `sensor` (`id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
二、pom.xml 导入驱动
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>
三、编写程序
package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class MysqlSink extends RichSinkFunction<Integer>{private Connection sunbo;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");}@Overridepublic void close() throws Exception {if (sunbo != null) {sunbo.close();}}@Overridepublic void invoke(Integer value, Context context) throws Exception {String sql = "insert into sensor(id)values(?)";PreparedStatement ps = sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}
四、运行测试

相关文章:
大数据-玩转数据-Flink 自定义Sink(Mysql)
一、说明 如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表 CREATE TABLE sensor (id int(10) ) ENGINEInnoDB DEFAULT CHARSETutf8二…...
linux17 线程安全 线程同步
1、线程安全: 多线程程序无论调度顺序如何,都能保证程序 的正确性,就说该程序处于线程安全的状态 1)、同步 2)、线程安全函数//有的函数不适合多线程使用,是函数自身的原因。 2、线程安全函数 1&#…...
lvs集群与nat模式
一,什么是集群: 集群,群集,Cluster,由多台主机构成,但是对外只表现为一个整体,只提供一个访问入口(域名与ip地址),相当于一台大型计算机。 二,集…...
【开源分享】在线客服系统搭建-基于php和swoole客服系统CRMchat(附源码完整搭建教程)...
CRMChat是一款开源的在线客服系统,后台管理使用thinkphp框架,消息通讯使用swoole扩展,现在我来部署搭建一下。 这是一款不可商用的开源客服系统,如果有商用需求可以访问我的网站:gofly.v1kf.com 域名解析 以阿里云为例…...
Webpact学习笔记记录
Webpact学习笔记记录 一.初始化项目1.生成package.json2.安装webpack3.执行webpack体验 二、webpack的配置文件三、less-loader解析less1.安装loader2.配置 四、eslint-loader语法检查1.安装loader2.配置loader3.在package.json中加入 五、js语法转换1.安装loader2.配置loader …...
Python代码实现解析MULTIPOLYGON几何对象类型数据为嵌套列表
MULTIPOLYGON MULTIPOLYGON是一种地理信息系统(GIS)中的几何对象类型,用于表示由多个多边形组成的复杂地理区域。它是一种多边形的集合,每个多边形可以是简单的凸多边形或复杂的凹多边形。 MULTIPOLYGON类型的几何对象通常用于描…...
SSH连接工具汇总
xshell 这是个熟悉的软件啦,目前我正在使用Xshell_7 链接:https://www.xshell.com/zh/xshell/ FinalShell 国产软件,有windows和MAC版本;使用方便而且免费,但是软件比较占用内存。但是都2021年了,笔记本…...
Java的AQS框架是如何支撑起整个并发库的
如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成…...
一.net core 自动化发布到docker (Jenkins安装)
目录 1.安装Jenkins 参考资料:https://www.jenkins.io/doc/book/installing/docker/#downloading-and-running-jenkins-in-docker 1.Open up a terminal window.(打开一个终端窗口。) 2.Create a bridge network in Docker using the following docker network create comma…...
二刷LeetCode--148. 排序链表(C++版本),必会题,思维题
思路,本题其实考察了两个点:合并链表、链表切分。首先从1开始,将链表切成一段一段,因为需要使用归并,所以下一次的切分长度应该是当前切分长度的二倍,每次切分,我们拿出两段,然后将第…...
css flex 上下结构布局
display: flex; flex-flow: column; justify-content: space-between;...
win下qwidget全屏弹窗后其他窗口鼠标样式无法更新的问题
在win平台下,实现截取选桌面执行推理功能,用一个qwidget(j对象名为m_selectWidget)来显示选取范围的边框,但这个qwidget显示后,其他窗口在他下面可以接受鼠标相应的事件,但原来的鼠标形状功能失效(mac正常&…...
Java【数据结构】二分查找
🌞 题目: 🌏在有序数组A中,查找目标值target 🌏如果找到返回索引 🌏如果找不到返回-1 算法描述解释前提给定一个内含n个元素的有序数组A,满足A0<A1<A2<<An-1,一个待查值target1设…...
数据库技术--数据库引擎,数据访问接口及其关系详解(附加形象的比喻)
目录 背景数据库引擎Jet数据库:ISAM:ODBC(Open Database Connectivity): 数据访问接口ADO(ActiveX Data Objects)DAO(Data Access Objects)RDO(Remote Data O…...
【BASH】回顾与知识点梳理(三十三)
【BASH】回顾与知识点梳理 三十三 三十三. 认识系统服务 (daemons)33.1 什么是 daemon 与服务 (service)早期 System V 的 init 管理行为中 daemon 的主要分类 (Optional)systemd 使用的 unit 分类systemd 的配置文件放置目录systemd 的 unit 类型分类说明 33.2 透过 systemctl…...
同步请求和异步请求
同步请求和异步请求是在网络编程中常用的两种通信模式,它们有以下区别: 同步请求: 在发送一个请求后,程序会一直等待服务器返回响应,期间无法进行其他操作。请求发出后,程序会阻塞在请求处,直…...
Transformer是什么,Transformer应用
目录 Transformer应用 Transformer是什么 Transformer应用:循环神经网络 语言翻译:注重语句前后顺序 RNN看中单个特征; CNN:看中特征之间时序性 模型关注不同位置的能力 Transformer是什么 Transformer是一个利用注意力机制来提高模型训练速度的模型。关于注意力机…...
故障011:dmap服务缺失libnsl.so修复
故障011:dmap服务缺失libnsl.so修复 1. 问题描述2. 解决方法2.1 初步分析2.2 动手实操2.2.1 模糊搜索大法2.2.2 僵桃代李大法 DM技术交流QQ群:940124259 1. 问题描述 今天遇二期XC环境,达梦DM 7.6的DmAPService备份辅助进程服务无法启动&a…...
第十三章 SpringBoot项目(总)
1.创建SpringBoot项目 1.1.设置编码 1.4.导入已有的spring boot项目 2.快速搭建Restfull风格的项目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…...
利用Python隧道爬虫ip轻松构建全局爬虫网络
嘿,爬虫程序员们!你们有没有碰到过需要大规模数据爬取的情况?也许你们之前遇到过网站的反爬措施,卡住你们的进度。别担心,今天我来分享一个利用Python隧道爬虫ip实现的方法,帮助你们轻松搭建全局爬虫ip网络…...
League Akari:基于LCU API的现代化英雄联盟客户端工具集
League Akari:基于LCU API的现代化英雄联盟客户端工具集 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 在英雄联盟游…...
告别卡顿闪烁!在Cesium 1.134中集成SOG格式,让400万高斯秒级加载
突破性能瓶颈:Cesium 1.134集成SOG格式实现400万高斯秒级渲染 在三维地理空间可视化领域,Cesium一直是开发者构建高精度场景的首选引擎。但当项目涉及数百万级高斯泼溅数据时,传统加载方式往往导致令人崩溃的卡顿和视角移动时的闪烁问题。最近…...
Gemma-3 Pixel Studio镜像免配置:开箱即用的12B多模态推理工作站
Gemma-3 Pixel Studio镜像免配置:开箱即用的12B多模态推理工作站 1. 产品概览 Gemma-3 Pixel Studio是基于Google最新开源Gemma-3-12b-it模型构建的高性能多模态对话终端。这个预配置的Docker镜像消除了复杂的部署流程,让用户能够立即体验12B参数大模型…...
Qwen3.5-4B-Claude-Opus高性能推理教程:Q4_K_M量化下GPU吞吐量实测分析
Qwen3.5-4B-Claude-Opus高性能推理教程:Q4_K_M量化下GPU吞吐量实测分析 1. 模型概述 Qwen3.5-4B-Claude-4.6-Opus-Reasoning-Distilled-GGUF是基于Qwen3.5-4B架构的推理蒸馏模型,特别强化了结构化分析、分步骤回答以及代码与逻辑类问题的处理能力。该版…...
2026Agent元年!手把手教你从0到1搭建高能智能体,小白也能秒变大神!
逼自己练完这些,你的Agent搭建就很牛了!!2026年可谓是Agent元年,智能体(AI Agent)正以惊人的速度重塑我们的工作方式,从简单的被动响应工具,进化为能自主规划、执行、协作的"数…...
【CPython 3.13无锁并发白皮书】:全球首批实测团队披露的4类典型崩溃场景与修复参数
第一章:Python 无锁 GIL 环境下的并发模型配置概览Python 的全局解释器锁(GIL)本质上限制了 CPython 中多线程对 CPU 密集型任务的并行执行能力。然而,“无锁 GIL 环境”并非指 GIL 被移除,而是指通过绕过 GIL 依赖的并…...
# Kafka 消息队列实战指南
大数据开发核心技能:Kafka 架构原理、生产者消费者配置、Spark/Flink 集成、消息积压处理、数据一致性保障、生产环境案例,从 0 到 1 掌握企业级消息队列📌 前言 真实生产问题 问题场景: 某电商公司数据平台遇到的问题:…...
Umi-OCR插件技术方案:5款引擎深度对比与实战配置指南
Umi-OCR插件技术方案:5款引擎深度对比与实战配置指南 【免费下载链接】Umi-OCR_plugins Umi-OCR 插件库 项目地址: https://gitcode.com/gh_mirrors/um/Umi-OCR_plugins Umi-OCR插件库为开源OCR工具提供了丰富的引擎选择,从本地CPU加速到云端AI识…...
23种设计模式 - 组合模式(Composite)
组合模式(Composite)—— 文件夹套文件夹,统一操作 大白话解释 你的电脑里: 📄 文件(不能再包含东西)📁 文件夹(可以装文件,也可以装文件夹) &…...
OpenClaw语音控制:nanobot对接Whisper实现声控自动化
OpenClaw语音控制:nanobot对接Whisper实现声控自动化 1. 为什么需要语音控制自动化 作为一个长期与命令行打交道的开发者,我一直在寻找更自然的交互方式。键盘输入固然高效,但在某些场景下——比如双手被占用时调试代码、厨房里边做饭边查菜…...
