当前位置: 首页 > news >正文

Doris:读取Doris数据的N种方法

目录

1.MySQL Client

2.JDBC

3. 查询计划

4.Spark Doris Connector

5.Flink Doris Connector


1.MySQL Client

        Doris 采用 MySQL 协议,高度兼容 MySQL 语法,支持标准 SQL,用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后,可使用 select语句查询数据。

mysql -uroot -P9030 -h127.0.0.1

        为了防止用户的一个查询可能因为消耗内存过大。查询进行了内存控制,一个查询任务,在单个 BE 节点上默认使用不超过 2GB 内存。用户在使用时,如果发现报 Memory limit exceeded 错误,一般是超过内存限制了。遇到内存超限时,用户应该尽量通过优化自己的 sql 语句来解决。如果确切发现2GB内存不能满足,可以手动设置内存参数。

    select 查询如果使用limit分页查询,则需要指定order by 字段,否则同一个sql返回的数据可能不一样。

2.JDBC

        由于Doris 采用 MySQL 协议,同样也支持通过JDBC方式读取数据。

package com.yichenkeji.demo.test;import lombok.extern.slf4j.Slf4j;import java.sql.*;
import java.util.Properties;@Slf4j
public class DorisJDBCDemo {public static void main(String[] args) throws SQLException {String jdbc_driver = "com.mysql.cj.jdbc.Driver";String jdbc_url = "jdbc:mysql://192.168.179.131:9030/demo?rewriteBatchedStatements=true";String username = "root";String password = "";Connection conn = getConnection(username,password,jdbc_url,jdbc_driver);log.info("{}",conn);String sql = "select * from dim_area limit 10";Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(sql);while (rs.next()){log.info("id={},name={}",rs.getFloat("id"),rs.getString("name"));}closeConnection(conn);}/*** 获取连接* @param username* @param password* @param jdbcUrl* @param driver* @return*/public static Connection getConnection(String username,String password,String jdbcUrl,String driver) {Properties prop = new Properties();prop.put("user", username);prop.put("password", password);try {Class.forName(driver);log.info("jdbcUrl:{}",jdbcUrl);return DriverManager.getConnection(jdbcUrl, prop);} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭连接* @param conn*/public static void closeConnection(Connection conn) {if(conn != null){try {if(!conn.isClosed()){conn.close();}} catch (SQLException e) {log.error("SQLException:{}", e.getMessage());}}}}

3. 查询计划

