当前位置: 首页 > article >正文

利用DeepSeek编写能在DuckDB中读PostgreSQL表的表函数

前文实现了UDF和UDAF,还有一类函数是表函数,它放在From 子句中,返回一个集合。DuckDB中已有PostgreSQL插件,但我们可以用pqxx库实现一个简易的只读read_pg()表函数。
提示词如下:

请将libpqxx库集成到我们的程序,使它能对postgresql数据库操作,并把数据与duckdb打通,比如能在一个sql中访问pg和duckdb中的表的关联结果,先做一个简单的表函数read_pg(db,table)返回一个表,可以执行select * from read_pg(db,table);select * from read_pg(db,table1) a,read_pg(db,table2) b where a.x=b.x;其中db是postgresql连接字符串,比如postgresql://user:secret@localhost/mydb,table是一个varchar

DeepSeek编写的代码一开始总是无限重复调用Function函数,后来经过添加

    // 执行已完成的处理if (state.execution_finished) {output.SetCardinality(0);return;}

后能正确输出postgresql某个表的数据了,加where 条件也能查出。另一个插曲是,DeepSeek编写的表函数注册代码总是不对,我从网上找了一个例子,虽然我的程序不是插件,却能调用ExtensionUtil::RegisterFunction来注册。

源代码如下

