当前位置: 首页 > news >正文

ETL系列-数据抽取(Extract)

ETL的过程

1、数据抽取:确定数据源,定义数据接口,选择数据抽取方法(主动抽取或由源系统推送)。
2、数据清洗:处理不完整数据、错误数据、重复数据等,确保数据的准确性和一致性。(是数据转换的一部分)
3、数据转换:进行空值处理、数据标准统一、数据拆分、数据验证、数据替换和数据关联等操作。
4、规则检查:根据业务需求进行数据质量和业务规则的校验。
5、数据加载:将数据缓冲区的数据加载到目标数据库或数据仓库中,可能是全量加载或增量加载。

1、数据抽取(Extract)

选择抽取策略

  • 全量抽取
    • 特点:一次性抽取所有数据,适合数据量较小或首次抽取的场景。
    • 实现方式:直接查询整个表或读取整个文件。
  • 增量抽取
    • 特点:仅抽取发生变化的数据,适合数据量较大且需要频繁更新的场景。
    • 常用技术
    • 时间戳:通过记录最后更新时间来抽取新增或修改的数据。
    • CDC(Change Data Capture):通过数据库日志或触发器捕获数据变化

数据抽取

  • 数据库抽取

    • mysql、oracle等
  • 文件抽取

    • 读取文件:使用文件读取库(如csv)。
  • API 抽取

    • HTTP 请求:使用 HTTP 客户端库(如 requests)发送请求,获取 API 返回的数据。
  • 消息队列抽取

    • 订阅消息:使用消息队列客户端(如 Kafka Consumer)订阅消息并获取数据。

抽取开源工具

  1. Apache NiFi

    • 特点:提供可视化界面,支持实时和批量数据抽取,内置多种数据源连接器(如数据库、API、文件系统等)。
    • 适用场景:适合需要实时数据流处理的场景,支持复杂的数据路由和转换逻辑。
  2. Apache Kafka

  • 特点:分布式流处理平台,支持高吞吐量的实时数据抽取和传输。
  • 适用场景:适合需要实时数据流处理的场景,常与 Spark Streaming 或 Flink 结合使用。
  1. Talend Open Studio
  • 特点:提供图形化界面,支持多种数据源(如数据库、文件、API)的抽取,支持代码生成。
  • 适用场景:适合中小型项目,支持快速开发和部署。
  1. Apache Sqoop
  • 特点:专门用于在 Hadoop 和关系型数据库之间传输数据,支持增量抽取。
  • 适用场景:适合大数据场景,尤其是 Hadoop 生态系统的数据抽取。
  1. Logstash
  • 特点:主要用于日志数据的抽取和传输,支持多种输入和输出插件。
  • 适用场景:适合日志数据的实时抽取和处理。

数据抽取例子

一、数据抽取的场景

假设我们需要从以下三个数据源中抽取数据:

  1. MySQL 数据库:抽取用户表(users)中的数据。
  2. CSV 文件:抽取一个包含订单信息的文件(orders.csv)。
  3. API:从一个公开的 API 中抽取天气数据。

二、数据抽取的实现

1. 从 MySQL 数据库抽取数据
  • 工具:Python + pymysqlSQLAlchemy
  • 步骤
    1. 连接数据库。
    2. 执行 SQL 查询。
    3. 将查询结果保存到 DataFrame 或文件中。
import pandas as pd
from sqlalchemy import create_engine# 数据库连接配置
db_config = {'host': 'localhost','user': 'root','password': 'password','database': 'test_db'
}# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")# 执行 SQL 查询
query = "SELECT * FROM users"  # 假设 users 表包含 id, name, age 字段
df_users = pd.read_sql(query, engine)# 输出结果
print("从 MySQL 抽取的用户数据:")
print(df_users)

2. 从 CSV 文件抽取数据
  • 工具:Python + pandas
  • 步骤
    1. 读取 CSV 文件。
    2. 将数据加载到 DataFrame 中。
import pandas as pd# 读取 CSV 文件
df_orders = pd.read_csv('orders.csv')  # 假设 orders.csv 包含 order_id, user_id, amount 字段# 输出结果
print("从 CSV 文件抽取的订单数据:")
print(df_orders)

