当前位置: 首页 > news >正文

五分钟,Docker安装flink,并使用flinksql消费kafka数据

1、拉取flink镜像,创建网络

docker pull flink
docker network create flink-network

2、创建 jobmanager

# 创建 JobManager docker run \-itd \--name=jobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest jobmanager 

3、创建 taskmanager

# 创建 TaskManager docker run \-itd \--name=taskmanager \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest taskmanager 

4、访问 http://localhost:8081/

在这里插入图片描述

4.1 修改Task Slots

默认的Slots num是1,我们可以修改为5:
修改的目录是jobmanager和taskmanager的/opt/flink/confflink-conf.yaml文件:
在这里插入图片描述
在这里插入图片描述
修改taskmanager.numberOfTaskSlots:即可。
注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:

docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/

在这里插入图片描述

5、通过flinksql消费Kafka

确保有一个可用的kafka,如果没有,可以五分钟内,Docker搭建一个
Docker安装kafka 3.5
并且通过python,简单写一个生产者
Python生产、消费Kafka

5.1 导入flink-sql-connector-kafka jar包

顾名思义,用于连接flinksql和kafka。
进入flink

docker exec -it jobmanager /bin/bash

进入 flink的bin目录

cd /opt/flink/bin

查看flink版本:

flink --version

可以看出,我的版本是1.18.0
在这里插入图片描述
根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.18.0,所以选择下图的版本包:
在这里插入图片描述
点进去进行下载:
在这里插入图片描述

将下载的jar包,分别在jobmanager,taskmanager /opt/flink/lib目录下,注意,是两个都要放,如下图:
在这里插入图片描述
可以使用docker cp test.txt jobmanager:/opt/flink/lib命令,用户宿主机和docker容器文件传输。把test.txt换成对应的jar包即可

5.2 flinksql消费kafka

进入jobmanager中,执行

cd /opt/flink/bin
sql-client.sh

在这里插入图片描述
在这里插入图片描述
Flink SQL执行以下语句:

CREATE TABLE KafkaTable (`count_num` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'kafka_demo','properties.bootstrap.servers' = '192.168.10.15:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'json'
);show tables;
select * from KafkaTable;

可以看到Flink在消费kafka数据,如下图:
在这里插入图片描述

相关文章:

五分钟,Docker安装flink,并使用flinksql消费kafka数据

1、拉取flink镜像,创建网络 docker pull flink docker network create flink-network2、创建 jobmanager # 创建 JobManager docker run \-itd \--namejobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES"jobmanager.rpc.ad…...

【小聆送书第一期】让架构师的成神之路温暖你这个不景气的冬天

🌈个人主页:聆风吟 🔥系列专栏:网络奇遇记、数据结构 🔖少年有梦不应止于心动,更要付诸行动。 文章目录 📋前言 书籍一览 ⛳️书籍一⛳️书籍二⛳️书籍三⛳️书籍四⛳️书籍五⛳️书籍六⛳️书…...

网页爬虫反扒措施有哪些?

爬虫之常见的反扒 cookies 一般用requests直接请求网址的时候有时候可能会遇到反扒措施,这时候可以考虑一下加上user-agent伪装成浏览器;也可能有登录限制,这时候cookies就有用处了 浏览器中的cookie是保存我们的账号数据和访问记录&#…...

C#实现批量生成二维码

相信大家都使用过草料二维码生成器&#xff0c;单独生成二维码可以&#xff0c;但是批量生成二维码就需要收费了。既然要收费&#xff0c;那就自己写一个。 接口采用导入Excel文件生成二维码&#xff0c;首先需要读取Excel的数据&#xff0c;方法如下所示&#xff1a; /// <…...

3种在ArcGIS Pro中制作山体阴影的方法

山体阴影可以更直观的展现地貌特点&#xff0c;表达真实的地形&#xff0c;这里为大家介绍一下在ArcGIS Pro中制作山体阴影的方法&#xff0c;希望能对你有所帮助。 数据来源 本教程所使用的数据是从水经微图中下载的DEM数据&#xff0c;除了DEM数据&#xff0c;常见的GIS数据…...

【ChatGLM2-6B】Docker下部署及微调

【ChatGLM2-6B】小白入门及Docker下部署 一、简介1、ChatGLM2是什么2、组成部分3、相关地址 二、基于Docker安装部署1、前提2、CentOS7安装NVIDIA显卡驱动1&#xff09;查看服务器版本及显卡信息2&#xff09;相关依赖安装3&#xff09;显卡驱动安装 2、 CentOS7安装NVIDIA-Doc…...

输入两个整数,输出它们的乘积。 ← Python 及 C++ 代码比较

【题目描述】 输入两个整数&#xff0c;输出它们的乘积。【Python代码】 x,ymap(int,input().split()) print(x*y) 【C代码】 #include<bits/stdc.h> using namespace std;int x,y; int main() {cin>>x>>y;cout<<x*y<<endl;return 0; }/* in:…...

