Kafka与Flink的整合 -- sink、source
1、首先导入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency>
2、 source:Flink从Kafka中读取数据
public class Demo01KafkaSource {public static void main(String[] args) throws Exception{//构建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//构建kafka source 环境KafkaSource<String> source = KafkaSource.<String>builder()//指定broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定topic.setTopics("bigdata")//指定消费组.setGroupId("my-group")//指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据.setStartingOffsets(OffsetsInitializer.earliest())//读取数据格式:.setValueOnlyDeserializer(new SimpleStringSchema()).build();//使用kafka数据源DataStreamSource<String> kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSourceDS.print();//启动flinkenv.execute();}
}
启动生产kafka:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
3、sink:Flink向Kafka中写入数据
public class Demo02KafkaSink {public static void main(String[] args) throws Exception{//构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取数据文件:DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");//创建kafka sinkKafkaSink<String> sink = KafkaSink.<String>builder()//指定flink broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定数据的格式:.setRecordSerializer(KafkaRecordSerializationSchema.builder()//指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic.setTopic("student")//指定数据的格式.setValueSerializationSchema(new SimpleStringSchema()).build())//指定数据处理的语义:.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//执行flinkstudentDS.sinkTo(sink);//构建flink环境env.execute();}
}
启动消费kafka:
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic student
相关文章:
Kafka与Flink的整合 -- sink、source
1、首先导入依赖: <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency> 2、 source:Flink从Kafka中读取数据 p…...
小鱼ROS
git clone git clone https://ghproxy.com/https://github.com/stilleshan/ServerStatus git clone 私有仓库 Clone 私有仓库需要用户在 Personal access tokens 申请 Token 配合使用.git clone https://user:your_tokenghproxy.com/https://github.com/your_name/your_priv…...

简单讲讲RISC-V跳转指令基于具体场景的实现
背景 在 RISC-V指令集中,一共有 6 条有条件跳转指令,分别是 beq、bne、blt、bltu、bge、bgeu。如下是它们的定义与接口 BEQ rs1, rs2, imm ≠ BNE rs1, rs2, imm < BLT rs1, rs2, imm ≥ BGE rs1, rs2, imm < unsigned BLTU rs1…...
第13章 Java IO流处理(一) File类
目录 内容说明 章节内容 一、 File类 内容说明 结合章节内容重点难点,会对重要知识点进行扩展,以及做示例说明等,以便更好理解重点难点 章节内容 一、 File类 1、文件与目录的描述类——File ✔️ File类并不用来进行文件的读/写操作,并未涉及到写入或读取文件内容的…...

测试面试题集锦(四)| Linux 与 Python 编程篇(附答案)
本系列文章总结归纳了一些软件测试工程师常见的面试题,主要来源于个人面试遇到的、网络搜集(完善)、工作日常讨论等,分为以下十个部分,供大家参考。如有错误的地方,欢迎指正。有更多的面试题或面试中遇到的…...
pytorch中的矩阵乘法
1. 运算符介绍 关于运算,*运算,torch.mul(), torch.mm(), torch.mv(), tensor.t() 和 *代表矩阵的两种相乘方式: 表示常规的数学上定义的矩阵相乘; *表示两个矩阵对应位置处的两个元素相乘。 1.1 矩阵点乘 *和torch.mul()等同…...

Java--Stream流详解
Stream是Java 8 API添加的一个新的抽象,称为流Stream,以一种声明性方式处理数据集合(侧重对于源数据计算能力的封装,并且支持序列与并行两种操作方式) Stream流是从支持数据处理操作的源生成的元素序列,源可…...

[PHP]ShopXO企业级B2C免费开源商城系统 v2.3.1
ShopXO 企业级B2C免费开源电商系统! 求实进取、创新专注、自主研发、国内领先企业级B2C电商系统解决方案。 遵循Apache2开源协议发布,无需授权、可商用、可二次开发、满足99%的电商运营需求。 PCH5、支付宝小程序、微信小程序、百度小程序、头条&抖音…...
Python基础入门系列详解20篇
Python基础入门(1)----Python简介 Python基础入门(2)----安装Python环境(Windows、MacOS、CentOS、Ubuntu) Python基础入门(3)----Python基础语法:解释器、标识符、关键…...

P02项目(学习)
★ P02项目 项目描述:安全操作项目旨在提高医疗设备的安全性,特别是在医生离开操作屏幕时,以减少非授权人员的误操作风险。为实现这一目标,我们采用多层次的保护措施,包括人脸识别、姿势检测以及二维码识别等技术。这些…...
pandas 笔记:get_dummies分类变量one-hot化
1 函数介绍 pandas.get_dummies 是 pandas 库中的一个函数,它用于将分类变量转换为哑变量/指示变量。所谓的哑变量,就是将分类变量的每一个不同的值转换为一个新的0/1变量。在输出的DataFrame中,每一列都以该值的名称命名 pandas.get_dummi…...

PTE作文练习(一)
目录 65分备考建议 WE模版 范文 Supporting ideas: SWT 65分备考建议 RA重在多听标准的正确的示范,RS重在抓大放小,WFD重在整理错题,以及反反复复的车轮战,FIBRW重在“以对代记” 就是直接看答案,节约时间&#…...
如何做到一套FPGA工程无缝兼容两款不同的板卡?
试想这样一种场景,有两款不同的FPGA板卡,它们的功能代码90%都是一样的,但是两个板卡的管脚分配完全不同,一般情况下,我们需要设计两个工程,两套代码,之后还需要一直维护两个版本。 那么有没有一种自动化的方式,实现一个工程,编译出一个程序文件,下载到这两个不同的板…...

VSCode修改主题为Eclipse 绿色护眼模式
前言 从参加开发以来,一直使用eclipse进行开发,基本官方出新版本,我都会更新。后来出来很多其他的IDE工具,我也尝试了,但他们的主题都把我劝退了,黑色主题是谁想出来?😂 字体小的时…...
conan和cmake编译器版本不匹配问题解决
conan和cmake编译器版本不匹配问题解决 1 问题现象2 解决方法2.1 在CMakeLists.txt禁止编译器检查2.1.1 修改方式 2.2 探查问题出现的根本原因2.2.1 安装升级gcc2.2.2 安装升级g 注 执行环境:ubuntu 1 问题现象 conan要求的编译器版本和cmake检测到的当前的编译器…...
float单精度浮点数如何在计算机中存储
文章目录 1 float型数据组成2 实际举例3 代码测试4 写在最后 1 float型数据组成 按照IEEE浮点标准存储浮点数时,一个float型的值由1个符号位(最左边的位或最高有效位)、8个指数位以及23个小数位依次组成: 符号位为0时表示正数,为1…...
机器视觉在虚拟现实与增强现实中的作用
机器视觉在虚拟现实(VR)和增强现实(AR)中发挥着至关重要的作用。这些技术的核心是计算机视觉领域,重点是让计算机具有“看到”和理解周围世界的能力。 在虚拟现实中,计算机视觉用于创建和处理用户所见的虚…...

红黑数原理及存在原因
我红黑树那么牛,你们为什么不用?_哔哩哔哩_bilibili 面试时经常会被问到红黑树,它到底有什么优点呢? 对于查找数据,数组二分查询速度最快,时间复杂度为O(logN)。但是如果增加和删除数据,数组就…...

Ansible入门—安装部署及各个模块应用案例(超详细)
目录 前言 一、环境概况 修改主机名(可选项) 二、安装部署 1.安装epel扩展源 2.安装Ansible 3.修改Ansible的hosts文件 4.生成密钥 三、Ansible模块使用介绍 Command模块 Shell模块 User模块 Copy模块 File模块 Hostname模块 Yum模块 Se…...

Spring Boot 3系列之-启动类详解
Spring Boot是一个功能强大、灵活且易于使用的框架,它极大地简化了Spring应用程序的开发和部署流程,使得开发人员能够更专注于业务逻辑的实现。在我们的Spring Boot 3系列之一(初始化项目)文章中,我们使用了Spring官方…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...

多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...

以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
比较数据迁移后MySQL数据库和OceanBase数据仓库中的表
设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...

wpf在image控件上快速显示内存图像
wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像(比如分辨率3000*3000的图像)的办法,尤其是想把内存中的裸数据(只有图像的数据,不包…...
tomcat指定使用的jdk版本
说明 有时候需要对tomcat配置指定的jdk版本号,此时,我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...