Doris:读取Doris数据的N种方法
目录
1.MySQL Client
2.JDBC
3. 查询计划
4.Spark Doris Connector
5.Flink Doris Connector
1.MySQL Client
Doris 采用 MySQL 协议,高度兼容 MySQL 语法,支持标准 SQL,用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后,可使用 select语句查询数据。
mysql -uroot -P9030 -h127.0.0.1
为了防止用户的一个查询可能因为消耗内存过大。查询进行了内存控制,一个查询任务,在单个 BE 节点上默认使用不超过 2GB 内存。用户在使用时,如果发现报 Memory limit exceeded 错误,一般是超过内存限制了。遇到内存超限时,用户应该尽量通过优化自己的 sql 语句来解决。如果确切发现2GB内存不能满足,可以手动设置内存参数。
select 查询如果使用limit分页查询,则需要指定order by 字段,否则同一个sql返回的数据可能不一样。
2.JDBC
由于Doris 采用 MySQL 协议,同样也支持通过JDBC方式读取数据。
package com.yichenkeji.demo.test;import lombok.extern.slf4j.Slf4j;import java.sql.*;
import java.util.Properties;@Slf4j
public class DorisJDBCDemo {public static void main(String[] args) throws SQLException {String jdbc_driver = "com.mysql.cj.jdbc.Driver";String jdbc_url = "jdbc:mysql://192.168.179.131:9030/demo?rewriteBatchedStatements=true";String username = "root";String password = "";Connection conn = getConnection(username,password,jdbc_url,jdbc_driver);log.info("{}",conn);String sql = "select * from dim_area limit 10";Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(sql);while (rs.next()){log.info("id={},name={}",rs.getFloat("id"),rs.getString("name"));}closeConnection(conn);}/*** 获取连接* @param username* @param password* @param jdbcUrl* @param driver* @return*/public static Connection getConnection(String username,String password,String jdbcUrl,String driver) {Properties prop = new Properties();prop.put("user", username);prop.put("password", password);try {Class.forName(driver);log.info("jdbcUrl:{}",jdbcUrl);return DriverManager.getConnection(jdbcUrl, prop);} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭连接* @param conn*/public static void closeConnection(Connection conn) {if(conn != null){try {if(!conn.isClosed()){conn.close();}} catch (SQLException e) {log.error("SQLException:{}", e.getMessage());}}}}
3. 查询计划
由于jdbc查询暂时不支持流式读取,如果读取的数据量过大,一次性读取全部数据需要很大的资源,所有可以使用查询计划API接口,给定一个 SQL,获取该 SQL 对应的查询计划。通过返回的数据分区信息,分批读取数据。
package com.yichenkeji.demo.test;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.TDorisExternalService;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TScanCloseParams;
import org.apache.doris.sdk.thrift.TScanColumnDesc;
import org.apache.doris.sdk.thrift.TScanNextBatchParams;
import org.apache.doris.sdk.thrift.TScanOpenParams;
import org.apache.doris.sdk.thrift.TScanOpenResult;
import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class DorisReaderDemo{static String dorisUrl = "192.168.179.131:8030";static String username = "root";static String password = "";static String database = "demo";static String table = "dim_area";static String querySql = String.format("SELECT id, name from demo.dim_area");static int readRowCount = 0;static int readTotal = 0;public static void main(String[] args) throws Exception {String queryPlanUrl = String.format("http://%s/api/%s/%s/_query_plan",dorisUrl,database,table);QueryPlanResult queryPlanResult = DorisUtil.getQueryPlan(username,password,queryPlanUrl,querySql);if (queryPlanResult != null && queryPlanResult.getOpaquedQueryPlan() != null){JSONObject partitions = queryPlanResult.getPartitions();log.info("partitions:{}",partitions);for(Map.Entry<String, Object> tablet : partitions.entrySet()){Long tabletId = Long.parseLong(tablet.getKey());JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));//get first backendString routingsBackend = value.getJSONArray("routings").getString(0);String backendHost = routingsBackend.split(":")[0];String backendPort = routingsBackend.split(":")[1];//connect backendTBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort));TProtocol protocol = factory.getProtocol(transport);TDorisExternalService.Client client = new TDorisExternalService.Client(protocol);if (!transport.isOpen()) {transport.open();}//build paramsTScanOpenParams params = new TScanOpenParams();params.cluster = "default_cluster";params.database = database;params.table = table;params.tablet_ids = Arrays.asList(tabletId);params.opaqued_query_plan = queryPlanResult.getOpaquedQueryPlan();// max row number of one read batchparams.setBatchSize(50000);params.setQueryTimeout(3600);params.setMemLimit(2147483648L);params.setUser(username);params.setPasswd(password);//open scannerTScanOpenResult tScanOpenResult = client.openScanner(params);if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.",routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs()));}List<TScanColumnDesc> selectedColumns = tScanOpenResult.getSelectedColumns();TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();nextBatchParams.setContextId(tScanOpenResult.getContextId());boolean eos = false;//read dataint offset = 0;while(!eos){nextBatchParams.setOffset(offset);TScanBatchResult nextResult = client.getNext(nextBatchParams);if (!TStatusCode.OK.equals(nextResult.getStatus().getStatusCode())) {throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.",routingsBackend, nextResult.getStatus().getStatusCode(), nextResult.getStatus().getErrorMsgs()));}eos = nextResult.isEos();if(!eos){RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();List<List<Object>> results = new ArrayList<>();while (arrowStreamReader.loadNextBatch()) {List<FieldVector> fieldVectors = root.getFieldVectors();//total data rowsint rowCountInOneBatch = root.getRowCount();for(int row = 0 ; row < rowCountInOneBatch ;row++){List<Object> record = new ArrayList<>();for (int col = 0; col < fieldVectors.size(); col++) {FieldVector fieldVector = fieldVectors.get(col);Types.MinorType minorType = fieldVector.getMinorType();Object v = DorisUtil.convertValue(row , minorType, fieldVector);record.add(v);}results.add(record);}offset += root.getRowCount();}log.info("total data rows:{}",results.size());//处理完之后要关闭,否则容易内存溢出arrowStreamReader.close();}}//closeTScanCloseParams closeParams = new TScanCloseParams();closeParams.setContextId(tScanOpenResult.getContextId());client.closeScanner(closeParams);if ((transport != null) && transport.isOpen()) {transport.close();}}}}public static String basicAuthHeader(String username, String password) {final String tobeEncode = username + ":" + password;byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));return "Basic " + new String(encoded);}/*** 获取查询计划* @param username* @param password* @param queryPlanUrl* @param sql* @return* @throws Exception*/public static QueryPlanResult getQueryPlan(String username, String password, String queryPlanUrl, String sql){try (CloseableHttpClient client = HttpClients.custom().build()) {HttpPost post = new HttpPost(queryPlanUrl);post.setHeader(HttpHeaders.EXPECT, "100-continue");post.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username,password));log.info("queryPlanUrl:{}",queryPlanUrl);log.info("sql:{}",sql);//The param is specific SQL, and the query plan is returnedMap<String,String> params = new HashMap<>();params.put("sql",sql);StringEntity entity = new StringEntity(JSON.toJSONString(params));post.setEntity(entity);try (CloseableHttpResponse response = client.execute(post)) {if (response.getEntity() != null ) {JSONObject queryPlanJSONObject = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));JSONObject dataJSONObject = queryPlanJSONObject.getJSONObject("data");if (dataJSONObject.containsKey("exception")){throw new RuntimeException(dataJSONObject.getString("exception"));}String queryPlan = dataJSONObject.getString("opaqued_query_plan");JSONObject partitions = dataJSONObject.getJSONObject("partitions");return new QueryPlanResult(queryPlan,partitions);}}}catch (Exception e){throw new RuntimeException(e);}return null;}}
4.Spark Doris Connector
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
val dorisSparkDF = spark.read.format("doris").option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME").option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT").option("user", "$YOUR_DORIS_USERNAME").option("password", "$YOUR_DORIS_PASSWORD").load()dorisSparkDF.show(5)
5.Flink Doris Connector
Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍Flink如何通过Datastream和SQL操作Doris。
DorisOptions.Builder builder = DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
相关文章:

Doris:读取Doris数据的N种方法
目录 1.MySQL Client 2.JDBC 3. 查询计划 4.Spark Doris Connector 5.Flink Doris Connector 1.MySQL Client Doris 采用 MySQL 协议,高度兼容 MySQL 语法,支持标准 SQL,用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后&a…...

ceph-deploy bclinux aarch64 ceph 14.2.10
ssh-copy-id,部署机免密登录其他三台主机 所有机器硬盘配置参考如下,计划采用vdb作为ceph数据盘 下载ceph-deploy pip install ceph-deploy 免密登录设置主机名 hostnamectl --static set-hostname ceph-0 .. 3 配置hosts 172.17.163.105 ceph-0 172.…...
爬虫项目(13):使用lxml抓取相亲信息
文章目录 书籍推荐完整代码效果书籍推荐 如果你对Python网络爬虫感兴趣,强烈推荐你阅读《Python网络爬虫入门到实战》。这本书详细介绍了Python网络爬虫的基础知识和高级技巧,是每位爬虫开发者的必读之作。详细介绍见👉: 《Python网络爬虫入门到实战》 书籍介绍 完整代码…...
mysql-数据库三大范式是什么、mysql有哪些索引类型,分别有什么作用 、 事务的特性和隔离级别
1. 数据库三大范式是什么? 数据库三大范式是设计关系型数据库时的规范化原则,确保数据库结构的合理性和减少数据冗余。 这三大范式分别是: - **第一范式(1NF):** 数据表中的所有列都是不可分割的原子数据项…...

微信小程序案例3-2 计算器
文章目录 一、运行效果二、知识储备(一)data-*自定义属性(二)模块 三、实现步骤(一)准备工作1、创建项目2、设置导航栏 (二)实现页面结构1、编写页面整体结构2、编写结果区域的结构3…...

QT QSplitter
分裂器QSplitter类提供了一个分裂器部件。和QBoxLayout类似,可以完成布局管理器的功能,但是包含在它里面的部件,默认是可以随着分裂器的大小变化而变化的。 比如一个按钮放在布局管理器中,它的垂直方向默认是不会被拉伸的,但是放到分裂器中就可以被拉伸。还有一点不…...

银行支付凭证截图生成器在线,工商邮政农业招商建设,画板+透明标签+图片框
用易语言设计了一个非常牛X的截图生成器,娱乐使用哈,软件我在这里也不会分享,模版网上找的,百度图库搜到的,上面的LOGO用的是一个在线生成器,然后标签用的黑月透明标签,加一个通用对话框读取图片…...
微服务概述
微服务架构是一种软件设计和开发范式,旨在将大型应用程序分解为一组小而独立的服务单元,这些单元可以独立开发、测试、部署和扩展。每个服务都专注于一个明确定义的业务功能,并通过轻量级的通信机制进行交互。以下是微服务架构的一些关键方面…...

LabVIEW中NIPackageManager功能介绍
LabVIEW中PackageManager功能介绍 使用NIPackage Manager可安装、更新、修复和删除NI软件。 安装NI软件 使用PackageManager浏览和安装NI软件。 1. 在浏览产品选项卡上,单击产品类别以显示该类别中的可用产品。 2. 选择要安装的产品,然后单击…...
【C语言】sem_getvalue
sem_getvalue 是 POSIX 线程库中用于获取信号量当前值的一个函数。信号量(Semaphore)是用于编程中的同步工具,用于管理多个线程或进程对共享资源的并发访问。通常用于限制可以同时访问共享资源的线程数量。函数 sem_getvalue 的声明通常出现在…...
Linux的shell的$# | fi | 说明
$# | fi | 说明 在Linux的Shell脚本中,$# 是一个特殊变量,表示传递给脚本的参数个数。 例如,如果你运行一个脚本并传递了三个参数,那么在脚本内部使用 $# 将会得到 3。这对于确定脚本在执行时接收到了多少个参数是非常有用的。以…...
C //例 7.12 用选择法对数组中10个整数按由小到大排序。
C程序设计 (第四版) 谭浩强 例 7.12 例 7.12 用选择法对数组中10个整数按由小到大排序。 IDE工具:VS2010 Note: 使用不同的IDE工具可能有部分差异。 代码块 方法:使用指针、动态分配内存 #include <stdio.h> #include …...
Spring Bean循环依赖问题及解决
什么是循环依赖 类与类之间的依赖关系形成了闭环,就会导致循环依赖问题的产生。举例来说,假设存在两个服务类A和服务类B,如果A通过依赖注入的方式引用了B,且B通过依赖注入的方式引用了A,那么A和B之间就存在循环依赖。…...

Golang源码分析 | 程序引导过程
环境说明 CentOS Linux release 7.2 (Final) go version go1.16.3 linux/amd64 GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-80.el7使用gdb查看程序入口 编写一个简单的go程序 // main.go package mainfunc main() {print("Hello world") } 编译go …...

第三章:人工智能深度学习教程-基础神经网络(第四节-从头开始的具有前向和反向传播的深度神经网络 – Python)
本文旨在从头开始实现深度神经网络。我们将实现一个深度神经网络,其中包含一个具有四个单元的隐藏层和一个输出层。实施将从头开始,并实施以下步骤。算法: 1. 可视化输入数据 2. 确定权重和偏置矩阵的形状 3. 初始化矩阵、要使用的函数 4. 前…...

【入门Flink】- 08Flink时间语义和窗口概念
Flink-Windows 是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗…...
【 OpenGauss源码学习 —— 列存储(CStore)(六)】
列存储(CStore)(六) 概述CStore::GetCUDataFromRemote 函数CStore::CheckConsistenceOfCUDescCtl 函数CStore::CheckConsistenceOfCUDesc 函数CStore::CheckConsistenceOfCUData 函数额外补充 声明:本文的部分内容参考…...

MUYUCMS v2.1:一款开源、轻量级的内容管理系统基于Thinkphp开发
MuYuCMS:一款基于Thinkphp开发的轻量级开源内容管理系统,为企业、个人站长提供快速建站解决方案。它具有以下的环境要求: 支持系统:Windows/Linux/Mac WEB服务器:Apache/Nginx/ISS PHP版本:php > 5.6 (…...

SDL2 显示文字
1.简介 SDL本身没有显示文字功能,它需要用扩展库SDL_ttf来显示文字。ttf是True Type Font的缩写,ttf是Windows下的缺省字体,它有美观,放大缩小不变形的优点,因此广泛应用很多场合。 使用ttf库的第一件事要从Windows的…...
c++ future 使用详解
c future 使用详解 std::future 头文件 #include <future>。 类模板,定义如下: template<class T> class future; template<class T> class future<T&>; template<> class future<void>;作用ÿ…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...

shell脚本--常见案例
1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...

三分算法与DeepSeek辅助证明是单峰函数
前置 单峰函数有唯一的最大值,最大值左侧的数值严格单调递增,最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值,最小值左侧的数值严格单调递减,最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...
4. TypeScript 类型推断与类型组合
一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式,自动确定它们的类型。 这一特性减少了显式类型注解的需要,在保持类型安全的同时简化了代码。通过分析上下文和初始值,TypeSc…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...