ETL系列-数据抽取(Extract)
ETL的过程
1、数据抽取:确定数据源,定义数据接口,选择数据抽取方法(主动抽取或由源系统推送)。
2、数据清洗:处理不完整数据、错误数据、重复数据等,确保数据的准确性和一致性。(是数据转换的一部分)
3、数据转换:进行空值处理、数据标准统一、数据拆分、数据验证、数据替换和数据关联等操作。
4、规则检查:根据业务需求进行数据质量和业务规则的校验。
5、数据加载:将数据缓冲区的数据加载到目标数据库或数据仓库中,可能是全量加载或增量加载。
1、数据抽取(Extract)
选择抽取策略
- 全量抽取
- 特点:一次性抽取所有数据,适合数据量较小或首次抽取的场景。
- 实现方式:直接查询整个表或读取整个文件。
- 增量抽取
- 特点:仅抽取发生变化的数据,适合数据量较大且需要频繁更新的场景。
- 常用技术:
- 时间戳:通过记录最后更新时间来抽取新增或修改的数据。
- CDC(Change Data Capture):通过数据库日志或触发器捕获数据变化
数据抽取
-
数据库抽取
- mysql、oracle等
-
文件抽取
- 读取文件:使用文件读取库(如csv)。
-
API 抽取
- HTTP 请求:使用 HTTP 客户端库(如
requests)发送请求,获取 API 返回的数据。
- HTTP 请求:使用 HTTP 客户端库(如
-
消息队列抽取
- 订阅消息:使用消息队列客户端(如 Kafka Consumer)订阅消息并获取数据。
抽取开源工具
-
Apache NiFi
- 特点:提供可视化界面,支持实时和批量数据抽取,内置多种数据源连接器(如数据库、API、文件系统等)。
- 适用场景:适合需要实时数据流处理的场景,支持复杂的数据路由和转换逻辑。
-
Apache Kafka
- 特点:分布式流处理平台,支持高吞吐量的实时数据抽取和传输。
- 适用场景:适合需要实时数据流处理的场景,常与 Spark Streaming 或 Flink 结合使用。
- Talend Open Studio
- 特点:提供图形化界面,支持多种数据源(如数据库、文件、API)的抽取,支持代码生成。
- 适用场景:适合中小型项目,支持快速开发和部署。
- Apache Sqoop
- 特点:专门用于在 Hadoop 和关系型数据库之间传输数据,支持增量抽取。
- 适用场景:适合大数据场景,尤其是 Hadoop 生态系统的数据抽取。
- Logstash
- 特点:主要用于日志数据的抽取和传输,支持多种输入和输出插件。
- 适用场景:适合日志数据的实时抽取和处理。
数据抽取例子
一、数据抽取的场景
假设我们需要从以下三个数据源中抽取数据:
- MySQL 数据库:抽取用户表(
users)中的数据。 - CSV 文件:抽取一个包含订单信息的文件(
orders.csv)。 - API:从一个公开的 API 中抽取天气数据。
二、数据抽取的实现
1. 从 MySQL 数据库抽取数据
- 工具:Python +
pymysql或SQLAlchemy。 - 步骤:
- 连接数据库。
- 执行 SQL 查询。
- 将查询结果保存到 DataFrame 或文件中。
import pandas as pd
from sqlalchemy import create_engine# 数据库连接配置
db_config = {'host': 'localhost','user': 'root','password': 'password','database': 'test_db'
}# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")# 执行 SQL 查询
query = "SELECT * FROM users" # 假设 users 表包含 id, name, age 字段
df_users = pd.read_sql(query, engine)# 输出结果
print("从 MySQL 抽取的用户数据:")
print(df_users)
2. 从 CSV 文件抽取数据
- 工具:Python +
pandas。 - 步骤:
- 读取 CSV 文件。
- 将数据加载到 DataFrame 中。
import pandas as pd# 读取 CSV 文件
df_orders = pd.read_csv('orders.csv') # 假设 orders.csv 包含 order_id, user_id, amount 字段# 输出结果
print("从 CSV 文件抽取的订单数据:")
print(df_orders)
3. 从 API 抽取数据
- 工具:Python +
requests。 - 步骤:
- 发送 HTTP 请求到 API。
- 解析返回的 JSON 数据。
- 将数据保存到 DataFrame 中。
import requests
import pandas as pd# API 配置
api_url = "https://api.weatherapi.com/v1/current.json"
api_key = "your_api_key" # 替换为你的 API Key
params = {'key': api_key,'q': 'Beijing' # 查询北京的天气
}# 发送 HTTP 请求
response = requests.get(api_url, params=params)
data = response.json() # 解析 JSON 数据# 将数据保存到 DataFrame
weather_data = {'location': data['location']['name'],'temperature': data['current']['temp_c'],'condition': data['current']['condition']['text']
}
df_weather = pd.DataFrame([weather_data])# 输出结果
print("从 API 抽取的天气数据:")
print(df_weather)
三、完整代码示例
以下是整合了上述三种数据抽取方式的完整代码:
import pandas as pd
from sqlalchemy import create_engine
import requests# 1. 从 MySQL 数据库抽取数据
def extract_from_mysql():# 数据库连接配置db_config = {'host': 'localhost','user': 'root','password': 'password','database': 'test_db'}# 创建数据库连接engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")# 执行 SQL 查询query = "SELECT * FROM users"df_users = pd.read_sql(query, engine)return df_users# 2. 从 CSV 文件抽取数据
def extract_from_csv():df_orders = pd.read_csv('orders.csv')return df_orders# 3. 从 API 抽取数据
def extract_from_api():api_url = "https://api.weatherapi.com/v1/current.json"api_key = "your_api_key" # 替换为你的 API Keyparams = {'key': api_key,'q': 'Beijing'}response = requests.get(api_url, params=params)data = response.json()weather_data = {'location': data['location']['name'],'temperature': data['current']['temp_c'],'condition': data['current']['condition']['text']}df_weather = pd.DataFrame([weather_data])return df_weather# 主函数
if __name__ == "__main__":# 抽取数据df_users = extract_from_mysql()df_orders = extract_from_csv()df_weather = extract_from_api()# 输出结果print("从 MySQL 抽取的用户数据:")print(df_users)print("\n从 CSV 文件抽取的订单数据:")print(df_orders)print("\n从 API 抽取的天气数据:")print(df_weather)
四、运行结果
假设数据如下:
- MySQL 用户表:
| id | name | age |
|---|---|---|
| 1 | Alice | 25 |
| 2 | Bob | 30 |
- CSV 订单文件:
| order_id | user_id | amount |
|---|---|---|
| 101 | 1 | 100.0 |
| 102 | 2 | 200.0 |
- API 天气数据:
| location | temperature | condition |
|---|---|---|
| Beijing | 20.0 | Sunny |
运行代码后,输出如下:
从 MySQL 抽取的用户数据:id name age
0 1 Alice 25
1 2 Bob 30从 CSV 文件抽取的订单数据:order_id user_id amount
0 101 1 100.0
1 102 2 200.0从 API 抽取的天气数据:location temperature condition
0 Beijing 20.0 Sunny
五、总结
数据抽取是 ETL 流程的第一步,通常涉及从多种数据源(如数据库、文件、API)中提取数据。通过 Python 和相关库(如 pandas、SQLAlchemy、requests),可以轻松实现数据抽取任务。你可以根据实际需求扩展这个例子,比如支持增量抽取、处理异常情况等。希望这个例子对你有帮助!
原文地址:码农小站
公众号:码农小站
相关文章:
ETL系列-数据抽取(Extract)
ETL的过程 1、数据抽取:确定数据源,定义数据接口,选择数据抽取方法(主动抽取或由源系统推送)。 2、数据清洗:处理不完整数据、错误数据、重复数据等,确保数据的准确性和一致性。(是…...
java八股文之框架
1.Spring框架中的Bean是否线程安全的 Spring框架中的Bean默认是单例的,不是线程安全的。因为一般在Spring的bean的中都是注入无状态的对象,没有线程安全问题,如果在bean中定义了可修改的成员变量,是要考虑线程安全问题的…...
【大模型】Ubuntu下 fastgpt 的部署和使用
前言 本次安装的版本为 fastgpt:v4.8.8-fix2。 最新版本fastgpt:v4.8.20-fix2 问答时报错,本着跑通先使用起来,就没有死磕下去,后面bug解了再进行记录。 github连接:https://github.com/labring/FastGPT fastgpt 安装说明&…...
小程序中头像昵称填写
官方文档 参考小程序用户头像昵称获取规则调整公告 新的小程序版本不能通过wx.getUserProfile和wx.getUserInfo获取用户信息 <van-field label"{{Avatar}}" label-class"field-label" right-icon-class"field-right-icon-class"input-class&…...
卷积神经网络(cnn,类似lenet-1,八)
我们第一层用卷积核,前面已经成功,现在我们用两层卷积核: 结构如下,是不是很想lenet-1,其实我们24年就实现了sigmoid版本的: cnn突破九(我们的五层卷积核bpnet网络就是lenet-1)-CS…...
【NLP 27、文本分类任务 —— 传统机器学习算法】
不要抓着枯叶哭泣,你要等待初春的新芽 —— 25.1.23 一、文本分类任务 定义:预先设定好一个文本类别集合,对于一篇文本,预测其所属的类别 例如: 情感分析: 这家饭店太难吃了 —> 正类 …...
Go红队开发—并发编程
文章目录 并发编程go协程chan通道无缓冲通道有缓冲通道创建⽆缓冲和缓冲通道 等协程sync.WaitGroup同步Runtime包Gosched()Goexit() 区别 同步变量sync.Mutex互斥锁atomic原子变量 SelectTicker定时器控制并发数量核心机制 并发编程阶段练习重要的细节端口扫描股票监控 并发编程…...
Oracle 导出所有表索引的创建语句
在Oracle数据库中,导出所有表的索引创建语句通常涉及到使用数据字典视图来查询索引的定义,然后生成对应的SQL语句。你可以通过查询DBA_INDEXES或USER_INDEXES视图(取决于你的权限和需求)来获取这些信息。 使用DBA_INDEXES视图 如…...
使用Docker方式一键部署MySQL和Redis数据库详解
一、前言 数据库是现代应用开发中不可或缺的一部分,MySQL和Redis作为两种广泛使用的数据库系统,分别用于关系型数据库和键值存储。本文旨在通过Docker和Docker Compose的方式,提供一个简洁明了的一键部署方案,确保数据库服务的稳…...
2020年蓝桥杯Java B组第二场题目+部分个人解析
#A:门牌制作 624 解一: public static void main(String[] args) {int count0;for(int i1;i<2020;i) {int ni;while(n>0) {if(n%102) {count;}n/10;}}System.out.println(count);} 解二: public static void main(String[] args) {…...
[深度学习] 大模型学习2-提示词工程指北
在文章大语言模型基础知识里,提示词工程(Prompt Engineering)作为大语言模型(Large Language Model,LLM)应用构建的一种方式被简要提及,本文将着重对该技术进行介绍。 提示词工程就是在和LLM聊…...
FPGA之硬件设计笔记-持续更新中
目录 1、说在前面2、FPGA硬件设计总计说明3、 原理图详解 - ARITX - 7 系列3.1 顶层框图介绍3.2 FPGA 电源sheet介绍:3.2.1 bank 14 和 bank 15的供电3.2.2 bank 0的供电3.2.3 Bank34 35 的供电 3.3 核电压和RAM电压以及辅助电压 4 原理图详解-- Ultrascale ARTIX4.…...
vue cli 与 vite的区别
1、现在我们一般会用vite来构建vue3的项目。 2、之前一开始的时候,我们会用vue cli的vue create来构建项目。 3、它们之间有什么区别呢? 1. 设计理念 Vue CLI: 是 Vue.js 官方提供的命令行工具,主要用于快速搭建 Vue 项目。 提…...
怎么在本地环境安装yarn包
一、安装Yarn的前置条件 安装Node.js和npm Yarn依赖于Node.js环境,需先安装Node.js官网的最新稳定版(建议≥16.13.0)。安装时勾选“Add to PATH”以自动配置环境变量。 二、安装Yarn的多种方式 1. 通过npm全局安装(通用…...
【大模型】AI 辅助编程操作实战使用详解
目录 一、前言 二、AI 编程介绍 2.1 AI 编程是什么 2.1.1 为什么需要AI辅助编程 2.2 AI 编程主要特点 2.3 AI编程底层核心技术 2.4 AI 编程核心应用场景 三、AI 代码辅助编程解决方案 3.1 AI 大模型平台 3.1.1 AI大模型平台代码生成优缺点 3.2 AI 编码插件 3.3 AI 编…...
react18自定义hook实现
概念:自定义 hook 是一种将组件逻辑提取到可复用函数中的方式,它允许你在多个组件中共享相同的状态和行为。自定义 hook 的本质上是一个普通的 JavaScript 函数,它可以使用 React 内部的 hook(如 useState、useEffect、useContext…...
一周学会Flask3 Python Web开发-Jinja2模板过滤器使用
锋哥原创的Flask3 Python Web开发 Flask3视频教程: 2025版 Flask3 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 在Jinja2中,过滤器(filter)是一些可以用来修改和过滤变量值的特殊函数,过滤器和变量用一个竖线 | &a…...
使用PDFMiner.six解析PDF数据
PDF(可移植文档格式)文件是由Adobe创建的一种灵活的文件格式,它允许文档在不同的软件、硬件和操作系统中一致地显示。每个PDF文件都包含对固定布局文档的全面描述,包括文本、字体、图形和其他必要的显示元素。pdf通常用于文档共享…...
本地svn
参考补充:https://blog.csdn.net/hhl_work/article/details/107832414 先在D:\coding_cangku下新建空文件夹,例:code1【类似gitee线上仓库】点击进入code1,右键选择TortoiseSVN,再下一级菜单下点击Create repository …...
金融支付行业技术侧重点
1. 合规问题 第三方支付系统的平稳运营,严格遵循《非银行支付机构监督管理条例》的各项条款是基础与前提,其中第十八条的规定堪称重中之重,是支付机构必须牢牢把握的关键准则。 第十八条明确指出,非银行支付机构需构建起必要且独…...
三年级下册语文第八单元作文:这样想象真有趣
《这样想象真有趣》是三年级非常经典的“童话想象作文”。重点是:✅ 想象大胆 ✅ 故事情节有趣 ✅ 人物会说话、会行动 ✅ 最后最好有一点启发我用夸克网盘分享了「三年级下册语文作文」,1-8单元。链接:https://pan.quark.cn/s/a80b7ca7f993这…...
C#调用C++ DLL报错‘找不到指定的模块’根因与精准排查指南
1. 这个报错不是“找不到文件”,而是“找不到依赖”——C#调用C DLL时最典型的认知陷阱 “无法加载 DLL ‘xxx.dll’: 找不到指定的模块”——这行红色错误信息,几乎每个在Windows平台做混合编程的C#开发者都见过。它第一次出现时,很多人会本…...
客制化键盘党必看:在Ubuntu 22.04上让F1-F12键失灵的HS75T/珂芝K75恢复正常(附一键脚本)
客制化键盘在Ubuntu下的F键修复指南:从原理到一键解决方案作为一名长期使用客制化机械键盘的Linux开发者,我深知那种当F5调试键突然失灵时的崩溃感。特别是当你刚入手一款颜值与手感俱佳的HS75T或珂芝K75,却发现在Ubuntu下连接蓝牙或2.4G接收…...
Linuxptp从入门到排查:一份覆盖安装、配置与常见报错解决的保姆级指南
Linuxptp从入门到排查:一份覆盖安装、配置与常见报错解决的保姆级指南当你在数据中心里部署高精度时间同步服务时,突然发现日志里不断跳出master offset超限警告;或者当你按照教程配置完ptp4l后,时钟状态始终卡在s0无法锁定——这…...
MacType 2025:3大突破性改进让Windows字体渲染焕然一新
MacType 2025:3大突破性改进让Windows字体渲染焕然一新 【免费下载链接】mactype Better font rendering for Windows. 项目地址: https://gitcode.com/gh_mirrors/ma/mactype 还在为Windows系统下字体模糊、边缘粗糙而烦恼吗?MacType 2025版本带…...
ICA与NMF算法详解:从盲源分离到矩阵分解的数学原理与工程实践
1. 项目概述:从数据噪音中“听”出独立的声音在信号处理、神经科学、金融数据分析等领域,我们常常会遇到一个经典的“鸡尾酒会问题”:在一个嘈杂的房间里,多个声源(比如不同人的谈话、背景音乐)的声音混合在…...
3分钟彻底清理Windows右键菜单!ContextMenuManager让你的效率提升200%
3分钟彻底清理Windows右键菜单!ContextMenuManager让你的效率提升200% 【免费下载链接】ContextMenuManager 🖱️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 你是不是也遇到过这种情况&…...
5分钟掌握NCM解密:网易云音乐文件转换终极指南
5分钟掌握NCM解密:网易云音乐文件转换终极指南 【免费下载链接】ncmdumpGUI C#版本网易云音乐ncm文件格式转换,Windows图形界面版本 项目地址: https://gitcode.com/gh_mirrors/nc/ncmdumpGUI 你是否曾经遇到过这样的情况:在网易云音…...
微信小程序逆向分析终极指南:快速掌握wxappUnpacker完整实战技巧
微信小程序逆向分析终极指南:快速掌握wxappUnpacker完整实战技巧 【免费下载链接】wxappUnpacker forked from https://github.com/qwerty472123/wxappUnpacker 项目地址: https://gitcode.com/gh_mirrors/wxappu/wxappUnpacker 作为一名微信小程序开发者&am…...
英雄联盟智能助手Seraphine:从青铜到王者的游戏效率革命 [特殊字符]
英雄联盟智能助手Seraphine:从青铜到王者的游戏效率革命 🎮 【免费下载链接】Seraphine 英雄联盟战绩查询工具 项目地址: https://gitcode.com/gh_mirrors/se/Seraphine 还在为错过排位对局而懊恼吗?还在BP阶段手忙脚乱查询对手战绩吗…...
