Kafka与Flink的整合 -- sink、source
1、首先导入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency>
2、 source:Flink从Kafka中读取数据
public class Demo01KafkaSource {public static void main(String[] args) throws Exception{//构建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//构建kafka source 环境KafkaSource<String> source = KafkaSource.<String>builder()//指定broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定topic.setTopics("bigdata")//指定消费组.setGroupId("my-group")//指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据.setStartingOffsets(OffsetsInitializer.earliest())//读取数据格式:.setValueOnlyDeserializer(new SimpleStringSchema()).build();//使用kafka数据源DataStreamSource<String> kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSourceDS.print();//启动flinkenv.execute();}
}
启动生产kafka:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
3、sink:Flink向Kafka中写入数据
public class Demo02KafkaSink {public static void main(String[] args) throws Exception{//构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取数据文件:DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");//创建kafka sinkKafkaSink<String> sink = KafkaSink.<String>builder()//指定flink broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定数据的格式:.setRecordSerializer(KafkaRecordSerializationSchema.builder()//指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic.setTopic("student")//指定数据的格式.setValueSerializationSchema(new SimpleStringSchema()).build())//指定数据处理的语义:.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//执行flinkstudentDS.sinkTo(sink);//构建flink环境env.execute();}
}
启动消费kafka:
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic student
相关文章:
Kafka与Flink的整合 -- sink、source
1、首先导入依赖: <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency> 2、 source:Flink从Kafka中读取数据 p…...
小鱼ROS
git clone git clone https://ghproxy.com/https://github.com/stilleshan/ServerStatus git clone 私有仓库 Clone 私有仓库需要用户在 Personal access tokens 申请 Token 配合使用.git clone https://user:your_tokenghproxy.com/https://github.com/your_name/your_priv…...
简单讲讲RISC-V跳转指令基于具体场景的实现
背景 在 RISC-V指令集中,一共有 6 条有条件跳转指令,分别是 beq、bne、blt、bltu、bge、bgeu。如下是它们的定义与接口 BEQ rs1, rs2, imm ≠ BNE rs1, rs2, imm < BLT rs1, rs2, imm ≥ BGE rs1, rs2, imm < unsigned BLTU rs1…...
第13章 Java IO流处理(一) File类
目录 内容说明 章节内容 一、 File类 内容说明 结合章节内容重点难点,会对重要知识点进行扩展,以及做示例说明等,以便更好理解重点难点 章节内容 一、 File类 1、文件与目录的描述类——File ✔️ File类并不用来进行文件的读/写操作,并未涉及到写入或读取文件内容的…...
测试面试题集锦(四)| Linux 与 Python 编程篇(附答案)
本系列文章总结归纳了一些软件测试工程师常见的面试题,主要来源于个人面试遇到的、网络搜集(完善)、工作日常讨论等,分为以下十个部分,供大家参考。如有错误的地方,欢迎指正。有更多的面试题或面试中遇到的…...
pytorch中的矩阵乘法
1. 运算符介绍 关于运算,*运算,torch.mul(), torch.mm(), torch.mv(), tensor.t() 和 *代表矩阵的两种相乘方式: 表示常规的数学上定义的矩阵相乘; *表示两个矩阵对应位置处的两个元素相乘。 1.1 矩阵点乘 *和torch.mul()等同…...
Java--Stream流详解
Stream是Java 8 API添加的一个新的抽象,称为流Stream,以一种声明性方式处理数据集合(侧重对于源数据计算能力的封装,并且支持序列与并行两种操作方式) Stream流是从支持数据处理操作的源生成的元素序列,源可…...
[PHP]ShopXO企业级B2C免费开源商城系统 v2.3.1
ShopXO 企业级B2C免费开源电商系统! 求实进取、创新专注、自主研发、国内领先企业级B2C电商系统解决方案。 遵循Apache2开源协议发布,无需授权、可商用、可二次开发、满足99%的电商运营需求。 PCH5、支付宝小程序、微信小程序、百度小程序、头条&抖音…...
Python基础入门系列详解20篇
Python基础入门(1)----Python简介 Python基础入门(2)----安装Python环境(Windows、MacOS、CentOS、Ubuntu) Python基础入门(3)----Python基础语法:解释器、标识符、关键…...
P02项目(学习)
★ P02项目 项目描述:安全操作项目旨在提高医疗设备的安全性,特别是在医生离开操作屏幕时,以减少非授权人员的误操作风险。为实现这一目标,我们采用多层次的保护措施,包括人脸识别、姿势检测以及二维码识别等技术。这些…...
pandas 笔记:get_dummies分类变量one-hot化
1 函数介绍 pandas.get_dummies 是 pandas 库中的一个函数,它用于将分类变量转换为哑变量/指示变量。所谓的哑变量,就是将分类变量的每一个不同的值转换为一个新的0/1变量。在输出的DataFrame中,每一列都以该值的名称命名 pandas.get_dummi…...
PTE作文练习(一)
目录 65分备考建议 WE模版 范文 Supporting ideas: SWT 65分备考建议 RA重在多听标准的正确的示范,RS重在抓大放小,WFD重在整理错题,以及反反复复的车轮战,FIBRW重在“以对代记” 就是直接看答案,节约时间&#…...
如何做到一套FPGA工程无缝兼容两款不同的板卡?
试想这样一种场景,有两款不同的FPGA板卡,它们的功能代码90%都是一样的,但是两个板卡的管脚分配完全不同,一般情况下,我们需要设计两个工程,两套代码,之后还需要一直维护两个版本。 那么有没有一种自动化的方式,实现一个工程,编译出一个程序文件,下载到这两个不同的板…...
VSCode修改主题为Eclipse 绿色护眼模式
前言 从参加开发以来,一直使用eclipse进行开发,基本官方出新版本,我都会更新。后来出来很多其他的IDE工具,我也尝试了,但他们的主题都把我劝退了,黑色主题是谁想出来?😂 字体小的时…...
conan和cmake编译器版本不匹配问题解决
conan和cmake编译器版本不匹配问题解决 1 问题现象2 解决方法2.1 在CMakeLists.txt禁止编译器检查2.1.1 修改方式 2.2 探查问题出现的根本原因2.2.1 安装升级gcc2.2.2 安装升级g 注 执行环境:ubuntu 1 问题现象 conan要求的编译器版本和cmake检测到的当前的编译器…...
float单精度浮点数如何在计算机中存储
文章目录 1 float型数据组成2 实际举例3 代码测试4 写在最后 1 float型数据组成 按照IEEE浮点标准存储浮点数时,一个float型的值由1个符号位(最左边的位或最高有效位)、8个指数位以及23个小数位依次组成: 符号位为0时表示正数,为1…...
机器视觉在虚拟现实与增强现实中的作用
机器视觉在虚拟现实(VR)和增强现实(AR)中发挥着至关重要的作用。这些技术的核心是计算机视觉领域,重点是让计算机具有“看到”和理解周围世界的能力。 在虚拟现实中,计算机视觉用于创建和处理用户所见的虚…...
红黑数原理及存在原因
我红黑树那么牛,你们为什么不用?_哔哩哔哩_bilibili 面试时经常会被问到红黑树,它到底有什么优点呢? 对于查找数据,数组二分查询速度最快,时间复杂度为O(logN)。但是如果增加和删除数据,数组就…...
Ansible入门—安装部署及各个模块应用案例(超详细)
目录 前言 一、环境概况 修改主机名(可选项) 二、安装部署 1.安装epel扩展源 2.安装Ansible 3.修改Ansible的hosts文件 4.生成密钥 三、Ansible模块使用介绍 Command模块 Shell模块 User模块 Copy模块 File模块 Hostname模块 Yum模块 Se…...
Spring Boot 3系列之-启动类详解
Spring Boot是一个功能强大、灵活且易于使用的框架,它极大地简化了Spring应用程序的开发和部署流程,使得开发人员能够更专注于业务逻辑的实现。在我们的Spring Boot 3系列之一(初始化项目)文章中,我们使用了Spring官方…...
OpCore Simplify:智能化系统定制的突破与实践
OpCore Simplify:智能化系统定制的突破与实践 【免费下载链接】OpCore-Simplify A tool designed to simplify the creation of OpenCore EFI 项目地址: https://gitcode.com/GitHub_Trending/op/OpCore-Simplify 为什么超过68%的开发者在首次配置OpenCore E…...
5分钟让老旧打印机变身AirPrint无线打印神器:cups-avahi-airprint完全指南
5分钟让老旧打印机变身AirPrint无线打印神器:cups-avahi-airprint完全指南 【免费下载链接】cups-avahi-airprint Docker image for CUPS intended as an AirPrint relay 项目地址: https://gitcode.com/gh_mirrors/cu/cups-avahi-airprint 在苹果生态日益普…...
WechatBakTool终极指南:如何安全备份与恢复微信聊天记录
WechatBakTool终极指南:如何安全备份与恢复微信聊天记录 【免费下载链接】WechatBakTool 基于C#的微信PC版聊天记录备份工具,提供图形界面,解密微信数据库并导出聊天记录。 项目地址: https://gitcode.com/gh_mirrors/we/WechatBakTool …...
企业级微软产品激活管理:KMS_VL_ALL_AIO的技术实践与战略价值
企业级微软产品激活管理:KMS_VL_ALL_AIO的技术实践与战略价值 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 1. 企业激活困境与破局思路 核心价值:揭示企业在软件激活管…...
Cocos笔记
Cocos笔记 好用的Api 碰撞体范围检测 包围盒范围检测(性能对比碰撞检测稍好) 多边形碰撞体和矩形相交检测 设置父节点并同步位置 外部增加并调用回调函数 网址 其他 代码混淆工具 引用加载过久修改tsconfig.json脚本增加以下代码 类型写法(举例) 刮刮乐脚本 修改后的挖图(…...
终极网盘直链解析解决方案:一站式解锁八大平台高速下载通道
终极网盘直链解析解决方案:一站式解锁八大平台高速下载通道 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国移动云盘 /…...
抖音批量下载工具终极指南:免费无水印下载视频、图文、合集和直播
抖音批量下载工具终极指南:免费无水印下载视频、图文、合集和直播 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fal…...
QWEN-AUDIO功能全解析:声波可视化、情感指令、四种人声,到底怎么用?
QWEN-AUDIO功能全解析:声波可视化、情感指令、四种人声,到底怎么用? 1. 认识QWEN-AUDIO语音合成系统 QWEN-AUDIO是一款基于Qwen3-Audio架构构建的智能语音合成系统,它能够将文字转换成带有情感和温度的自然语音。这个系统最特别…...
FastAPI GraphQL联合服务发现:构建现代化微服务架构的完整指南
FastAPI GraphQL联合服务发现:构建现代化微服务架构的完整指南 【免费下载链接】fastapi FastAPI framework, high performance, easy to learn, fast to code, ready for production 项目地址: https://gitcode.com/GitHub_Trending/fa/fastapi 在当今云原生…...
LoRA训练助手效果展示:GPT模型微调前后对比
LoRA训练助手效果展示:GPT模型微调前后对比 1. 引言 你是否曾经遇到过这样的情况:用GPT模型生成的内容总是差那么点意思,要么风格不对,要么专业度不够,要么就是不符合你的特定需求?就像让一个通才来处理专…...