#include <pqxx/pqxx>
#include <memory>
#include <unordered_map>
#include <duckdb.hpp>#include "duckdb/function/function_set.hpp"
#include "duckdb/parser/parsed_data/create_aggregate_function_info.hpp"class PGConnectionPool {
private:std::unordered_map<std::string, std::shared_ptr<pqxx::connection>> connections;public:pqxx::connection& getConnection(const std::string& conn_str) {auto it = connections.find(conn_str);if (it == connections.end()) {auto conn = std::make_shared<pqxx::connection>(conn_str);connections[conn_str] = conn;return *conn;}return *(it->second);}
};static PGConnectionPool pg_pool;struct PGTableFunctionData : public duckdb::TableFunctionData {std::string conn_str;std::string table_name;duckdb::vector<duckdb::LogicalType> return_types;duckdb::vector<std::string> return_names;PGTableFunctionData(std::string conn_str, std::string table_name, duckdb::vector<duckdb::LogicalType> return_types,duckdb::vector<std::string> return_names): conn_str(std::move(conn_str)), table_name(std::move(table_name)),return_types(std::move(return_types)), return_names(std::move(return_names)) {}duckdb::unique_ptr<duckdb::FunctionData> Copy() const override {return duckdb::make_uniq<PGTableFunctionData>(conn_str, table_name, return_types, return_names);}bool Equals(const duckdb::FunctionData &other) const override {auto &other_data = other.Cast<PGTableFunctionData>();return conn_str == other_data.conn_str && table_name == other_data.table_name;}
};struct PGGlobalState : public duckdb::GlobalTableFunctionState {pqxx::connection* conn = nullptr;std::unique_ptr<pqxx::work> txn;pqxx::result result;pqxx::result::const_iterator it;bool initialized = false;bool execution_finished = false;
};struct PGTableFunction {static duckdb::TableFunction GetFunction() {return duckdb::TableFunction("read_pg",{duckdb::LogicalType::VARCHAR, duckdb::LogicalType::VARCHAR},Function,Bind,InitGlobal);}static duckdb::unique_ptr<duckdb::GlobalTableFunctionState> InitGlobal(duckdb::ClientContext &context,duckdb::TableFunctionInitInput &input) {return duckdb::make_uniq<PGGlobalState>();}static duckdb::unique_ptr<duckdb::FunctionData> Bind(duckdb::ClientContext &context,duckdb::TableFunctionBindInput &input,duckdb::vector<duckdb::LogicalType> &return_types,duckdb::vector<std::string> &return_names) {auto conn_str = input.inputs[0].GetValue<std::string>();auto table_name = input.inputs[1].GetValue<std::string>();try {auto& conn = pg_pool.getConnection(conn_str);pqxx::work txn(conn);auto r = txn.exec("SELECT column_name, data_type FROM information_schema.columns ""WHERE table_name = " + txn.quote(table_name) + " ORDER BY ordinal_position");for (const auto& row : r) {return_names.push_back(row[0].as<std::string>());std::string pg_type = row[1].as<std::string>();if (pg_type == "integer" || pg_type == "bigint") {return_types.push_back(duckdb::LogicalType::BIGINT);} else if (pg_type == "text" || pg_type == "varchar") {return_types.push_back(duckdb::LogicalType::VARCHAR);} else if (pg_type == "double precision") {return_types.push_back(duckdb::LogicalType::DOUBLE);} else if (pg_type == "boolean") {return_types.push_back(duckdb::LogicalType::BOOLEAN);} else {return_types.push_back(duckdb::LogicalType::VARCHAR);}}return duckdb::make_uniq<PGTableFunctionData>(conn_str, table_name, return_types, return_names);} catch (const std::exception& e) {throw std::runtime_error("PostgreSQL error: " + std::string(e.what()));}}static void Function(duckdb::ClientContext &context,duckdb::TableFunctionInput &data,duckdb::DataChunk &output) {auto &bind_data = data.bind_data->Cast<PGTableFunctionData>();auto &state = data.global_state->Cast<PGGlobalState>();if (state.execution_finished) {output.SetCardinality(0);return;}try {if (!state.initialized) {state.conn = &pg_pool.getConnection(bind_data.conn_str);state.txn = std::make_unique<pqxx::work>(*state.conn);state.result = state.txn->exec("SELECT * FROM " + state.txn->quote_name(bind_data.table_name));state.it = state.result.begin();state.initialized = true;}idx_t row_count = 0;while (state.it != state.result.end() && row_count < STANDARD_VECTOR_SIZE) {const auto& row = *state.it;for (duckdb::idx_t col = 0; col < static_cast<duckdb::idx_t>(row.size()); col++) {auto field = row[static_cast<pqxx::row::size_type>(col)];if (field.is_null()) {output.data[col].SetValue(row_count, duckdb::Value());} else {std::string value = field.as<std::string>();switch (bind_data.return_types[col].id()) {case duckdb::LogicalTypeId::BIGINT:output.data[col].SetValue(row_count, duckdb::Value::BIGINT(std::stoll(value)));break;case duckdb::LogicalTypeId::DOUBLE:output.data[col].SetValue(row_count, duckdb::Value::DOUBLE(std::stod(value)));break;case duckdb::LogicalTypeId::BOOLEAN:output.data[col].SetValue(row_count, duckdb::Value::BOOLEAN(value == "t" || value == "true"));break;default:output.data[col].SetValue(row_count, duckdb::Value(value));}}}row_count++;++state.it;}output.SetCardinality(row_count);if (state.it == state.result.end()) {state.execution_finished = true;state.txn->commit();}} catch (const std::exception& e) {//if (state.txn && state.txn->is_open()) state.txn->abort();throw std::runtime_error("PostgreSQL error: " + std::string(e.what()));}}
};

测试代码如下

#include "duckdb.hpp"
#include "readpg5.cpp"
#include <iostream>
#include "duckdb/main/extension_util.hpp"using namespace duckdb;
using namespace std;int main() {DuckDB db(nullptr);Connection con(db);try {DatabaseInstance& db_instance = *db.instance;ExtensionUtil::RegisterFunction(db_instance, PGTableFunction::GetFunction());} catch (const exception &e) {cerr << "初始化错误: " << e.what() << endl;return 1;}cout << "=== 测试1: 查询PostgreSQL表函数 ===" << endl;auto result = con.Query("SELECT * FROM read_pg('postgresql://postgres@127.0.0.1/postgres', 't2')");if (result->HasError()) {cerr << "查询错误: " << result->GetError() << endl;} else {result->Print();}cout << "\n=== 测试1.1: 带条件的同一个表查询 ===" << endl;result = con.Query("SELECT * FROM read_pg('postgresql://postgres@127.0.0.1/postgres', 't2') WHERE tid = 2");if (result->HasError()) {cerr << "查询错误: " << result->GetError() << endl;} else {result->Print();}cout << "\n=== 测试2: 带条件的查询 ===" << endl;result = con.Query("SELECT * FROM read_pg('postgresql://postgres@127.0.0.1/postgres', 't') WHERE a = 2");if (result->HasError()) {cerr << "查询错误: " << result->GetError() << endl;} else {result->Print();}cout << "\n=== 测试3: 多表查询 ===" << endl;result = con.Query("SELECT tname FROM read_pg('postgresql://postgres@127.0.0.1/postgres', 't2') a, ""read_pg('postgresql://postgres@127.0.0.1/postgres', 't') b WHERE a.tid = b.a");if (result->HasError()) {cerr << "查询错误: " << result->GetError() << endl;} else {result->Print();}cout << "\n=== 测试4: pg和duckdb多表查询 ===" << endl;con.Query("create table duckdb_t as select 2 a union all select 3");result = con.Query("SELECT tname FROM read_pg('postgresql://postgres@127.0.0.1/postgres', 't2') a, ""duckdb_t b WHERE a.tid = b.a");if (result->HasError()) {cerr << "查询错误: " << result->GetError() << endl;} else {result->Print();}cout << "\n=== 测试完成 ===" << endl;return 0;
}

编译命令行

export LIBRARY_PATH=/par/duck/build/src
export LD_LIBRARY_PATH=/par/duck/build/src
g++ -std=c++17 -o readpg2 testpg5.cpp -lduckdb -lpqxx -lpq -I /par/duck/src/include

相关文章:

利用DeepSeek编写能在DuckDB中读PostgreSQL表的表函数

前文实现了UDF和UDAF&#xff0c;还有一类函数是表函数&#xff0c;它放在From 子句中&#xff0c;返回一个集合。DuckDB中已有PostgreSQL插件&#xff0c;但我们可以用pqxx库实现一个简易的只读read_pg()表函数。 提示词如下&#xff1a; 请将libpqxx库集成到我们的程序&#…...

树莓派安装openwrt搭建软路由(ImmortalWrt固件方案)

&#x1f923;&#x1f449;我这里准备了两个版本的openwrt安装方案给大家参考使用&#xff0c;分别是原版的OpenWrt固件以及在原版基础上进行改进的ImmortalWrt固件。推荐使用ImmortalWrt固件&#xff0c;当然如果想直接在原版上进行开发也可以&#xff0c;看个人选择。 &…...

排序算法——详解

排序算法 &#xff08;冒泡、选择、插入、快排、归并、堆排、计数、桶、基数&#xff09; 稳定性 (Stability): 如果排序算法能保证&#xff0c;当待排序序列中存在值相等的元素时&#xff0c;排序后这些元素的相对次序保持不变&#xff0c;那么该算法就是稳定的。 例如&#…...

Go整合Redis2.0发布订阅

Go整合Redis2.0发布订阅 Redis goredis-cli --version redis-cli 5.0.14.1 (git:ec77f72d)Go go get github.com/go-redis/redis/v8package redisimport ("MyKindom-Server-v2.0/com/xzm/core/config/yaml""MyKindom-Server-v2.0/com/xzm/core/config/yaml/po…...

电子电气架构 --- 如何应对未来区域式电子电气(E/E)架构的挑战?

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 做到欲望极简,了解自己的真实欲望,不受外在潮流的影响,不盲从,不跟风。把自己的精力全部用在自己。一是去掉多余,凡事找规律,基础是诚信;二是…...

鸿蒙OS基于UniApp的区块链钱包开发实践:打造支持鸿蒙生态的Web3应用#三方框架 #Uniapp

基于UniApp的区块链钱包开发实践&#xff1a;打造支持鸿蒙生态的Web3应用 前言 最近在带领团队开发一个支持多链的区块链钱包项目时&#xff0c;我们选择了UniApp作为开发框架。这个选择让我们不仅实现了传统移动平台的覆盖&#xff0c;还成功将应用引入了快速发展的鸿蒙生态…...

易学探索助手-个人记录(十二)

近期我完成了古籍处理板块页面升级&#xff0c;补充完成原文、句读、翻译的清空、保存和编辑&#xff08;其中句读仅可修改标点&#xff09;功能&#xff0c;新增原文和句读的繁简体切换功能 一、古籍处理板块整体页面升级 将原来一整个页面呈现的布局改为分栏呈现&#xff0…...

Windows 账号管理与安全指南

Windows 账号管理与安全指南 概述 Windows 账号管理是系统安全的基础&#xff0c;了解如何正确创建、管理和保护用户账户对于系统管理员和安全专业人员至关重要。本文详细介绍 Windows 系统中的账户管理命令、隐藏账户创建方法以及安全防护措施。 基础账户管理命令 net use…...

Python窗体编程技术详解

文章目录 1. Tkinter简介示例代码优势劣势 2. PyQt/PySide简介示例代码(PyQt5)优势劣势 3. wxPython简介示例代码优势劣势 4. Kivy简介示例代码优势劣势 5. PySimpleGUI简介示例代码优势劣势 技术对比总结选择建议 Python提供了多种实现图形用户界面(GUI)编程的技术&#xff0c…...

思维链提示:激发大语言模型推理能力的突破性方法

论文出处&#xff1a; Chain-of-Thought Prompting Elicits Reasoning in Large Language Models 作者&#xff1a; Jason Wei, Xuezhi Wang, Dale Schuurmans, Maarten Bosma, Brian Ichter, Fei Xia, Ed H. Chi, Quoc V. Le, Denny Zhou 机构&#xff1a; Google Research, B…...

NVMe协议简介之AXI总线更新

更新AXI4总线知识 AXI4总线协议 AXI4总线协议是由ARM公司提出的一种片内总线协议 &#xff0c;旨在实现SOC中各模块之间的高效可靠的数据传输和管理。AXI4协议具有高性能、高吞吐量和低延迟等优点&#xff0c;在SOC设计中被广泛应用 。随着时间的推移&#xff0c;AXI4的影响不…...

设计模式——责任链设计模式(行为型)

摘要 责任链设计模式是一种行为型设计模式&#xff0c;旨在将请求的发送者与接收者解耦&#xff0c;通过多个处理器对象按链式结构依次处理请求&#xff0c;直到某个处理器处理为止。它包含抽象处理者、具体处理者和客户端等核心角色。该模式适用于多个对象可能处理请求的场景…...

基于Android的医院陪诊预约系统

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了六年的毕业设计程序开发&#xff0c;开发过上千套毕业设计程序&#xff0c;没有什么华丽的语言&#xff0…...

基于Spring Boot 电商书城平台系统设计与实现(源码+文档+部署讲解)

技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论文…...

【金融基础学习】债券回购方式

债券回购作为货币市场的重要工具&#xff0c;本质上是一种以债券为抵押的短期资金借贷行为。在银行间市场&#xff0c;质押式回购与**买断式回购*是两种主要形式。 1. 质押式回购(Pledged Repo, RP) – 所有权不转移的短期融资工具 1.1 质押式回购概述 质押式回购是交易双方…...

第五十九节:性能优化-GPU加速 (CUDA 模块)

在计算机视觉领域,实时性往往是关键瓶颈。当传统CPU处理高分辨率视频流或复杂算法时,力不从心。本文将深入探索OpenCV的CUDA模块,揭示如何通过GPU并行计算实现数量级的性能飞跃。 一、GPU加速:计算机视觉的必由之路 CPU的强项在于复杂逻辑和低延迟任务,但面对图像处理中高…...

单元测试-概述入门

目录 main方法测试缺点&#xff1a; 在pom.xm中&#xff0c;引入junit的依赖。,在test/java目录下&#xff0c;创建测试类&#xff0c;并编写对应的测试方法&#xff0c;并在方法上声明test注解。 练习&#xff1a;验证身份证合法性 测试成功 测试失败 main方法测试缺点&am…...

⚡ Hyperlane —— 比 Rocket 更快的 Rust Web 框架!

⚡ Hyperlane —— 比 Rocket 更快的 Rust Web 框架&#xff01; 在现代 Web 服务开发中&#xff0c;开发者需要一个既轻量级又高性能的 HTTP 服务器库来简化开发流程&#xff0c;同时确保服务的高效运行。Hyperlane 正是为此而生——一个专为 Rust 开发者设计的 HTTP 服务器库…...

《AI Agent项目开发实战》DeepSeek R1模型蒸馏入门实战

一、模型蒸馏环境部署 注&#xff1a;本次实验仍然采用Ubuntu操作系统&#xff0c;基本配置如下&#xff1a; 需要注意的是&#xff0c;本次公开课以Qwen 1.5-instruct模型为例进行蒸馏&#xff0c;从而能省略冷启动SFT过程&#xff0c;并且 由于Qwen系列模型本身性能较强&…...

Ubuntu 24.04 LTS Chrome 中文输入法(搜狗等)失效?一行命令解决

Ubuntu 24.04 LTS Chrome 中文输入法&#xff08;搜狗等&#xff09;失效&#xff1f;一行命令解决 在 Ubuntu 24.04 LTS 中&#xff0c;如果你发现 Chrome 浏览器用不了搜狗输入法或其他 Fcitx5 中文输入法&#xff0c;可以试试下面的方法。 直接上解决方案&#xff1a; 打…...

字节golang后端二面

前端接口使用restful格式&#xff0c;post与get的区别是什么&#xff1f; HTTP网络返回的状态码有哪些&#xff1f; go语言切片与数组的区别是什么&#xff1f; MySQL实现并发安全避免两个事务同时对一个记录写操作的手段有哪些&#xff1f; 如何实现业务的幂等性&#xff08;在…...

计算机网络物理层基础练习

第二章 物理层 填空题 从通信双方信息交互的方式来看&#xff0c;通信的三种基本方式为单工、半双工和全双工。其中&#xff0c;单工数据传输只支持数据在一个方向上传输&#xff0c;全双工数据传输则允许数据同时在两个方向上传输。最基本的带通调制方法包括三种&#xff1a…...

vscode + cmake + ninja+ gcc 搭建MCU开发环境

vscode cmake ninja gcc 搭建MCU开发环境 文章目录 vscode cmake ninja gcc 搭建MCU开发环境1. 前言2. 工具安装及介绍2.1 gcc2.1.1 gcc 介绍2.1.2 gcc 下载及安装 2.2 ninja2.2.1 ninja 介绍2.2 ninja 安装 2.3 cmake2.3.1 cmake 介绍2.3.2 cmake 安装 2.4 VScode 3. 上手…...

三种经典算法优化无线传感器网络(WSN)覆盖(SSA-WSN、PSO-WSN、GWO-WSN),MATLAB代码实现

三种经典算法优化无线传感器网络(WSN)覆盖&#xff08;SSA-WSN、PSO-WSN、GWO-WSN&#xff09;&#xff0c;MATLAB代码实现 目录 三种经典算法优化无线传感器网络(WSN)覆盖&#xff08;SSA-WSN、PSO-WSN、GWO-WSN&#xff09;&#xff0c;MATLAB代码实现效果一览基本介绍程序设…...

JVM 核心组件深度解析:堆、方法区、执行引擎与本地方法接口

一、JVM 堆内存&#xff1a;对象的生存与消亡之地 作为 Java 虚拟机中最大的内存区域&#xff0c;堆内存是所有对象实例的 “出生地” 与 “安息所”。从程序运行的角度看&#xff0c;所有通过new关键字创建的对象都在堆中分配内存&#xff0c;其生命周期完全由垃圾回收机制&am…...

OpenCV4.4.0下载及初步配置(Win11)

目录 OpenCV4.4.0工具下载安装环境变量系统配置 OpenCV4.4.0 工具 系统&#xff1a;Windows 11 下载 OpenCV全版本百度网盘链接&#xff1a;: https://pan.baidu.com/s/15qTzucC6ela3bErdZ285oA?pwdjxuy 提取码: jxuy找到 opencv-4.0.0-vc14_vc15 下载得到 安装 运行op…...

【iOS(swift)笔记-13】App版本不升级时本地数据库sqlite更新逻辑一

App版本不升级时&#xff0c;又想即时更新本地数据库怎么办&#xff1f; 办法一&#xff1a;直接从服务器下载最新的sqlite数据库替换掉本地的 具体逻辑 1、首先本地数据库里一定要有一个字段&#xff08;名字自己取&#xff09; 比如dbVersion&#xff0c;可用数字&#x…...

Flink CDC将MySQL数据同步到数据湖

此项目可以理解为MySQL数据迁移&#xff0c;由Flink Stream监听MySQL的Binlog日志写入Kafka&#xff0c;在Kafka消费端将消息写入Doris或其他外部对象存储。 涉及的环境与版本 组件版本flink1.20.1flink-cdc3.4.0kafka2.13-4.0.0Dragonwell17 引入相关依赖 <?xml versio…...

使用Mathematica观察多形式根的分布随参数的变化

有两种方式观察多项式的根随着参数变化&#xff1a;&#xff08;1&#xff09;直接制作一个小的动态视频&#xff1b;&#xff08;2&#xff09;绘制所有根形成的痕迹&#xff08;locus&#xff09;。 制作动态视频&#xff1a; (*Arg-plane plotting routine with plotting …...

【C++高级主题】转换与多个基类

目录 一、多重继承的虚函数表结构&#xff1a;每个基类一个虚表 1.1 单继承与多重继承的虚表差异 1.2 代码示例&#xff1a;多重继承的虚函数覆盖 1.3 虚表结构示意图 二、指针与引用的类型转换&#xff1a;地址调整的底层逻辑 2.1 派生类指针转基类指针的地址偏移 2.2 …...