Flink Kafka获取数据写入到MongoDB中 样例
简述
Apache Flink 是一个流处理和批处理的开源框架,它允许从各种数据源(如 Kafka)读取数据,处理数据,然后将数据写入到不同的目标系统(如 MongoDB)。以下是一个简化的流程,描述如何使用 Flink 从 Kafka 读取数据并保存到 MongoDB:
1、环境准备
- 安装并配置 Apache Flink。
- 安装并配置 Apache Kafka。
- 安装并配置 MongoDB。
- 创建一个 Kafka 主题,并发送一些测试数据。
- 确保 Flink 可以连接到 Kafka 和 MongoDB。
部署参考:
1、flink:Flink 部署执行模式
2、kafka:Flink mongo & Kafka
3、mongoDb:mongo副本集本地部署
2. 添加依赖
在Flink 项目中,需要添加 Kafka 和 MongoDB 的连接器依赖。对于 Maven 项目,可以在 pom.xml 文件中添加相应的依赖。
对于 Kafka,需要添加 Flink Kafka Connector 的依赖。
对于 MongoDB,需要添加 Flink MongoDB Sink 的依赖。
3. 编写 Flink 作业
* 创建一个 Flink 作业,使用 Flink 的 `FlinkKafkaConsumer` 从 Kafka 主题中读取数据。
* 对读取的数据进行必要的转换或处理。
* 使用 MongoDB 的 Java 驱动程序或第三方库将处理后的数据写入 MongoDB。
4. 运行 Flink 作业
使用 Flink 的命令行工具或 IDE 运行 Flink 作业。确保 Kafka 和 MongoDB 正在运行,并且 Flink 可以访问它们。
参考:Flink 命令行提交、展示和取消作业
5. 监控和调试
使用 Flink 的 Web UI 或其他监控工具来监控作业。如果出现问题,检查日志并进行调试。
6. 优化和扩展
根据需求和数据量,优化 Flink 作业的性能和可扩展性。这可能包括调整并行度、增加资源、优化数据处理逻辑等。
代码
package com.wfg.flink.connector.kafka;import com.mongodb.client.model.InsertOneModel;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;/*** @author wfg*/
public class KafkaToWriteMongo {public static void main(String[] args) throws Exception {// 1. 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(KAFKA_BROKERS).setTopics(TEST_TOPIC_PV).setGroupId("my-test-topic-pv").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> rs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 创建RollingFileSinkMongoSink<String> sink = MongoSink.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection("TestMongoPv").setMaxRetries(3)
// .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setSerializationSchema((input, context) -> {System.out.println(input);return new InsertOneModel<>(BsonDocument.parse(input));}).build();rs.sinkTo(sink);// 6. 执行 Flink 作业env.execute("Kafka Flink Job");}
}相关文章:
Flink Kafka获取数据写入到MongoDB中 样例
简述 Apache Flink 是一个流处理和批处理的开源框架,它允许从各种数据源(如 Kafka)读取数据,处理数据,然后将数据写入到不同的目标系统(如 MongoDB)。以下是一个简化的流程,描述如何…...
Android Jetpack Compose入门教程(二)
一、列表和动画 列表和动画在应用内随处可见。在本课中,您将学习如何利用 Compose 轻松创建列表并添加有趣的动画效果。 1、创建消息列表 只包含一条消息的聊天略显孤单,因此我们将更改对话,使其包含多条消息。您需要创建一个可显示多条消…...
如何避免接口重复请求(axios推荐使用AbortController)
前言: 我们日常开发中,经常会遇到点击一个按钮或者进行搜索时,请求接口的需求。 如果我们不做优化,连续点击按钮或者进行搜索,接口会重复请求。 以axios为例,我们一般以以下几种方法为主: 1…...
算法设计与分析:网络流求解棒球赛淘汰问题C++
目录 一、实验目的 二、问题描述 三、实验要求 四、算法思想 1、明显的:win[i]+remain[i][j]<> 2、不明显的:最大流 3、操作 3.1 先读入相关信息(邻接矩阵**k),进行一遍“明显的”判断。 3.2 对剩下的“不明显的”的每个球队构建流网络(邻接表vector< ve…...
Linux Ubuntu 24.04 C语言gcc编译过程详解
下面是Hello World程序源代码文件hello.c的内容,我们将以它为例来说明源文件到可执行文件的形成过程,主要分4步:预处理、汇编、机器码、链接。 #include <stdio.h> int main () {printf ( "hello, world \n " );return 0; }…...
Python自动化办公篇—pandas操作Excel:读取+查看+选择+清洗+排序+筛选+函数+写入
目录 专栏导读库的介绍库的安装1、读取数据2、查看数据3、选择数据4、数据清洗5、数据排序6、数据筛选7、数据操作8、数据写入总结 专栏导读 文章名称链接Python自动化办公—pyautogui图像定位\点击功能,实现自动截取当前屏幕并检索点击(可制作为游戏点击脚本)点我进行跳转Pyt…...
数据库大作业——音乐平台数据库管理系统
W...Y的主页😊 代码仓库分享💕 《数据库系统》课程设计 :流行音乐管理平台数据库系统(本数据库大作业使用软件sql server、dreamweaver、power designer) 目录 系统需求设计 数据库概念结构设计 实体分析 属性分…...
【DBA早下班系列】—— 并行SQL/慢SQL 问题该如何高效收集诊断信息
1. 前言 OceanBase论坛问答区或者提交工单支持的时候大部分时间都浪费在了诊断信息的获取交互上,今天我就其中大家比较头疼的SQL问题,给大家讲解一下如何一键收集并行SQL/慢SQL所需要的诊断信息,减少沟通成本,让大家早下班。 2. …...
用python实现多文件多文本替换功能
用python实现多文件多文本替换功能 今天修改单位项目代码时由于改变了一个数据结构名称,结果有几十个文件都要修改,一个个改实在太麻烦,又没有搜到比较靠谱的工具软件,于是干脆用python手撸了一个小工具,发现python在…...
【DevOps】深入探索Ubuntu操作系统:全面了解
引言 在开源软件的世界里,Ubuntu是一个闪耀的明星。它不仅是一个操作系统,更是一种社区精神、一种共享和协作的文化。Ubuntu操作系统基于强大的Linux内核,由世界各地的开发者共同维护和改进。在这篇博文中,我们将深入探索Ubuntu操…...
【Linux】—MySQL安装
文章目录 前言一、下载官方MySQL包二、下载完成后,通过xftp6上传到Linux服务器上三、解压MySQL安装包四、在安装目录下执行rpm安装,请按顺序依次执行。五、配置MySQL六、启动MySQL数据库七、退出,重新登录数据库 前言 本文主要介绍在Linux环境…...
【vue】form表单提交validate验证不进valid原因
目录 1. 原因 1. 原因 1.<el-form>是否写了ref“form”。2.是否有其它标签写了ref“form”。3.<el-form>中要写成:model,不能使用v-model。4.自定义的validate要各个路径均能返回callback()。 const validatePass (rule, value, callback) > {if (…...
如何用 Google Chrome 浏览器浏览经过 XSLT 渲染的 XML 文件
对于经过XSLT渲染的XML文件,本来,可以直接用 IE (Internet Explorer) 打开,就能看到渲染之后的样子,很方便。但是后来,微软把 IE 换成了 Microsoft Edge,按理说这是比 IE 更先进的浏览器,可是偏…...
Python学习笔记12:进阶篇(二),类的继承与组合
类的继承 我们在编写一系列的类的时候,会发现这些类很相似,但是又有各自的特点和行为。在编写这些类的时候,我们可以把相同的部分抽象成一个基类,然后根据其他不同的特点和行为,抽象出子类,继承这个基类。…...
npm install cnpm -g 报错4048
npm install cnpm -g 报错4048 设置淘宝镜像: 报错如下: 其他博主提供的方法都尝试了,比如管理员权限打开终端,删除.npmrc文件,清除缓存npm cache clean -f等都试了无效,最后怀疑是npm和cnpm版本不对应&…...
本地快速部署 SuperSonic
本地快速部署 SuperSonic 0. 引言1. 本地快速部署 supersonic2. 访问 supersonic3. 支持的数据库4. github 地址 0. 引言 SuperSonic融合Chat BI(powered by LLM)和Headless BI(powered by 语义层)打造新一代的BI平台。这种融合确…...
如何给vue开发的网站做seo?
最近公司有个需求,需要给公司的官网sqlynx做seo,但因为各种历史原因吧,原来的网站是用vue开发的。没办法,只能尝试尽量做一些seo,让网站能更好一些。 目录 1. 服务器端渲染(SSR) 2. 预渲染&am…...
算法训练营第六十天(延长12天添加图论) | LeetCode 647 回文子串、LeetCode 516 最长回文子序列
LeetCode 67 回文子串 思路很简单,每一个dp[i]等于dp[i-1]加上当前字符向前直到0各个长度字符串回文串个数即可 代码如下: class Solution {public boolean isValid(String s) {int l 0, r s.length() - 1;while (l < r) {if (s.charAt(l) ! s.ch…...
TikTok账号养号的流程分享
对于很多刚开始运营TikTok的新手小白来说,都会有一个同样的疑问,那就是:TikTok到底需不需要养号?这里明确告诉大家是需要养号的,今天就把我自己实操过的养号经验和策略总结出来,分享给大家。 一、什么是Ti…...
C++初学者指南第一步---6.枚举和枚举类
C初学者指南第一步—6.枚举和枚举类 文章目录 C初学者指南第一步---6.枚举和枚举类1.作用域的枚举(enum class类型)(C11)2.无作用域的枚举(enum类型)3.枚举类的基础类型4.自定义枚举类映射5.和基础类型的互相转换 1.作用域的枚举(enum class类…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...
.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
C# 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
