当前位置: 首页 > news >正文

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明

如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考
mysql-玩转数据-centos7下mysql的安装
创建表

CREATE TABLE `sensor` (`id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

二、pom.xml 导入驱动

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>

三、编写程序

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class  MysqlSink extends RichSinkFunction<Integer>{private Connection sunbo;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");}@Overridepublic void close() throws Exception {if (sunbo != null) {sunbo.close();}}@Overridepublic void invoke(Integer value, Context context) throws Exception {String sql = "insert into sensor(id)values(?)";PreparedStatement ps = sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}

四、运行测试

在这里插入图片描述

相关文章:

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明 如果Flink没有提供给我们可以直接使用的连接器&#xff0c;那我们如果想将数据存储到我们自己的存储设备中&#xff0c;mysql 的安装使用请参考 mysql-玩转数据-centos7下mysql的安装 创建表 CREATE TABLE sensor (id int(10) ) ENGINEInnoDB DEFAULT CHARSETutf8二…...

linux17 线程安全 线程同步

1、线程安全&#xff1a; 多线程程序无论调度顺序如何&#xff0c;都能保证程序 的正确性&#xff0c;就说该程序处于线程安全的状态 1&#xff09;、同步 2&#xff09;、线程安全函数//有的函数不适合多线程使用&#xff0c;是函数自身的原因。 2、线程安全函数 1&#…...

lvs集群与nat模式

一&#xff0c;什么是集群&#xff1a; 集群&#xff0c;群集&#xff0c;Cluster&#xff0c;由多台主机构成&#xff0c;但是对外只表现为一个整体&#xff0c;只提供一个访问入口&#xff08;域名与ip地址&#xff09;&#xff0c;相当于一台大型计算机。 二&#xff0c;集…...

【开源分享】在线客服系统搭建-基于php和swoole客服系统CRMchat(附源码完整搭建教程)...

CRMChat是一款开源的在线客服系统&#xff0c;后台管理使用thinkphp框架&#xff0c;消息通讯使用swoole扩展&#xff0c;现在我来部署搭建一下。 这是一款不可商用的开源客服系统&#xff0c;如果有商用需求可以访问我的网站&#xff1a;gofly.v1kf.com 域名解析 以阿里云为例…...

Webpact学习笔记记录

Webpact学习笔记记录 一.初始化项目1.生成package.json2.安装webpack3.执行webpack体验 二、webpack的配置文件三、less-loader解析less1.安装loader2.配置 四、eslint-loader语法检查1.安装loader2.配置loader3.在package.json中加入 五、js语法转换1.安装loader2.配置loader …...

Python代码实现解析MULTIPOLYGON几何对象类型数据为嵌套列表

MULTIPOLYGON MULTIPOLYGON是一种地理信息系统&#xff08;GIS&#xff09;中的几何对象类型&#xff0c;用于表示由多个多边形组成的复杂地理区域。它是一种多边形的集合&#xff0c;每个多边形可以是简单的凸多边形或复杂的凹多边形。 MULTIPOLYGON类型的几何对象通常用于描…...

SSH连接工具汇总

xshell 这是个熟悉的软件啦&#xff0c;目前我正在使用Xshell_7 链接&#xff1a;https://www.xshell.com/zh/xshell/ FinalShell 国产软件&#xff0c;有windows和MAC版本&#xff1b;使用方便而且免费&#xff0c;但是软件比较占用内存。但是都2021年了&#xff0c;笔记本…...

Java的AQS框架是如何支撑起整个并发库的

如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成…...

一.net core 自动化发布到docker (Jenkins安装)

目录 1.安装Jenkins 参考资料:https://www.jenkins.io/doc/book/installing/docker/#downloading-and-running-jenkins-in-docker 1.Open up a terminal window.(打开一个终端窗口。) 2.Create a bridge network in Docker using the following docker network create comma…...

二刷LeetCode--148. 排序链表(C++版本),必会题,思维题

思路&#xff0c;本题其实考察了两个点&#xff1a;合并链表、链表切分。首先从1开始&#xff0c;将链表切成一段一段&#xff0c;因为需要使用归并&#xff0c;所以下一次的切分长度应该是当前切分长度的二倍&#xff0c;每次切分&#xff0c;我们拿出两段&#xff0c;然后将第…...

css flex 上下结构布局

display: flex; flex-flow: column; justify-content: space-between;...

win下qwidget全屏弹窗后其他窗口鼠标样式无法更新的问题

在win平台下&#xff0c;实现截取选桌面执行推理功能&#xff0c;用一个qwidget(j对象名为m_selectWidget)来显示选取范围的边框&#xff0c;但这个qwidget显示后&#xff0c;其他窗口在他下面可以接受鼠标相应的事件&#xff0c;但原来的鼠标形状功能失效&#xff08;mac正常&…...

Java【数据结构】二分查找

&#x1f31e; 题目&#xff1a; &#x1f30f;在有序数组A中&#xff0c;查找目标值target &#x1f30f;如果找到返回索引 &#x1f30f;如果找不到返回-1 算法描述解释前提给定一个内含n个元素的有序数组A&#xff0c;满足A0<A1<A2<<An-1,一个待查值target1设…...

数据库技术--数据库引擎,数据访问接口及其关系详解(附加形象的比喻)

目录 背景数据库引擎Jet数据库&#xff1a;ISAM&#xff1a;ODBC&#xff08;Open Database Connectivity&#xff09;&#xff1a; 数据访问接口ADO&#xff08;ActiveX Data Objects&#xff09;DAO&#xff08;Data Access Objects&#xff09;RDO&#xff08;Remote Data O…...

【BASH】回顾与知识点梳理(三十三)

【BASH】回顾与知识点梳理 三十三 三十三. 认识系统服务 (daemons)33.1 什么是 daemon 与服务 (service)早期 System V 的 init 管理行为中 daemon 的主要分类 (Optional)systemd 使用的 unit 分类systemd 的配置文件放置目录systemd 的 unit 类型分类说明 33.2 透过 systemctl…...

同步请求和异步请求

同步请求和异步请求是在网络编程中常用的两种通信模式&#xff0c;它们有以下区别&#xff1a; 同步请求&#xff1a; 在发送一个请求后&#xff0c;程序会一直等待服务器返回响应&#xff0c;期间无法进行其他操作。请求发出后&#xff0c;程序会阻塞在请求处&#xff0c;直…...

Transformer是什么,Transformer应用

目录 Transformer应用 Transformer是什么 Transformer应用:循环神经网络 语言翻译:注重语句前后顺序 RNN看中单个特征; CNN:看中特征之间时序性 模型关注不同位置的能力 Transformer是什么 Transformer是一个利用注意力机制来提高模型训练速度的模型。关于注意力机…...

故障011:dmap服务缺失libnsl.so修复

故障011&#xff1a;dmap服务缺失libnsl.so修复 1. 问题描述2. 解决方法2.1 初步分析2.2 动手实操2.2.1 模糊搜索大法2.2.2 僵桃代李大法 DM技术交流QQ群&#xff1a;940124259 1. 问题描述 今天遇二期XC环境&#xff0c;达梦DM 7.6的DmAPService备份辅助进程服务无法启动&a…...

第十三章 SpringBoot项目(总)

1.创建SpringBoot项目 1.1.设置编码 1.4.导入已有的spring boot项目 2.快速搭建Restfull风格的项目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…...

利用Python隧道爬虫ip轻松构建全局爬虫网络

嘿&#xff0c;爬虫程序员们&#xff01;你们有没有碰到过需要大规模数据爬取的情况&#xff1f;也许你们之前遇到过网站的反爬措施&#xff0c;卡住你们的进度。别担心&#xff0c;今天我来分享一个利用Python隧道爬虫ip实现的方法&#xff0c;帮助你们轻松搭建全局爬虫ip网络…...

League Akari:基于LCU API的现代化英雄联盟客户端工具集

League Akari&#xff1a;基于LCU API的现代化英雄联盟客户端工具集 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 在英雄联盟游…...

告别卡顿闪烁!在Cesium 1.134中集成SOG格式,让400万高斯秒级加载

突破性能瓶颈&#xff1a;Cesium 1.134集成SOG格式实现400万高斯秒级渲染 在三维地理空间可视化领域&#xff0c;Cesium一直是开发者构建高精度场景的首选引擎。但当项目涉及数百万级高斯泼溅数据时&#xff0c;传统加载方式往往导致令人崩溃的卡顿和视角移动时的闪烁问题。最近…...

Gemma-3 Pixel Studio镜像免配置:开箱即用的12B多模态推理工作站

Gemma-3 Pixel Studio镜像免配置&#xff1a;开箱即用的12B多模态推理工作站 1. 产品概览 Gemma-3 Pixel Studio是基于Google最新开源Gemma-3-12b-it模型构建的高性能多模态对话终端。这个预配置的Docker镜像消除了复杂的部署流程&#xff0c;让用户能够立即体验12B参数大模型…...

Qwen3.5-4B-Claude-Opus高性能推理教程:Q4_K_M量化下GPU吞吐量实测分析

Qwen3.5-4B-Claude-Opus高性能推理教程&#xff1a;Q4_K_M量化下GPU吞吐量实测分析 1. 模型概述 Qwen3.5-4B-Claude-4.6-Opus-Reasoning-Distilled-GGUF是基于Qwen3.5-4B架构的推理蒸馏模型&#xff0c;特别强化了结构化分析、分步骤回答以及代码与逻辑类问题的处理能力。该版…...

2026Agent元年!手把手教你从0到1搭建高能智能体,小白也能秒变大神!

逼自己练完这些&#xff0c;你的Agent搭建就很牛了&#xff01;&#xff01;2026年可谓是Agent元年&#xff0c;智能体&#xff08;AI Agent&#xff09;正以惊人的速度重塑我们的工作方式&#xff0c;从简单的被动响应工具&#xff0c;进化为能自主规划、执行、协作的"数…...

【CPython 3.13无锁并发白皮书】:全球首批实测团队披露的4类典型崩溃场景与修复参数

第一章&#xff1a;Python 无锁 GIL 环境下的并发模型配置概览Python 的全局解释器锁&#xff08;GIL&#xff09;本质上限制了 CPython 中多线程对 CPU 密集型任务的并行执行能力。然而&#xff0c;“无锁 GIL 环境”并非指 GIL 被移除&#xff0c;而是指通过绕过 GIL 依赖的并…...

# Kafka 消息队列实战指南

大数据开发核心技能&#xff1a;Kafka 架构原理、生产者消费者配置、Spark/Flink 集成、消息积压处理、数据一致性保障、生产环境案例&#xff0c;从 0 到 1 掌握企业级消息队列&#x1f4cc; 前言 真实生产问题 问题场景&#xff1a; 某电商公司数据平台遇到的问题&#xff1a…...

Umi-OCR插件技术方案:5款引擎深度对比与实战配置指南

Umi-OCR插件技术方案&#xff1a;5款引擎深度对比与实战配置指南 【免费下载链接】Umi-OCR_plugins Umi-OCR 插件库 项目地址: https://gitcode.com/gh_mirrors/um/Umi-OCR_plugins Umi-OCR插件库为开源OCR工具提供了丰富的引擎选择&#xff0c;从本地CPU加速到云端AI识…...

23种设计模式 - 组合模式(Composite)

组合模式&#xff08;Composite&#xff09;—— 文件夹套文件夹&#xff0c;统一操作 大白话解释 你的电脑里&#xff1a; &#x1f4c4; 文件&#xff08;不能再包含东西&#xff09;&#x1f4c1; 文件夹&#xff08;可以装文件&#xff0c;也可以装文件夹&#xff09; &…...

OpenClaw语音控制:nanobot对接Whisper实现声控自动化

OpenClaw语音控制&#xff1a;nanobot对接Whisper实现声控自动化 1. 为什么需要语音控制自动化 作为一个长期与命令行打交道的开发者&#xff0c;我一直在寻找更自然的交互方式。键盘输入固然高效&#xff0c;但在某些场景下——比如双手被占用时调试代码、厨房里边做饭边查菜…...