C语言——从键盘输人一个表示年份的整数,判断该年份是否为闰年,并显示判断结果。

#define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int year 0;printf("请输入年份&#xff1a;");scanf("%d",&year);if((year%4 0) && (year%100!0) || (year%400 0)){printf("%d是闰年\n",year);}else{p…...

出于隐私和安全的考虑,有时需要从谷歌删除你的个人数据,有两种方法

如果你是公众人物、企业或拥有个人品牌的人&#xff0c;那么拥有在线形象很重要。然而&#xff0c;你可能会发现&#xff0c;通过谷歌搜索&#xff0c;陌生人可以获得你的个人信息&#xff0c;如联系方式、地址和财务信息&#xff0c;这会让你感到不安。 幸运的是&#xff0c;…...

【同一局域网下】两台电脑之间互ping

两台电脑互ping 首先需要连接同一网咯关闭需要ping的电脑的防火墙 关闭防火墙步骤&#xff08;以win11系统为例&#xff09;&#xff1a; 设置 --> 隐私和安全性 --> Windows 安全中心 打开Windows安全中心 防火墙和网络保护 --> 选择正在使用的网络 关闭 ping其他…...

【精选】Ajax技术知识点合集

Ajax技术详解 Ajax简介 Ajax 即“Asynchronous Javascript And XML”&#xff08;异步 JavaScript 和 XML&#xff09;&#xff0c;是指一种创建 交互式、快速动态应用的网页开发技术&#xff0c;无需重新加载整个网页的情况下&#xff0c;能够更新页面局 部数据的技术。通过在…...

智能优化算法应用:基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.水循环算法4.实验参数设定5.算法结果6.参考文献7.…...

java-netty知识点笔记和注意事项

如何获取ctx的id 使用ctx.ctx.toString()就可以了 public void channelRead(ChannelHandlerContext ctx, Object msg) {//传来的消息包装成字节缓冲区String byteBuf (String) msg; // ByteBuf byteBuf (ByteBuf) msg;//Netty提供了字节缓冲区的toString方法&#xff…...

英伟达不同系列GPU介绍

英伟达有以下几个系列的产品线&#xff0c;并介绍它们的特点和主要应用领域&#xff1a; 1. GeForce系列&#xff08;G系列&#xff09;&#xff1a; - 特点&#xff1a;GeForce系列是英伟达主打的消费级GPU产品线&#xff0c;注重提供高性能的图形处理能力和游戏特性。它们…...

C语言——I /深入理解指针(二)

一、数组名的理解 int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0];这⾥我们使⽤ &arr[0] 的⽅式拿到了数组第⼀个元素的地址&#xff0c;但是其实数组名本来就是地址&#xff0c;⽽且 是数组⾸元素的地址&#xff0c;我们来做个测试。 #include <stdio.…...

MySQL使用函数和存储过程实现:向数据表快速插入大量测试数据

实现过程 1.创建表 CREATE TABLE user_info (id INT(11) NOT NULL AUTO_INCREMENT,name VARCHAR(20) DEFAULT NULL,age INT(3) DEFAULT NULL,pwd VARCHAR(20) DEFAULT NULL,phone_number VARCHAR(11) DEFAULT NULL,email VARCHAR(255) DEFAULT NULL,address VARCHAR(255) DEF…...

力扣labuladong——一刷day59

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、力扣549. 二叉树中最长的连续序列二、力扣1325. 删除给定值的叶子节点 前言 像求和、求高度这种基本的二叉树函数很容易写&#xff0c;有时候只要在它们的后…...

接口性能测试 —— Jmeter并发与持续性压测

接口压测的方式&#xff1a; 1、同时并发&#xff1a;设置线程组、执行时间、循环次数&#xff0c;这种方式可以控制接口请求的次数 2、持续压测&#xff1a;设置线程组、循环次数&#xff0c;勾选“永远”&#xff0c;调度器&#xff08;持续时间&#xff09;&#xff0c;这种…...

redis报错3

INFO: Initializing SpringDispatcherServletdispatcherServlet...

Proteus的网络标号与总线

Proteus为了减少过多、复杂的连线&#xff0c;可以使用网络标号与总线配合使用。 Proteus的导线上添加了网络标号&#xff0c;意味着在Proteus上相同的网络标号是连在一起的&#xff0c;所说在图纸上看不出来。 如下图是比较好的Proteus中使用总线的绘制的图纸。可以效仿着画…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖

在前面的练习中&#xff0c;每个页面需要使用ref&#xff0c;onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入&#xff0c;需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

Mysql中select查询语句的执行过程

目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析&#xff08;Parser&#xff09; 2.4、执行sql 1. 预处理&#xff08;Preprocessor&#xff09; 2. 查询优化器&#xff08;Optimizer&#xff09; 3. 执行器…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...