Doris:读取Doris数据的N种方法
目录
1.MySQL Client
2.JDBC
3. 查询计划
4.Spark Doris Connector
5.Flink Doris Connector
1.MySQL Client
Doris 采用 MySQL 协议,高度兼容 MySQL 语法,支持标准 SQL,用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后,可使用 select语句查询数据。
mysql -uroot -P9030 -h127.0.0.1
为了防止用户的一个查询可能因为消耗内存过大。查询进行了内存控制,一个查询任务,在单个 BE 节点上默认使用不超过 2GB 内存。用户在使用时,如果发现报 Memory limit exceeded 错误,一般是超过内存限制了。遇到内存超限时,用户应该尽量通过优化自己的 sql 语句来解决。如果确切发现2GB内存不能满足,可以手动设置内存参数。

select 查询如果使用limit分页查询,则需要指定order by 字段,否则同一个sql返回的数据可能不一样。

2.JDBC
由于Doris 采用 MySQL 协议,同样也支持通过JDBC方式读取数据。
package com.yichenkeji.demo.test;import lombok.extern.slf4j.Slf4j;import java.sql.*;
import java.util.Properties;@Slf4j
public class DorisJDBCDemo {public static void main(String[] args) throws SQLException {String jdbc_driver = "com.mysql.cj.jdbc.Driver";String jdbc_url = "jdbc:mysql://192.168.179.131:9030/demo?rewriteBatchedStatements=true";String username = "root";String password = "";Connection conn = getConnection(username,password,jdbc_url,jdbc_driver);log.info("{}",conn);String sql = "select * from dim_area limit 10";Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(sql);while (rs.next()){log.info("id={},name={}",rs.getFloat("id"),rs.getString("name"));}closeConnection(conn);}/*** 获取连接* @param username* @param password* @param jdbcUrl* @param driver* @return*/public static Connection getConnection(String username,String password,String jdbcUrl,String driver) {Properties prop = new Properties();prop.put("user", username);prop.put("password", password);try {Class.forName(driver);log.info("jdbcUrl:{}",jdbcUrl);return DriverManager.getConnection(jdbcUrl, prop);} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭连接* @param conn*/public static void closeConnection(Connection conn) {if(conn != null){try {if(!conn.isClosed()){conn.close();}} catch (SQLException e) {log.error("SQLException:{}", e.getMessage());}}}}
3. 查询计划
由于jdbc查询暂时不支持流式读取,如果读取的数据量过大,一次性读取全部数据需要很大的资源,所有可以使用查询计划API接口,给定一个 SQL,获取该 SQL 对应的查询计划。通过返回的数据分区信息,分批读取数据。
package com.yichenkeji.demo.test;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.TDorisExternalService;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TScanCloseParams;
import org.apache.doris.sdk.thrift.TScanColumnDesc;
import org.apache.doris.sdk.thrift.TScanNextBatchParams;
import org.apache.doris.sdk.thrift.TScanOpenParams;
import org.apache.doris.sdk.thrift.TScanOpenResult;
import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class DorisReaderDemo{static String dorisUrl = "192.168.179.131:8030";static String username = "root";static String password = "";static String database = "demo";static String table = "dim_area";static String querySql = String.format("SELECT id, name from demo.dim_area");static int readRowCount = 0;static int readTotal = 0;public static void main(String[] args) throws Exception {String queryPlanUrl = String.format("http://%s/api/%s/%s/_query_plan",dorisUrl,database,table);QueryPlanResult queryPlanResult = DorisUtil.getQueryPlan(username,password,queryPlanUrl,querySql);if (queryPlanResult != null && queryPlanResult.getOpaquedQueryPlan() != null){JSONObject partitions = queryPlanResult.getPartitions();log.info("partitions:{}",partitions);for(Map.Entry<String, Object> tablet : partitions.entrySet()){Long tabletId = Long.parseLong(tablet.getKey());JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));//get first backendString routingsBackend = value.getJSONArray("routings").getString(0);String backendHost = routingsBackend.split(":")[0];String backendPort = routingsBackend.split(":")[1];//connect backendTBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort));TProtocol protocol = factory.getProtocol(transport);TDorisExternalService.Client client = new TDorisExternalService.Client(protocol);if (!transport.isOpen()) {transport.open();}//build paramsTScanOpenParams params = new TScanOpenParams();params.cluster = "default_cluster";params.database = database;params.table = table;params.tablet_ids = Arrays.asList(tabletId);params.opaqued_query_plan = queryPlanResult.getOpaquedQueryPlan();// max row number of one read batchparams.setBatchSize(50000);params.setQueryTimeout(3600);params.setMemLimit(2147483648L);params.setUser(username);params.setPasswd(password);//open scannerTScanOpenResult tScanOpenResult = client.openScanner(params);if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.",routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs()));}List<TScanColumnDesc> selectedColumns = tScanOpenResult.getSelectedColumns();TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();nextBatchParams.setContextId(tScanOpenResult.getContextId());boolean eos = false;//read dataint offset = 0;while(!eos){nextBatchParams.setOffset(offset);TScanBatchResult nextResult = client.getNext(nextBatchParams);if (!TStatusCode.OK.equals(nextResult.getStatus().getStatusCode())) {throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.",routingsBackend, nextResult.getStatus().getStatusCode(), nextResult.getStatus().getErrorMsgs()));}eos = nextResult.isEos();if(!eos){RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();List<List<Object>> results = new ArrayList<>();while (arrowStreamReader.loadNextBatch()) {List<FieldVector> fieldVectors = root.getFieldVectors();//total data rowsint rowCountInOneBatch = root.getRowCount();for(int row = 0 ; row < rowCountInOneBatch ;row++){List<Object> record = new ArrayList<>();for (int col = 0; col < fieldVectors.size(); col++) {FieldVector fieldVector = fieldVectors.get(col);Types.MinorType minorType = fieldVector.getMinorType();Object v = DorisUtil.convertValue(row , minorType, fieldVector);record.add(v);}results.add(record);}offset += root.getRowCount();}log.info("total data rows:{}",results.size());//处理完之后要关闭,否则容易内存溢出arrowStreamReader.close();}}//closeTScanCloseParams closeParams = new TScanCloseParams();closeParams.setContextId(tScanOpenResult.getContextId());client.closeScanner(closeParams);if ((transport != null) && transport.isOpen()) {transport.close();}}}}public static String basicAuthHeader(String username, String password) {final String tobeEncode = username + ":" + password;byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));return "Basic " + new String(encoded);}/*** 获取查询计划* @param username* @param password* @param queryPlanUrl* @param sql* @return* @throws Exception*/public static QueryPlanResult getQueryPlan(String username, String password, String queryPlanUrl, String sql){try (CloseableHttpClient client = HttpClients.custom().build()) {HttpPost post = new HttpPost(queryPlanUrl);post.setHeader(HttpHeaders.EXPECT, "100-continue");post.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username,password));log.info("queryPlanUrl:{}",queryPlanUrl);log.info("sql:{}",sql);//The param is specific SQL, and the query plan is returnedMap<String,String> params = new HashMap<>();params.put("sql",sql);StringEntity entity = new StringEntity(JSON.toJSONString(params));post.setEntity(entity);try (CloseableHttpResponse response = client.execute(post)) {if (response.getEntity() != null ) {JSONObject queryPlanJSONObject = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));JSONObject dataJSONObject = queryPlanJSONObject.getJSONObject("data");if (dataJSONObject.containsKey("exception")){throw new RuntimeException(dataJSONObject.getString("exception"));}String queryPlan = dataJSONObject.getString("opaqued_query_plan");JSONObject partitions = dataJSONObject.getJSONObject("partitions");return new QueryPlanResult(queryPlan,partitions);}}}catch (Exception e){throw new RuntimeException(e);}return null;}}
4.Spark Doris Connector
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
val dorisSparkDF = spark.read.format("doris").option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME").option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT").option("user", "$YOUR_DORIS_USERNAME").option("password", "$YOUR_DORIS_PASSWORD").load()dorisSparkDF.show(5)
5.Flink Doris Connector
Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍Flink如何通过Datastream和SQL操作Doris。
DorisOptions.Builder builder = DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
相关文章:
Doris:读取Doris数据的N种方法
目录 1.MySQL Client 2.JDBC 3. 查询计划 4.Spark Doris Connector 5.Flink Doris Connector 1.MySQL Client Doris 采用 MySQL 协议,高度兼容 MySQL 语法,支持标准 SQL,用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后&a…...
ceph-deploy bclinux aarch64 ceph 14.2.10
ssh-copy-id,部署机免密登录其他三台主机 所有机器硬盘配置参考如下,计划采用vdb作为ceph数据盘 下载ceph-deploy pip install ceph-deploy 免密登录设置主机名 hostnamectl --static set-hostname ceph-0 .. 3 配置hosts 172.17.163.105 ceph-0 172.…...
爬虫项目(13):使用lxml抓取相亲信息
文章目录 书籍推荐完整代码效果书籍推荐 如果你对Python网络爬虫感兴趣,强烈推荐你阅读《Python网络爬虫入门到实战》。这本书详细介绍了Python网络爬虫的基础知识和高级技巧,是每位爬虫开发者的必读之作。详细介绍见👉: 《Python网络爬虫入门到实战》 书籍介绍 完整代码…...
mysql-数据库三大范式是什么、mysql有哪些索引类型,分别有什么作用 、 事务的特性和隔离级别
1. 数据库三大范式是什么? 数据库三大范式是设计关系型数据库时的规范化原则,确保数据库结构的合理性和减少数据冗余。 这三大范式分别是: - **第一范式(1NF):** 数据表中的所有列都是不可分割的原子数据项…...
微信小程序案例3-2 计算器
文章目录 一、运行效果二、知识储备(一)data-*自定义属性(二)模块 三、实现步骤(一)准备工作1、创建项目2、设置导航栏 (二)实现页面结构1、编写页面整体结构2、编写结果区域的结构3…...
QT QSplitter
分裂器QSplitter类提供了一个分裂器部件。和QBoxLayout类似,可以完成布局管理器的功能,但是包含在它里面的部件,默认是可以随着分裂器的大小变化而变化的。 比如一个按钮放在布局管理器中,它的垂直方向默认是不会被拉伸的,但是放到分裂器中就可以被拉伸。还有一点不…...
银行支付凭证截图生成器在线,工商邮政农业招商建设,画板+透明标签+图片框
用易语言设计了一个非常牛X的截图生成器,娱乐使用哈,软件我在这里也不会分享,模版网上找的,百度图库搜到的,上面的LOGO用的是一个在线生成器,然后标签用的黑月透明标签,加一个通用对话框读取图片…...
微服务概述
微服务架构是一种软件设计和开发范式,旨在将大型应用程序分解为一组小而独立的服务单元,这些单元可以独立开发、测试、部署和扩展。每个服务都专注于一个明确定义的业务功能,并通过轻量级的通信机制进行交互。以下是微服务架构的一些关键方面…...
LabVIEW中NIPackageManager功能介绍
LabVIEW中PackageManager功能介绍 使用NIPackage Manager可安装、更新、修复和删除NI软件。 安装NI软件 使用PackageManager浏览和安装NI软件。 1. 在浏览产品选项卡上,单击产品类别以显示该类别中的可用产品。 2. 选择要安装的产品,然后单击…...
【C语言】sem_getvalue
sem_getvalue 是 POSIX 线程库中用于获取信号量当前值的一个函数。信号量(Semaphore)是用于编程中的同步工具,用于管理多个线程或进程对共享资源的并发访问。通常用于限制可以同时访问共享资源的线程数量。函数 sem_getvalue 的声明通常出现在…...
Linux的shell的$# | fi | 说明
$# | fi | 说明 在Linux的Shell脚本中,$# 是一个特殊变量,表示传递给脚本的参数个数。 例如,如果你运行一个脚本并传递了三个参数,那么在脚本内部使用 $# 将会得到 3。这对于确定脚本在执行时接收到了多少个参数是非常有用的。以…...
C //例 7.12 用选择法对数组中10个整数按由小到大排序。
C程序设计 (第四版) 谭浩强 例 7.12 例 7.12 用选择法对数组中10个整数按由小到大排序。 IDE工具:VS2010 Note: 使用不同的IDE工具可能有部分差异。 代码块 方法:使用指针、动态分配内存 #include <stdio.h> #include …...
Spring Bean循环依赖问题及解决
什么是循环依赖 类与类之间的依赖关系形成了闭环,就会导致循环依赖问题的产生。举例来说,假设存在两个服务类A和服务类B,如果A通过依赖注入的方式引用了B,且B通过依赖注入的方式引用了A,那么A和B之间就存在循环依赖。…...
Golang源码分析 | 程序引导过程
环境说明 CentOS Linux release 7.2 (Final) go version go1.16.3 linux/amd64 GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-80.el7使用gdb查看程序入口 编写一个简单的go程序 // main.go package mainfunc main() {print("Hello world") } 编译go …...
第三章:人工智能深度学习教程-基础神经网络(第四节-从头开始的具有前向和反向传播的深度神经网络 – Python)
本文旨在从头开始实现深度神经网络。我们将实现一个深度神经网络,其中包含一个具有四个单元的隐藏层和一个输出层。实施将从头开始,并实施以下步骤。算法: 1. 可视化输入数据 2. 确定权重和偏置矩阵的形状 3. 初始化矩阵、要使用的函数 4. 前…...
【入门Flink】- 08Flink时间语义和窗口概念
Flink-Windows 是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗…...
【 OpenGauss源码学习 —— 列存储(CStore)(六)】
列存储(CStore)(六) 概述CStore::GetCUDataFromRemote 函数CStore::CheckConsistenceOfCUDescCtl 函数CStore::CheckConsistenceOfCUDesc 函数CStore::CheckConsistenceOfCUData 函数额外补充 声明:本文的部分内容参考…...
MUYUCMS v2.1:一款开源、轻量级的内容管理系统基于Thinkphp开发
MuYuCMS:一款基于Thinkphp开发的轻量级开源内容管理系统,为企业、个人站长提供快速建站解决方案。它具有以下的环境要求: 支持系统:Windows/Linux/Mac WEB服务器:Apache/Nginx/ISS PHP版本:php > 5.6 (…...
SDL2 显示文字
1.简介 SDL本身没有显示文字功能,它需要用扩展库SDL_ttf来显示文字。ttf是True Type Font的缩写,ttf是Windows下的缺省字体,它有美观,放大缩小不变形的优点,因此广泛应用很多场合。 使用ttf库的第一件事要从Windows的…...
c++ future 使用详解
c future 使用详解 std::future 头文件 #include <future>。 类模板,定义如下: template<class T> class future; template<class T> class future<T&>; template<> class future<void>;作用ÿ…...
通过Wrangler CLI在worker中创建数据库和表
官方使用文档:Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后,会在本地和远程创建数据库: npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库: 现在,您的Cloudfla…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
uniapp 实现腾讯云IM群文件上传下载功能
UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中,群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS,在uniapp中实现: 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...
SQL Server 触发器调用存储过程实现发送 HTTP 请求
文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...
用鸿蒙HarmonyOS5实现中国象棋小游戏的过程
下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...
Python竞赛环境搭建全攻略
Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型(算法、数据分析、机器学习等)不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...
基于开源AI智能名片链动2 + 1模式S2B2C商城小程序的沉浸式体验营销研究
摘要:在消费市场竞争日益激烈的当下,传统体验营销方式存在诸多局限。本文聚焦开源AI智能名片链动2 1模式S2B2C商城小程序,探讨其在沉浸式体验营销中的应用。通过对比传统品鉴、工厂参观等初级体验方式,分析沉浸式体验的优势与价值…...