3. 从 API 抽取数据
  • 工具:Python + requests
  • 步骤
    1. 发送 HTTP 请求到 API。
    2. 解析返回的 JSON 数据。
    3. 将数据保存到 DataFrame 中。
import requests
import pandas as pd# API 配置
api_url = "https://api.weatherapi.com/v1/current.json"
api_key = "your_api_key"  # 替换为你的 API Key
params = {'key': api_key,'q': 'Beijing'  # 查询北京的天气
}# 发送 HTTP 请求
response = requests.get(api_url, params=params)
data = response.json()  # 解析 JSON 数据# 将数据保存到 DataFrame
weather_data = {'location': data['location']['name'],'temperature': data['current']['temp_c'],'condition': data['current']['condition']['text']
}
df_weather = pd.DataFrame([weather_data])# 输出结果
print("从 API 抽取的天气数据:")
print(df_weather)

三、完整代码示例

以下是整合了上述三种数据抽取方式的完整代码:

import pandas as pd
from sqlalchemy import create_engine
import requests# 1. 从 MySQL 数据库抽取数据
def extract_from_mysql():# 数据库连接配置db_config = {'host': 'localhost','user': 'root','password': 'password','database': 'test_db'}# 创建数据库连接engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")# 执行 SQL 查询query = "SELECT * FROM users"df_users = pd.read_sql(query, engine)return df_users# 2. 从 CSV 文件抽取数据
def extract_from_csv():df_orders = pd.read_csv('orders.csv')return df_orders# 3. 从 API 抽取数据
def extract_from_api():api_url = "https://api.weatherapi.com/v1/current.json"api_key = "your_api_key"  # 替换为你的 API Keyparams = {'key': api_key,'q': 'Beijing'}response = requests.get(api_url, params=params)data = response.json()weather_data = {'location': data['location']['name'],'temperature': data['current']['temp_c'],'condition': data['current']['condition']['text']}df_weather = pd.DataFrame([weather_data])return df_weather# 主函数
if __name__ == "__main__":# 抽取数据df_users = extract_from_mysql()df_orders = extract_from_csv()df_weather = extract_from_api()# 输出结果print("从 MySQL 抽取的用户数据:")print(df_users)print("\n从 CSV 文件抽取的订单数据:")print(df_orders)print("\n从 API 抽取的天气数据:")print(df_weather)

四、运行结果

假设数据如下:

  1. MySQL 用户表
idnameage
1Alice25
2Bob30
  1. CSV 订单文件
order_iduser_idamount
1011100.0
1022200.0
  1. API 天气数据
locationtemperaturecondition
Beijing20.0Sunny

运行代码后,输出如下:

从 MySQL 抽取的用户数据:id   name  age
0   1  Alice   25
1   2    Bob   30从 CSV 文件抽取的订单数据:order_id  user_id  amount
0       101        1   100.0
1       102        2   200.0从 API 抽取的天气数据:location  temperature condition
0  Beijing         20.0     Sunny

五、总结

数据抽取是 ETL 流程的第一步,通常涉及从多种数据源(如数据库、文件、API)中提取数据。通过 Python 和相关库(如 pandasSQLAlchemyrequests),可以轻松实现数据抽取任务。你可以根据实际需求扩展这个例子,比如支持增量抽取、处理异常情况等。希望这个例子对你有帮助!

原文地址:码农小站
公众号:码农小站

相关文章:

ETL系列-数据抽取(Extract)

