DataX DorisWriter 插件DorisStreamLoadObserver类详细解读
DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:
DorisStreamLoadObserver(Keys options): 这是类的构造函数,用于初始化加载数据所需的配置选项。void streamLoad(WriterTuple data) throws Exception: 该方法是数据加载的主要方法。它将给定的数据(WriterTuple对象)加载到 Doris 数据库中。它构建了用于将数据发送到 Doris 的 HTTP 请求,并根据响应状态来确定加载是否成功。如果加载失败,它会抛出异常。private void checkStreamLoadState(String host, String label) throws IOException: 这个方法用于检查数据加载的状态。它会不断地轮询 Doris 服务器,以获取特定加载任务的最终状态。根据加载状态的不同,它可能会抛出异常或者在加载完成时返回。private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加适当的分隔符。private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建了包含数据的请求实体,发送到指定的加载 URL,并解析响应以获取加载结果。private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份验证头部,以便在 HTTP 请求中进行身份验证。private HttpEntity getHttpEntity(CloseableHttpResponse response): 这是一个实用方法,用于从 HTTP 响应中提取实体内容。private String getLoadHost(): 该方法从配置选项中获取用于加载数据的主机地址列表,并尝试连接到这些主机以检查其可用性。它会返回第一个可用的主机地址。
DorisStreamLoadObserver 类主要用于处理数据加载任务,它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。通过这些方法,可以实现将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class DorisStreamLoadObserver {private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);private Keys options;private long pos;private static final String RESULT_FAILED = "Fail";private static final String RESULT_LABEL_EXISTED = "Label Already Exists";private static final String LAEBL_STATE_VISIBLE = "VISIBLE";private static final String LAEBL_STATE_COMMITTED = "COMMITTED";private static final String RESULT_LABEL_PREPARE = "PREPARE";private static final String RESULT_LABEL_ABORTED = "ABORTED";private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";public DorisStreamLoadObserver(Keys options) {this.options = options;}// 数据写入 Doris 的主要方法public void streamLoad(WriterTuple data) throws Exception {String host = getLoadHost();if (host == null) {throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");}String loadUrl = new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/").append(options.getTable()).append("/_stream_load").toString();LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));final String keyStatus = "Status";if (null == loadResult || !loadResult.containsKey(keyStatus)) {throw new IOException("Unable to flush data to Doris: unknown result status.");}LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {throw new IOException(new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString());} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));checkStreamLoadState(host, data.getLabel());}}// 检查数据加载状态的方法private void checkStreamLoadState(String host, String label) throws IOException {int idx = 0;while (true) {try {TimeUnit.SECONDS.sleep(Math.min(++idx, 5));} catch (InterruptedException ex) {break;}try (CloseableHttpClient httpclient = HttpClients.createDefault()) {HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpGet.setHeader("Connection", "close");try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s].\n", label), null);}Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));String labelState = (String) result.get("state");if (null == labelState) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);}LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));switch (labelState) {case LAEBL_STATE_VISIBLE:case LAEBL_STATE_COMMITTED:return;case RESULT_LABEL_PREPARE:continue;case RESULT_LABEL_ABORTED:throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null, true);case RESULT_LABEL_UNKNOWN:default:throw new IOException(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null);}}}}}// 根据格式将数据行拼接成字节数组private byte[] addRows(List<byte[]> rows, int totalBytes) {if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);for (byte[] row : rows) {bos.put(row);bos.put(lineDelimiter);}return bos.array();}if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));bos.put("[".getBytes(StandardCharsets.UTF_8));byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);boolean isFirstElement = true;for (byte[] row : rows) {if (!isFirstElement) {bos.put(jsonDelimiter);}bos.put(row);isFirstElement = false;}bos.put("]".getBytes(StandardCharsets.UTF_8));return bos.array();}throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");}private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(120 * 1000).setConnectTimeout(120 * 1000).setConnectionRequestTimeout(120 * 1000).build();try (CloseableHttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).setRedirectStrategy(new DefaultRedirectStrategy()).build()) {HttpPut httpPut = new HttpPut(loadUrl);httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpPut.setEntity(new ByteArrayEntity(data));try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");}return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));}}}// 构造 HTTP 请求中的基本认证头部private String getBasicAuthHeader(String username, String password) {String credentials = username + ":" + password;byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);String base64Credentials = Base64.encodeBase64String(credentialsBytes);return "Basic " + base64Credentials;}// 从 HTTP 响应中获取实体内容private HttpEntity getHttpEntity(CloseableHttpResponse response) {if (response != null) {return response.getEntity();}return null;}// 获取用于加载数据的主机地址private String getLoadHost() {List<String> hosts = options.getDorisStreamLoadUrls();for (String host : hosts) {try {HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();connection.setRequestMethod("HEAD");int responseCode = connection.getResponseCode();if (responseCode == HttpURLConnection.HTTP_OK) {return host;}} catch (IOException e) {LOG.warn("Failed to connect to host: {}", host);}}return null;}
}相关文章:
DataX DorisWriter 插件DorisStreamLoadObserver类详细解读
DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用: Doris…...
leetcode:1710. 卡车上的最大单元数(python3解法)
难度:简单 请你将一些箱子装在 一辆卡车 上。给你一个二维数组 boxTypes ,其中 boxTypes[i] [numberOfBoxesi, numberOfUnitsPerBoxi] : numberOfBoxesi 是类型 i 的箱子的数量。numberOfUnitsPerBoxi 是类型 i 每个箱子可以装载的单元数量。…...
Spring_JDBC的使用
Spring 是个一站式框架:Spring 自身也提供了控制层的 SpringMVC和持久层的 Spring JdbcTemplate。 配置信息 1.下载 Spring JdbcTemplate 的 jar 包,在pom.xml中导入 <dependency><groupId>org.springframework</groupId><artifactId>spr…...
【Python从入门到进阶】34、selenium基本概念及安装流程
接上篇《33、使用bs4获取星巴克产品信息》 上一篇我们介绍了如何使用bs4来解析星巴克网站,获取其产品信息。本篇我们来了解selenium技术的基础。 一、什么是selenium? Selenium是一种用于自动化Web浏览器操作的开源工具。它提供了一组API(应…...
如何确保ChatGPT在文本生成中遵循道德和伦理准则?
确保ChatGPT在文本生成中遵循道德和伦理准则是一个复杂而重要的任务。人工智能(AI)系统,特别是语言模型,具有强大的生成能力,但如果不受到道德和伦理准则的约束,可能会导致一系列问题,包括歧视、…...
RISC-V Linux系统rootfs制作
文章目录 1、下载2、配置与编译3、运行 buildroot 是一个构建嵌入式Linux系统的框架。整个 buildroot 是由Makefile(*.mk) 脚本和 Kconfig(Config.in) 配置文件构成的,因此可以像配置 Linux 内核一样执行 make menuconfig 进行配置,编译出一个完整的、可…...
git常用场景记录 | 拉取远程分支A合并到本地分支B
文章目录 git常用场景记录拉取远程分支A合并到本地分支B本地分支B存在未add与commit的代码 git常用场景记录 doing,最后更新9.1 拉取远程分支A合并到本地分支B 需求描述 在团队合作时,我自己的本地分支B功能已经实现并合并到feature,之后发现…...
如何利用Linux进行数据管理和分析?
Linux是一款非常强大的操作系统,它不仅可以帮助你管理数据,还可以让你成为一名数据分析大师。只要你会使用命令行,你就可以用Linux进行数据管理和分析。 现在,让我们来看看如何使用Linux进行数据管理。 使用sort命令对数据进行排…...
vue3封装echarts图表数据无法渲染到页面
问题是后端的数据已经成功返回到前端了,但是Echarts图表一直不能被渲染,卡了一个多小时,最后问gpt才解决(gptyyds!!!) methods: {loadGet() {this.$axios.get(this.$httpUrl /goods…...
MySQL索引,事务和存储引擎
一、索引 1、索引的概念 ●索引是一个排序的列表,在这个列表中存储着索引的值和包含这个值的数据所在行的物理地址(类似于C语言的链表通过指针指向数据记录的内存地址)。 ●使用索引后可以不用扫描全表来定位某行的数据,而是先…...
开发指导—利用CSS动画实现HarmonyOS动效(一)
注:本文内容分享转载自 HarmonyOS Developer 官网文档 一. CSS 语法参考 CSS 是描述 HML 页面结构的样式语言。所有组件均存在系统默认样式,也可在页面 CSS 样式文件中对组件、页面自定义不同的样式。请参考通用样式了解兼容 JS 的类 Web 开发范式支持的…...
电商项目part10 高并发缓存实战
缓存的数据一致性 只要使用到缓存,无论是本地内存做缓存还是使用 redis 做缓存,那么就会存在数据同步的问题。 先读缓存数据,缓存数据有,则立即返回结果;如果没有数据,则从数据库读数据,并且把…...
MongoDB实验——MongoDB shell操作
MongoDB shell操作 实验原理 MongoDB shell是一个可执行文件,是MongoDB自带的一个交互式JavaScript shell,位于MongoDB安装路径下的/bin文件夹中。要启动MongoDB shell,可执行命令mongo。这将在控制台提示符中启动该shell,Mongo…...
数据分析师职业发展道路,工作内容是什么?
很多同学问,参加数据分析就业班后之的就业发展道路是怎样的,工作又能做什么呢? 市面上的常见的工作类型有有运营类、技术类及分析类等,可以根据自己的意愿去做适合自己的工作,但是任何工作其实都是需要一技之长。…...
Vue3 + ts的使用
一. IDE的配置 1. VSCode 插件安装搜索builtin typescript 2. 点击“TypeScript and JavaScript Language Features”右下角的小齿轮,然后选择“Disable (Workspace)” 3. 重新加载工作空间。Takeover 模式将会在你打开一个 Vue 或者 TS 文件时自动启用。 二. 依赖的…...
CF Edu152 C
Problem - C - Codeforces 题意: 思路: 首先,观察样例可知 这种是等效的 推广一下 0000.....111111 ..l..............r...... 这种是等效的 容易想到维护后面第一个1的位置和前面第一个0的位置,然后把所有区间都等效一下&…...
iBooker 技术评论 20230902
一、女子同时供职 16 家公司却从不上班,全国骗薪群体至少有七八百人,为何会出现此类骗薪群体? 社保其实很好绕过。就是这些骗薪者一起创立一个外包公司,然后通过这个公司把自己外包出去。这些人和外包公司签的是劳务合同…...
视频动态壁纸 Dynamic Wallpaper for Mac中文
Dynamic Wallpaper是一款Mac平台上的动态壁纸应用程序,它可以根据时间等因素动态切换壁纸,提供更加生动和多样化的桌面体验。 Dynamic Wallpaper包含了多个动态壁纸,用户可以根据自己的喜好选择和切换。这些动态壁纸可以根据时间等因素进行自…...
Java“牵手”京东商品列表数据,关键词搜索京东商品数据接口,京东API申请指南
京东商城是一个网上购物平台,售卖各类商品,包括服装、鞋类、家居用品、美妆产品、电子产品等。要获取京东商品列表和商品详情页面数据,您可以通过开放平台的接口或者直接访问京东商城的网页来获取商品详情信息。以下是两种常用方法的介绍&…...
springboot实战(三)之多环境部署配置文件生效方式
环境: jdk:1.8 springboot版本:2.7.15 配置: 1.新建yml文件 在resources包中创建application-dev.yml、application-testing.yml两个yml文件 2.配置 在application.yml进行配置生效文件 3.注意事项 新建yml的名称必须以&qu…...
Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...
