当前位置: 首页 > news >正文

Flink问题解决及性能调优-【Flink不同并行度引起sink2es报错问题】

最近需求,仅想提高sink2es的qps,所以仅调节了sink2es的并行度,但在调节不同算子并行度时遇到一些问题,找出问题的根本原因解决问题,并分析整理。

实例代码

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 msCREATE TABLE kafka_table (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])--event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)--WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't1','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','properties.group.id' = 'g1','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset--  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交'format' = 'json'
);CREATE TABLE es_sink(send_type      STRING,account_id     STRING,publish_time   STRING,grouping_id       INTEGER,init           INTEGER,init_cancel    INTEGER,push          INTEGER,succ           INTEGER,fail           INTEGER,init_delete    INTEGER,update_time    STRING,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with ('connector' = 'elasticsearch-6','index' = 'es_sink','document-type' = 'es_sink','hosts' = 'http://xxx:9200','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb'
);CREATE view  tmp as
selectsend_type,account_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,event_time,opt,ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6CREATE view  tmp_groupby as
selectCOALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1when send_type is not null and account_id is null and publish_time is null then 2when send_type is not null and account_id is not null and publish_time is null then 3when send_type is not null and account_id is not null and publish_time is not null then 4end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上INSERT INTO es_sink
selectsend_type,account_id,publish_time,grouping_id,init,init_cancel,push,succ,fail,init_delete,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

发现问题

由于groupby或join聚合等算子操作的并行度与sink2es算子操作的并行度不同,上游算子同一个key的数据可能会下发到下游多个不同算子中。
导致sink2es出现多个subtask同时操作同一个key(这里key作为主键id),报错如下:

...Caused by: [test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)... 1 more[CIRCULAR REFERENCE:[test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]]

问题原因

Flink中存在八种分区策略,常用Operator Chain链接方式有三种分区器:

  • forward:上下游并行度相同,且不发生shuffle,直连的分区器
  • hash:将数据按照key的Hash值下发到下游的算子中
  • rebalance:数据会被循环或者随机下发到下游算子中,改变并行度若无keyby,默认使用RebalancePartitioner分区策略

rebalance分区器,可能会将上游算子的同一个key随机下发到下游不同算子中,因而引起报错,如下图:
在这里插入图片描述在这里插入图片描述模型如下:

在这里插入图片描述

解决方案

  • 分组聚合算子与sink2es算子配置成相同的并行度,即使用forward分区器,如下图:
    在这里插入图片描述在这里插入图片描述
    模型如下:
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/215b7ba208b142da803a78d85b0f474e.png

  • sink2es算子的并行度配置为1,如下图:

operator chain为forward分区器模型如下:
在这里插入图片描述

总结

归根结底就是需要保证:上游subtask中同一个key只能下发到下游一个subtask中

相关文章:

Flink问题解决及性能调优-【Flink不同并行度引起sink2es报错问题】

最近需求&#xff0c;仅想提高sink2es的qps&#xff0c;所以仅调节了sink2es的并行度&#xff0c;但在调节不同算子并行度时遇到一些问题&#xff0c;找出问题的根本原因解决问题&#xff0c;并分析整理。 实例代码 --SET table.exec.state.ttl86400s; --24 hour,默认: 0 ms …...

瑞_数据结构与算法_二叉搜索树

文章目录 1 什么是二叉搜索树1.1 二叉搜索树的特征1.2 前驱后继 2 二叉搜索树的Java实现2.1 定义二叉搜索树节点类BSTNode泛型key改进 2.2 实现查找方法get(int key)递归实现非递归实现 ★非递归实现 泛型key版本 2.3 实现查找最小方法min()递归实现非递归实现 ★ 2.4 实现查找…...

Linux 命令行访问名字中包含空格的文件或文件夹

Linux 命令行访问名字中包含空格的文件或文件夹 References 在 Windows 下命名文件或文件夹名有空格是可以的&#xff0c;甚至在 Windows 和 Ubuntu 虚拟机共享的文件中也可以这么做&#xff0c;但是在 Ubuntu 中空格要用下划线代替&#xff0c;养成好习惯。Linux 会把空格当成…...

Dart/Flutter工具模块:the_utils

Flutter笔记 Dart/Flutter工具模块&#xff1a;the_utils 作者&#xff1a;李俊才 &#xff08;jcLee95&#xff09;&#xff1a;https://blog.csdn.net/qq_28550263 邮箱 &#xff1a;291148484163.com 本文地址&#xff1a;https://blog.csdn.net/qq_28550263/article/detail…...

矩阵号:日入100+,八大提示词(Prompt)使用技巧

最近在搞头条矩阵&#xff0c;发现自己的指令写的太烂了&#xff0c;一个指令将会决定你的写作质量。 收益比较拉垮&#xff0c;50个号收益好的&#xff0c;也就这么几个号。 于是我扒了一些提示词的操作技巧&#xff0c;分享一下自己的学习心得。 先说理论知识&#xff0c;实…...

爬虫工作量由小到大的思维转变---<第三十九章 Scrapy-redis 常用的那个RetryMiddleware>

前言: 为什么要讲这个RetryMiddleware呢?因为他很重要~ 至少在你装配代理ip或者一切关于重试的时候需要用到!----最关键的是:大部分的教学视频里面,没有提及这个!!!! 正文: 源代码分析 这个RetryMiddleware是来自: from scrapy.downloadermiddlewares.retry import Retry…...

【MongoDB】mongodb安装及启动踩坑点

mongodb的安装&#xff0c;基本上参考文章[1]。 但是在过程中&#xff0c;有一些踩坑点。 1&#xff0c;高版本mongodb不自带mongo脚本 在文章1中&#xff0c;作者在解压后&#xff0c;直接使用了mongo脚本&#xff0c;而我下载的mongodb版本要更高&#xff0c;在解压后&…...

动态规划——采矿的小奇【集训笔记】

题目描述 假期小奇去采矿场体验生活&#xff0c;工头为每个员工发放了一个最多能装 M 公斤的背包&#xff0c;经过一天的辛苦小奇开采出了 n 块矿石&#xff0c;它们的重量分别是W1&#xff0c;W2&#xff0c;...,Wn,经过预估它们的价值分别为C1,C2,...,Cn&#xff0c;那么请你…...

wpf控件Expander集合下的像素滚动

项目场景&#xff1a;Expander集合滚动 如下图&#xff0c;有一个Expander集合&#xff0c;且设置 ScrollViewer.VerticalScrollBarVisibility "Auto" 每个Expaner下包含有若干元素&#xff0c;当打开Expader(即IsExpanded "true"&#xff09;时&#…...

docker 基础手册

文章目录 docker 基础手册docker 容器技术镜像与容器容器与虚拟机docker 引擎docker 架构docker 底层技术docker 二进制安装docker 镜像加速docker 相关链接docker 生态 docker 基础手册 docker 容器技术 开源的容器项目&#xff0c;使用 Go 语言开发原意“码头工人”&#x…...

记一次SPI机制导致的BUG定位【不支持:http://javax.xml.XMLConstants/property/accessExternalDTD】

1、前因 今天在生产环境启用了某个功能&#xff0c;结果发现有个文件上传华为云OBS失败了&#xff0c;报错如下&#xff1a; Caused by: java.lang.IllegalArgumentException: 不支持&#xff1a;http://javax.xml.XMLConstants/property/accessExternalDTDat org.apache.xal…...

Kali如何启动SSH服务并实现无公网ip环境远程连接

文章目录 1. 启动kali ssh 服务2. kali 安装cpolar 内网穿透3. 配置kali ssh公网地址4. 远程连接5. 固定连接SSH公网地址6. SSH固定地址连接测试 简单几步通过[cpolar 内网穿透](cpolar官网-安全的内网穿透工具 | 无需公网ip | 远程访问 | 搭建网站)软件实现ssh 远程连接kali! …...

谷粒商城配置虚拟机

一、创建虚拟机 之前有在VM里面建一个ubuntu的虚拟机&#xff0c;准备拿来直接用&#xff0c;网络设置为NAT模式&#xff0c;查看我的虚拟机是虚拟机&#xff1a;192.168.248.128 主机&#xff1a; 192.168.2.12。可以互相ping通。 二、linux安装docker Docker docker是虚拟…...

Java中文乱码浅析及解决方案

Java中文乱码浅析及解决方案 一、GBK和UTF-8编码方式二、idea和eclipse的默认编码方式三、解码和编码方法四、代码实现编码解码 五、额外知识扩展 一、GBK和UTF-8编码方式 如果采用的是UTF-8的编码方式&#xff0c;那么1个英文字母 占 1个字节&#xff0c;1个中文占3个字节如果…...

【前端基础--3】

文字样式 1.文字颜色 color 取值方式&#xff1a; 英文单词 red green blue十六进制的颜色值 #000000 也可以写为#000&#xff08;如aabbcc可以简写为abc&#xff09;rgb三原色取值 color&#xff1a;rgb(220,32,215) 取值范围都在0~255之间 2.文字大小 font-size …...

Obsidian笔记软件结合cpolar实现安卓移动端远程本地群晖WebDAV数据同步

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

51单片机电子密码锁Proteus仿真+程序+视频+报告

目录 视频 设计分析 系统结构 仿真图 资料内容 资料下载地址&#xff1a;51单片机电子密码锁Proteus仿真程序视频报告 视频 单片机电子密码锁Proteus仿真程序视频 设计分析 (1)能够从键盘中输入密码&#xff0c;并相应地在显示器上显示‘*’&#xff1b; (2)能够判断密码…...

[BSidesCF 2020]Had a bad day

先看url&#xff0c;发现可能有注入 http://655c742e-b427-485c-9e15-20a1e7ef1717.node5.buuoj.cn:81/index.php?categorywoofers 试试能不能查看index.php直接?categoryindex.php不行&#xff0c;试试伪协议 把.php去掉试试 base64解码 <?php$file $_GET[category];…...

[笔记]事务简介-springboot

在Spring Boot中&#xff0c;事务的管理通常通过注解来实现&#xff0c;使得配置变得简单而直观。这种方式与Spring Boot的设计理念一致&#xff0c;即减少显式配置&#xff0c;增加自动配置。以下是如何在Spring Boot项目中应用和管理事务的详细说明&#xff1a; Spring Boot中…...

初识计算机网络 | 计算机网络的发展 | 协议初识

1.计算机网络的发展 “矛盾是普遍存在的&#xff0c;矛盾是事物联系的实质内容和事物发展的根本动力&#xff01;” 计算机在诞生之初&#xff0c;在军事上用来计算导弹的弹道轨迹&#xff01;在发展的过程中&#xff08;商业的推动&#xff0c;国家政策推动&#xff09;&…...

告别生产翻车!用Altium Designer 21的DRC规则为你的PCB设计上好“保险”

Altium Designer 21 DRC规则深度实战&#xff1a;从设计规范到生产就绪的PCB 在硬件开发领域&#xff0c;PCB设计完成后到实际生产前的最后一道防线就是设计规则检查&#xff08;DRC&#xff09;。很多工程师将DRC视为简单的软件功能验证&#xff0c;但实际上&#xff0c;它承担…...

React Native跨平台AI聊天应用开发实战:架构设计与性能优化

1. 项目概述&#xff1a;一个全功能的跨平台AI聊天伴侣如果你和我一样&#xff0c;既是移动端开发者&#xff0c;又是AI应用的深度用户&#xff0c;那么你肯定经历过这样的困境&#xff1a;想在手机上随时随地、流畅地和ChatGPT对话&#xff0c;却发现官方App要么功能受限&…...

保姆级教程:用MNN在Android上部署你的第一个图像分类App(从模型转换到实时摄像头识别)

从零构建Android端智能图像分类应用&#xff1a;MNN实战全流程解析 在移动互联网时代&#xff0c;将AI能力嵌入移动端应用已成为提升用户体验的关键。想象一下这样的场景&#xff1a;用户打开手机就能实时识别植物种类、辨别商品真伪&#xff0c;或是自动分类相册中的照片——这…...

从经典工程恶作剧看理论派与实践派的思维碰撞与团队协作

1. 项目概述&#xff1a;一场经典的工程恶作剧及其启示在任何一个技术团队里&#xff0c;总有一些故事会口口相传&#xff0c;成为团队文化的一部分。我今天想分享的这个故事&#xff0c;发生在上世纪80年代初&#xff0c;一个微电路设计小组里。它无关乎高深的技术突破&#x…...

3PEAK思瑞浦 TPA3532-VS1R MSOP8 运算放大器

特性 超低输入偏置电流: -在TA25C时最大士1pA(实验室测试限值) 安 -在-40C至125C(实验室测试限值)下&#xff0c;最大30皮 低输入失调电压:250V(最大值) 集成保护缓冲器&#xff0c;最大偏移电压为200V 低电压噪声密度:18nV/vHz(在1kHz时) 宽带宽:2.1MHz 供电电压:4.5V至16V(2.…...

QQ音乐加密文件解密终极指南:qmcdump实战深度解析

QQ音乐加密文件解密终极指南&#xff1a;qmcdump实战深度解析 【免费下载链接】qmcdump 一个简单的QQ音乐解码&#xff08;qmcflac/qmc0/qmc3 转 flac/mp3&#xff09;&#xff0c;仅为个人学习参考用。 项目地址: https://gitcode.com/gh_mirrors/qm/qmcdump 你是否遇到…...

SRWE终极指南:5分钟学会游戏窗口分辨率自定义技巧

SRWE终极指南&#xff1a;5分钟学会游戏窗口分辨率自定义技巧 【免费下载链接】SRWE Simple Runtime Window Editor 项目地址: https://gitcode.com/gh_mirrors/sr/SRWE 想要在游戏中获得超高清截图&#xff0c;却受限于系统预设的分辨率&#xff1f;想要在窗口模式下享…...

喜马拉雅音频本地化实战:绕过xm格式,直接获取mp3文件的两种方法对比

喜马拉雅音频本地化实战&#xff1a;两种高效获取MP3文件的技术方案深度评测 作为国内领先的音频分享平台&#xff0c;喜马拉雅拥有海量优质内容&#xff0c;但其特有的XM格式却给用户跨平台使用带来了困扰。许多技术爱好者尝试过各种转换工具&#xff0c;却发现市面上几乎没有…...

网络安全事件报告:从SolarWinds事件看全球合规挑战与应对策略

1. 事件回顾&#xff1a;SolarWinds事件为何成为安全领域的“分水岭”如果你在网络安全或IT运维领域工作&#xff0c;2020年底曝光的SolarWinds供应链攻击事件&#xff0c;绝对是一个绕不开的里程碑。它不像一次简单的数据泄露&#xff0c;更像是一场精心策划、潜伏已久的“数字…...

乔布斯产品哲学对硬件工程师的启示:从参数到体验的转变

1. 项目概述&#xff1a;一次对乔布斯遗产的技术性致敬2011年10月6日&#xff0c;当史蒂夫乔布斯逝世的消息传来&#xff0c;整个科技界陷入了一种复杂的情绪。作为一名长期在电子工程与消费电子领域工作的人&#xff0c;我的感受尤为深刻。那天&#xff0c;我和我的同事们&…...