ETL的过程 1、数据抽取:确定数据源,定义数据接口,选择数据抽取方法(主动抽取或由源系统推送)。 2、数据清洗:处理不完整数据、错误数据、重复数据等,确保数据的准确性和一致性。(是…...

java八股文之框架

1.Spring框架中的Bean是否线程安全的 Spring框架中的Bean默认是单例的,不是线程安全的。因为一般在Spring的bean的中都是注入无状态的对象,没有线程安全问题,如果在bean中定义了可修改的成员变量,是要考虑线程安全问题的&#xf…...

【大模型】Ubuntu下 fastgpt 的部署和使用

前言 本次安装的版本为 fastgpt:v4.8.8-fix2。 最新版本fastgpt:v4.8.20-fix2 问答时报错,本着跑通先使用起来,就没有死磕下去,后面bug解了再进行记录。   github连接:https://github.com/labring/FastGPT fastgpt 安装说明&…...

小程序中头像昵称填写

官方文档 参考小程序用户头像昵称获取规则调整公告 新的小程序版本不能通过wx.getUserProfile和wx.getUserInfo获取用户信息 <van-field label"{{Avatar}}" label-class"field-label" right-icon-class"field-right-icon-class"input-class&…...

卷积神经网络(cnn,类似lenet-1,八)

我们第一层用卷积核&#xff0c;前面已经成功&#xff0c;现在我们用两层卷积核&#xff1a; 结构如下&#xff0c;是不是很想lenet-1&#xff0c;其实我们24年就实现了sigmoid版本的&#xff1a; cnn突破九&#xff08;我们的五层卷积核bpnet网络就是lenet-1&#xff09;-CS…...

【NLP 27、文本分类任务 —— 传统机器学习算法】

不要抓着枯叶哭泣&#xff0c;你要等待初春的新芽 —— 25.1.23 一、文本分类任务 定义&#xff1a;预先设定好一个文本类别集合&#xff0c;对于一篇文本&#xff0c;预测其所属的类别 例如&#xff1a; 情感分析&#xff1a; 这家饭店太难吃了 —> 正类 …...

Go红队开发—并发编程

文章目录 并发编程go协程chan通道无缓冲通道有缓冲通道创建⽆缓冲和缓冲通道 等协程sync.WaitGroup同步Runtime包Gosched()Goexit() 区别 同步变量sync.Mutex互斥锁atomic原子变量 SelectTicker定时器控制并发数量核心机制 并发编程阶段练习重要的细节端口扫描股票监控 并发编程…...

Oracle 导出所有表索引的创建语句

在Oracle数据库中&#xff0c;导出所有表的索引创建语句通常涉及到使用数据字典视图来查询索引的定义&#xff0c;然后生成对应的SQL语句。你可以通过查询DBA_INDEXES或USER_INDEXES视图&#xff08;取决于你的权限和需求&#xff09;来获取这些信息。 使用DBA_INDEXES视图 如…...

使用Docker方式一键部署MySQL和Redis数据库详解

一、前言 数据库是现代应用开发中不可或缺的一部分&#xff0c;MySQL和Redis作为两种广泛使用的数据库系统&#xff0c;分别用于关系型数据库和键值存储。本文旨在通过Docker和Docker Compose的方式&#xff0c;提供一个简洁明了的一键部署方案&#xff0c;确保数据库服务的稳…...

2020年蓝桥杯Java B组第二场题目+部分个人解析

#A&#xff1a;门牌制作 624 解一&#xff1a; public static void main(String[] args) {int count0;for(int i1;i<2020;i) {int ni;while(n>0) {if(n%102) {count;}n/10;}}System.out.println(count);} 解二&#xff1a; public static void main(String[] args) {…...

[深度学习] 大模型学习2-提示词工程指北

在文章大语言模型基础知识里&#xff0c;提示词工程&#xff08;Prompt Engineering&#xff09;作为大语言模型&#xff08;Large Language Model&#xff0c;LLM&#xff09;应用构建的一种方式被简要提及&#xff0c;本文将着重对该技术进行介绍。 提示词工程就是在和LLM聊…...

FPGA之硬件设计笔记-持续更新中

目录 1、说在前面2、FPGA硬件设计总计说明3、 原理图详解 - ARITX - 7 系列3.1 顶层框图介绍3.2 FPGA 电源sheet介绍&#xff1a;3.2.1 bank 14 和 bank 15的供电3.2.2 bank 0的供电3.2.3 Bank34 35 的供电 3.3 核电压和RAM电压以及辅助电压 4 原理图详解-- Ultrascale ARTIX4.…...

vue cli 与 vite的区别

1、现在我们一般会用vite来构建vue3的项目。 2、之前一开始的时候&#xff0c;我们会用vue cli的vue create来构建项目。 3、它们之间有什么区别呢&#xff1f; 1. 设计理念 Vue CLI&#xff1a; 是 Vue.js 官方提供的命令行工具&#xff0c;主要用于快速搭建 Vue 项目。 提…...

怎么在本地环境安装yarn包

一、安装Yarn的前置条件 安装Node.js和npm Yarn依赖于Node.js环境&#xff0c;需先安装Node.js官网的最新稳定版&#xff08;建议≥16.13.0&#xff09;。安装时勾选“Add to PATH”以自动配置环境变量。 二、安装Yarn的多种方式 1. 通过npm全局安装&#xff08;通用&#xf…...

【大模型】AI 辅助编程操作实战使用详解

目录 一、前言 二、AI 编程介绍 2.1 AI 编程是什么 2.1.1 为什么需要AI辅助编程 2.2 AI 编程主要特点 2.3 AI编程底层核心技术 2.4 AI 编程核心应用场景 三、AI 代码辅助编程解决方案 3.1 AI 大模型平台 3.1.1 AI大模型平台代码生成优缺点 3.2 AI 编码插件 3.3 AI 编…...

react18自定义hook实现

概念&#xff1a;自定义 hook 是一种将组件逻辑提取到可复用函数中的方式&#xff0c;它允许你在多个组件中共享相同的状态和行为。自定义 hook 的本质上是一个普通的 JavaScript 函数&#xff0c;它可以使用 React 内部的 hook&#xff08;如 useState、useEffect、useContext…...

一周学会Flask3 Python Web开发-Jinja2模板过滤器使用

锋哥原创的Flask3 Python Web开发 Flask3视频教程&#xff1a; 2025版 Flask3 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 在Jinja2中&#xff0c;过滤器(filter)是一些可以用来修改和过滤变量值的特殊函数&#xff0c;过滤器和变量用一个竖线 | &a…...

使用PDFMiner.six解析PDF数据

PDF&#xff08;可移植文档格式&#xff09;文件是由Adobe创建的一种灵活的文件格式&#xff0c;它允许文档在不同的软件、硬件和操作系统中一致地显示。每个PDF文件都包含对固定布局文档的全面描述&#xff0c;包括文本、字体、图形和其他必要的显示元素。pdf通常用于文档共享…...

本地svn

参考补充&#xff1a;https://blog.csdn.net/hhl_work/article/details/107832414 先在D:\coding_cangku下新建空文件夹&#xff0c;例&#xff1a;code1【类似gitee线上仓库】点击进入code1&#xff0c;右键选择TortoiseSVN&#xff0c;再下一级菜单下点击Create repository …...

金融支付行业技术侧重点

1. 合规问题 第三方支付系统的平稳运营&#xff0c;严格遵循《非银行支付机构监督管理条例》的各项条款是基础与前提&#xff0c;其中第十八条的规定堪称重中之重&#xff0c;是支付机构必须牢牢把握的关键准则。 第十八条明确指出&#xff0c;非银行支付机构需构建起必要且独…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

conda相比python好处

Conda 作为 Python 的环境和包管理工具&#xff0c;相比原生 Python 生态&#xff08;如 pip 虚拟环境&#xff09;有许多独特优势&#xff0c;尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处&#xff1a; 一、一站式环境管理&#xff1a…...

从零实现富文本编辑器#5-编辑器选区模型的状态结构表达

先前我们总结了浏览器选区模型的交互策略&#xff0c;并且实现了基本的选区操作&#xff0c;还调研了自绘选区的实现。那么相对的&#xff0c;我们还需要设计编辑器的选区表达&#xff0c;也可以称为模型选区。编辑器中应用变更时的操作范围&#xff0c;就是以模型选区为基准来…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】

1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件&#xff08;System Property Definition File&#xff09;&#xff0c;用于声明和管理 Bluetooth 模块相…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

技术栈RabbitMq的介绍和使用

目录 1. 什么是消息队列&#xff1f;2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...

SQL Server 触发器调用存储过程实现发送 HTTP 请求

文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...