        由于jdbc查询暂时不支持流式读取,如果读取的数据量过大,一次性读取全部数据需要很大的资源,所有可以使用查询计划API接口,给定一个 SQL,获取该 SQL 对应的查询计划。通过返回的数据分区信息,分批读取数据。

package com.yichenkeji.demo.test;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.TDorisExternalService;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TScanCloseParams;
import org.apache.doris.sdk.thrift.TScanColumnDesc;
import org.apache.doris.sdk.thrift.TScanNextBatchParams;
import org.apache.doris.sdk.thrift.TScanOpenParams;
import org.apache.doris.sdk.thrift.TScanOpenResult;
import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class DorisReaderDemo{static String dorisUrl = "192.168.179.131:8030";static String username = "root";static String password = "";static String database = "demo";static String table = "dim_area";static String querySql = String.format("SELECT id, name from demo.dim_area");static int readRowCount = 0;static int readTotal = 0;public static void main(String[] args) throws Exception {String queryPlanUrl = String.format("http://%s/api/%s/%s/_query_plan",dorisUrl,database,table);QueryPlanResult queryPlanResult = DorisUtil.getQueryPlan(username,password,queryPlanUrl,querySql);if (queryPlanResult != null && queryPlanResult.getOpaquedQueryPlan() != null){JSONObject partitions = queryPlanResult.getPartitions();log.info("partitions:{}",partitions);for(Map.Entry<String, Object> tablet : partitions.entrySet()){Long tabletId = Long.parseLong(tablet.getKey());JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));//get first backendString routingsBackend = value.getJSONArray("routings").getString(0);String backendHost = routingsBackend.split(":")[0];String backendPort = routingsBackend.split(":")[1];//connect backendTBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort));TProtocol protocol = factory.getProtocol(transport);TDorisExternalService.Client client = new TDorisExternalService.Client(protocol);if (!transport.isOpen()) {transport.open();}//build paramsTScanOpenParams params = new TScanOpenParams();params.cluster = "default_cluster";params.database = database;params.table = table;params.tablet_ids = Arrays.asList(tabletId);params.opaqued_query_plan = queryPlanResult.getOpaquedQueryPlan();// max row number of one read batchparams.setBatchSize(50000);params.setQueryTimeout(3600);params.setMemLimit(2147483648L);params.setUser(username);params.setPasswd(password);//open scannerTScanOpenResult tScanOpenResult = client.openScanner(params);if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.",routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs()));}List<TScanColumnDesc> selectedColumns = tScanOpenResult.getSelectedColumns();TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();nextBatchParams.setContextId(tScanOpenResult.getContextId());boolean eos = false;//read dataint offset = 0;while(!eos){nextBatchParams.setOffset(offset);TScanBatchResult nextResult = client.getNext(nextBatchParams);if (!TStatusCode.OK.equals(nextResult.getStatus().getStatusCode())) {throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.",routingsBackend, nextResult.getStatus().getStatusCode(), nextResult.getStatus().getErrorMsgs()));}eos = nextResult.isEos();if(!eos){RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();List<List<Object>> results = new ArrayList<>();while (arrowStreamReader.loadNextBatch()) {List<FieldVector>  fieldVectors = root.getFieldVectors();//total data rowsint rowCountInOneBatch = root.getRowCount();for(int row = 0 ; row < rowCountInOneBatch ;row++){List<Object> record = new ArrayList<>();for (int col = 0; col < fieldVectors.size(); col++) {FieldVector fieldVector = fieldVectors.get(col);Types.MinorType minorType = fieldVector.getMinorType();Object v = DorisUtil.convertValue(row , minorType, fieldVector);record.add(v);}results.add(record);}offset += root.getRowCount();}log.info("total data rows:{}",results.size());//处理完之后要关闭,否则容易内存溢出arrowStreamReader.close();}}//closeTScanCloseParams closeParams = new TScanCloseParams();closeParams.setContextId(tScanOpenResult.getContextId());client.closeScanner(closeParams);if ((transport != null) && transport.isOpen()) {transport.close();}}}}public static String basicAuthHeader(String username, String password) {final String tobeEncode = username + ":" + password;byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));return "Basic " + new String(encoded);}/*** 获取查询计划* @param username* @param password* @param queryPlanUrl* @param sql* @return* @throws Exception*/public static QueryPlanResult getQueryPlan(String username, String password, String queryPlanUrl, String sql){try (CloseableHttpClient client = HttpClients.custom().build()) {HttpPost post = new HttpPost(queryPlanUrl);post.setHeader(HttpHeaders.EXPECT, "100-continue");post.setHeader(HttpHeaders.AUTHORIZATION,  basicAuthHeader(username,password));log.info("queryPlanUrl:{}",queryPlanUrl);log.info("sql:{}",sql);//The param is specific SQL, and the query plan is returnedMap<String,String> params = new HashMap<>();params.put("sql",sql);StringEntity entity = new StringEntity(JSON.toJSONString(params));post.setEntity(entity);try (CloseableHttpResponse response = client.execute(post)) {if (response.getEntity() != null ) {JSONObject queryPlanJSONObject = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));JSONObject dataJSONObject = queryPlanJSONObject.getJSONObject("data");if (dataJSONObject.containsKey("exception")){throw new RuntimeException(dataJSONObject.getString("exception"));}String queryPlan = dataJSONObject.getString("opaqued_query_plan");JSONObject partitions = dataJSONObject.getJSONObject("partitions");return new QueryPlanResult(queryPlan,partitions);}}}catch (Exception e){throw new RuntimeException(e);}return null;}}

4.Spark Doris Connector

        Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。

val dorisSparkDF = spark.read.format("doris").option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME").option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT").option("user", "$YOUR_DORIS_USERNAME").option("password", "$YOUR_DORIS_PASSWORD").load()dorisSparkDF.show(5)

5.Flink Doris Connector

        Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍Flink如何通过Datastream和SQL操作Doris。

DorisOptions.Builder builder = DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

相关文章:

Doris:读取Doris数据的N种方法

目录 1.MySQL Client 2.JDBC 3. 查询计划 4.Spark Doris Connector 5.Flink Doris Connector 1.MySQL Client Doris 采用 MySQL 协议&#xff0c;高度兼容 MySQL 语法&#xff0c;支持标准 SQL&#xff0c;用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后&a…...

ceph-deploy bclinux aarch64 ceph 14.2.10

ssh-copy-id&#xff0c;部署机免密登录其他三台主机 所有机器硬盘配置参考如下&#xff0c;计划采用vdb作为ceph数据盘 下载ceph-deploy pip install ceph-deploy 免密登录设置主机名 hostnamectl --static set-hostname ceph-0 .. 3 配置hosts 172.17.163.105 ceph-0 172.…...

爬虫项目(13):使用lxml抓取相亲信息

文章目录 书籍推荐完整代码效果书籍推荐 如果你对Python网络爬虫感兴趣,强烈推荐你阅读《Python网络爬虫入门到实战》。这本书详细介绍了Python网络爬虫的基础知识和高级技巧,是每位爬虫开发者的必读之作。详细介绍见👉: 《Python网络爬虫入门到实战》 书籍介绍 完整代码…...

mysql-数据库三大范式是什么、mysql有哪些索引类型,分别有什么作用 、 事务的特性和隔离级别

1. 数据库三大范式是什么&#xff1f; 数据库三大范式是设计关系型数据库时的规范化原则&#xff0c;确保数据库结构的合理性和减少数据冗余。 这三大范式分别是&#xff1a; - **第一范式&#xff08;1NF&#xff09;&#xff1a;** 数据表中的所有列都是不可分割的原子数据项…...

微信小程序案例3-2 计算器

文章目录 一、运行效果二、知识储备&#xff08;一&#xff09;data-*自定义属性&#xff08;二&#xff09;模块 三、实现步骤&#xff08;一&#xff09;准备工作1、创建项目2、设置导航栏 &#xff08;二&#xff09;实现页面结构1、编写页面整体结构2、编写结果区域的结构3…...

QT QSplitter

分裂器QSplitter类提供了一个分裂器部件。和QBoxLayout类似&#xff0c;可以完成布局管理器的功能,但是包含在它里面的部件,默认是可以随着分裂器的大小变化而变化的。 比如一个按钮放在布局管理器中,它的垂直方向默认是不会被拉伸的,但是放到分裂器中就可以被拉伸。还有一点不…...

银行支付凭证截图生成器在线,工商邮政农业招商建设,画板+透明标签+图片框

用易语言设计了一个非常牛X的截图生成器&#xff0c;娱乐使用哈&#xff0c;软件我在这里也不会分享&#xff0c;模版网上找的&#xff0c;百度图库搜到的&#xff0c;上面的LOGO用的是一个在线生成器&#xff0c;然后标签用的黑月透明标签&#xff0c;加一个通用对话框读取图片…...

微服务概述

微服务架构是一种软件设计和开发范式&#xff0c;旨在将大型应用程序分解为一组小而独立的服务单元&#xff0c;这些单元可以独立开发、测试、部署和扩展。每个服务都专注于一个明确定义的业务功能&#xff0c;并通过轻量级的通信机制进行交互。以下是微服务架构的一些关键方面…...

LabVIEW中NIPackageManager功能介绍

LabVIEW中PackageManager功能介绍 使用NIPackage Manager可安装、更新、修复和删除NI软件。 安装NI软件 使用PackageManager浏览和安装NI软件。 1. 在浏览产品选项卡上&#xff0c;单击产品类别以显示该类别中的可用产品。 2. 选择要安装的产品&#xff0c;然后单击…...

【C语言】sem_getvalue

sem_getvalue 是 POSIX 线程库中用于获取信号量当前值的一个函数。信号量&#xff08;Semaphore&#xff09;是用于编程中的同步工具&#xff0c;用于管理多个线程或进程对共享资源的并发访问。通常用于限制可以同时访问共享资源的线程数量。函数 sem_getvalue 的声明通常出现在…...

Linux的shell的$# | fi | 说明

$# | fi | 说明 在Linux的Shell脚本中&#xff0c;$# 是一个特殊变量&#xff0c;表示传递给脚本的参数个数。 例如&#xff0c;如果你运行一个脚本并传递了三个参数&#xff0c;那么在脚本内部使用 $# 将会得到 3。这对于确定脚本在执行时接收到了多少个参数是非常有用的。以…...

C //例 7.12 用选择法对数组中10个整数按由小到大排序。

C程序设计 &#xff08;第四版&#xff09; 谭浩强 例 7.12 例 7.12 用选择法对数组中10个整数按由小到大排序。 IDE工具&#xff1a;VS2010 Note: 使用不同的IDE工具可能有部分差异。 代码块 方法&#xff1a;使用指针、动态分配内存 #include <stdio.h> #include …...

Spring Bean循环依赖问题及解决

什么是循环依赖 类与类之间的依赖关系形成了闭环&#xff0c;就会导致循环依赖问题的产生。举例来说&#xff0c;假设存在两个服务类A和服务类B&#xff0c;如果A通过依赖注入的方式引用了B&#xff0c;且B通过依赖注入的方式引用了A&#xff0c;那么A和B之间就存在循环依赖。…...

Golang源码分析 | 程序引导过程

环境说明 CentOS Linux release 7.2 (Final&#xff09; go version go1.16.3 linux/amd64 GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-80.el7使用gdb查看程序入口 编写一个简单的go程序 // main.go package mainfunc main() {print("Hello world") } 编译go …...

第三章:人工智能深度学习教程-基础神经网络(第四节-从头开始的具有前向和反向传播的深度神经网络 – Python)

本文旨在从头开始实现深度神经网络。我们将实现一个深度神经网络&#xff0c;其中包含一个具有四个单元的隐藏层和一个输出层。实施将从头开始&#xff0c;并实施以下步骤。算法&#xff1a; 1. 可视化输入数据 2. 确定权重和偏置矩阵的形状 3. 初始化矩阵、要使用的函数 4. 前…...

【入门Flink】- 08Flink时间语义和窗口概念

Flink-Windows 是将无限数据切割成有限的“数据块”进行处理&#xff0c;这就是所谓的“窗口”&#xff08;Window&#xff09;。 注意&#xff1a;Flink 中窗口并不是静态准备好的&#xff0c;而是动态创建——当有落在这个窗口区间范围的数据达到时&#xff0c;才创建对应的窗…...

【 OpenGauss源码学习 —— 列存储(CStore)(六)】

列存储&#xff08;CStore&#xff09;&#xff08;六&#xff09; 概述CStore::GetCUDataFromRemote 函数CStore::CheckConsistenceOfCUDescCtl 函数CStore::CheckConsistenceOfCUDesc 函数CStore::CheckConsistenceOfCUData 函数额外补充 声明&#xff1a;本文的部分内容参考…...

MUYUCMS v2.1:一款开源、轻量级的内容管理系统基于Thinkphp开发

MuYuCMS&#xff1a;一款基于Thinkphp开发的轻量级开源内容管理系统&#xff0c;为企业、个人站长提供快速建站解决方案。它具有以下的环境要求&#xff1a; 支持系统&#xff1a;Windows/Linux/Mac WEB服务器&#xff1a;Apache/Nginx/ISS PHP版本&#xff1a;php > 5.6 (…...

SDL2 显示文字

1.简介 SDL本身没有显示文字功能&#xff0c;它需要用扩展库SDL_ttf来显示文字。ttf是True Type Font的缩写&#xff0c;ttf是Windows下的缺省字体&#xff0c;它有美观&#xff0c;放大缩小不变形的优点&#xff0c;因此广泛应用很多场合。 使用ttf库的第一件事要从Windows的…...

c++ future 使用详解

c future 使用详解 std::future 头文件 #include <future>。 类模板&#xff0c;定义如下&#xff1a; template<class T> class future; template<class T> class future<T&>; template<> class future<void>;作用&#xff…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

QT: `long long` 类型转换为 `QString` 2025.6.5

在 Qt 中&#xff0c;将 long long 类型转换为 QString 可以通过以下两种常用方法实现&#xff1a; 方法 1&#xff1a;使用 QString::number() 直接调用 QString 的静态方法 number()&#xff0c;将数值转换为字符串&#xff1a; long long value 1234567890123456789LL; …...

SQL慢可能是触发了ring buffer

简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

uniapp 实现腾讯云IM群文件上传下载功能

UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中&#xff0c;群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS&#xff0c;在uniapp中实现&#xff1a; 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...

SQL Server 触发器调用存储过程实现发送 HTTP 请求

文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...

用鸿蒙HarmonyOS5实现中国象棋小游戏的过程

下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...

Python竞赛环境搭建全攻略

Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型&#xff08;算法、数据分析、机器学习等&#xff09;不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...

基于开源AI智能名片链动2 + 1模式S2B2C商城小程序的沉浸式体验营销研究

摘要&#xff1a;在消费市场竞争日益激烈的当下&#xff0c;传统体验营销方式存在诸多局限。本文聚焦开源AI智能名片链动2 1模式S2B2C商城小程序&#xff0c;探讨其在沉浸式体验营销中的应用。通过对比传统品鉴、工厂参观等初级体验方式&#xff0c;分析沉浸式体验的优势与价值…...