SQLAlchemy 使用封装实例

类封装
database.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-import sys
import json
import logging
from datetime import datetimefrom core.utils import classlock, parse_bool
from core.config import (MYSQL_HOST,MYSQL_PORT,MYSQL_USER,MYSQL_PASS,MYSQL_DATABASE,MYSQL_TIMEOUT
)from sqlalchemy import create_engine, Column, desc, not_, func
from sqlalchemy import Integer, String, Boolean, DateTime, Text, Enum # Text存储大不固定长的字符串
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyErrorBase = declarative_base()log = logging.getLogger("log")SCHEMA_VERSION = "1.0.0"class User(Base):__tablename__ = "user"id = Column(Integer(), primary_key=True)file_size = Column(Integer(), nullable=False) # nullable 不可为空md5 = Column(String(32), nullable=False)crc32 = Column(String(8), nullable=False)sha1 = Column(String(40), nullable=False)sha256 = Column(String(64), nullable=False)sha512 = Column(String(128), nullable=False)memory = Column(Boolean, nullable=False, default=False)ssdeep = Column(String(255), nullable=True)start_time = Column(DateTime(timezone=False), nullable=True, default=datetime.now)def __repr__(self): # 查询返回的结果return "<User('{0}','{1}')>".format(self.id, self.sha256)def to_dict(self):"""将对象转换为dict.@return: dict"""d = {}for column in self.__table__.columns:d[column.name] = getattr(self, column.name)return ddef to_json(self):"""将对象转换为JSON.@return: JSON data"""return json.dumps(self.to_dict())class Version(Base):"""用于确定实际数据库架构发布的表."""__tablename__ = "version"version_num = Column(String(32), nullable=False, primary_key=True)class Database(object):"""分析队列数据库此类处理为内部队列创建数据库用户经营它还提供了一些与之交互的功能"""def __init__(self, schema_check=True, echo=False):"""@param dsn: 数据库连接字符串.@param schema_check: 禁用或启用数据库架构版本检查.@param echo: echo sql 查询."""self._lock = Noneself.schema_check = schema_checkself.echo = echodef connect(self, schema_check=None, dsn=None, create=True):"""连接到数据库后端."""if schema_check is not None:self.schema_check = schema_checkif not dsn:dsn = "mysql://{0}:{1}@{2}:{3}/{4}".format(MYSQL_USER, MYSQL_PASS, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE)#dsn = "mysql://{username}:{password}@{hostname}:{port}/{database}"self._connect_database(dsn)# 禁用SQL日志记录。打开它进行调试.self.engine.echo = self.echo# 连接超时.self.engine.pool_timeout = MYSQL_TIMEOUT# 获取数据库会话.self.Session = sessionmaker(bind=self.engine)if create:self._create_tables()def _create_tables(self):"""创建所有数据库表等."""try:Base.metadata.create_all(self.engine)except SQLAlchemyError as e:raise ("无法创建或连接到数据库: %s" % e)# 处理架构版本控制.# TODO: it's a little bit dirty, needs refactoring.tmp_session = self.Session()if not tmp_session.query(Version).count():# 设置数据库架构版本.tmp_session.add(Version(version_num=SCHEMA_VERSION))try:tmp_session.commit()except SQLAlchemyError as e:raise ("无法设置架构版本: %s" % e)tmp_session.rollback()finally:tmp_session.close()else:# 检查数据库版本是否为预期版本.last = tmp_session.query(Version).first()tmp_session.close()if last.version_num != SCHEMA_VERSION and self.schema_check:log.warning("数据库架构版本不匹配:找到 %s,应为 %s.",last.version_num, SCHEMA_VERSION)log.error("(可选)进行备份,然后通过运行migrate应用最新的数据库迁移。")sys.exit(1)def __del__(self):"""断开连接池."""self.engine.dispose()def _connect_database(self, connection_string):"""连接到数据库.@param connection_string: 指定数据库的连接字符串"""try:if connection_string.startswith("sqlite"):# 使用“check_same_thread”在多个线程上禁用sqlite安全检查.self.engine = create_engine(connection_string, connect_args={"check_same_thread": False})elif connection_string.startswith("postgres"):# 禁用SSL模式以避免使用sqlalchemy和多进程时出现一些错误.# See: http://www.postgresql.org/docs/9.0/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS# TODO 检查这是否仍然相关。特别是假设我们不再使用多处理.self.engine = create_engine(connection_string, connect_args={"sslmode": "disable"})else:self.engine = create_engine(connection_string)except ImportError as e:lib = str(e).split()[-1].strip("'")if lib == "MySQLdb":log.error("缺少MySQL数据库驱动程序(在Linux上使用 `pip install mysql-python` 安装,或在Windows上使用 `pip-install mysqlclient`)")if lib == "psycopg2":log.error("缺少PostgreSQL数据库驱动程序 (使用 `pip install psycopg2`)")log.error("缺少未知的数据库驱动程序,无法导入 %s" % lib)sys.exit(-1)@classlockdef add_user(self, file_size, md5, crc32, sha1, sha256, sha512, memory, ssdeep=None):session = self.Session()# 将空字符串和None值转换为有效的int# if not timeout:# timeout = 0# if not priority:# priority = 1#try:memory = parse_bool(memory)except ValueError:memory = False## try:# enforce_timeout = parse_bool(enforce_timeout)# except ValueError:# enforce_timeout = Falseuser = User()user.file_size = file_sizeuser.md5 = md5user.crc32 = crc32user.sha1 = sha1user.sha256 = sha256user.sha512 = sha512user.memory = memoryuser.ssdeep = ssdeepsession.add(user)try:session.commit()except SQLAlchemyError as e:log.error("数据库添加 user 错误: {0}".format(e))return Falsefinally:session.close()return True@classlockdef select_user(self, id=None):session = self.Session()try:search = session.query(User)if id:search = search.filter_by(id=id)# 排序# search = search.order_by(id)# search = search.order_by(desc(User.id)) 倒叙tasks = search.all()return tasksexcept SQLAlchemyError as e:log.error("数据库查看所有 user 错误: {0}".format(e))return []finally:session.close()@classlockdef update_user(self, id, new_file_size):session = self.Session()try:search = session.query(User).filter(User.id == id).first()search.file_size = new_file_size# session.query(User).filter(User.id == 1).update({'file_size': 'new_file_size'})session.commit()except SQLAlchemyError as e:log.error("数据库更新 user 错误: {0}".format(e))return Falsefinally:session.close()return True@classlockdef delete_user(self, id):session = self.Session()try:search = session.query(User).filter(User.id == id).first()if search:session.delete(search)session.commit()except SQLAlchemyError as e:log.error("数据库删除 user 错误: {0}".format(e))return Falsefinally:session.close()return True
调用运行
merage.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-import time
import loggingfrom core.database import Databaselog = logging.getLogger("log")class Merge(object):def __init__(self):db = Database()db.connect()res = db.add_user(32, "11", "22", "33", "44", "55", "off")if not res:print("添加错误")print(db.select_user())# res = db.update_user(1, 50)# if not res:# print("更新错误")# res = db.delete_user(2)# if not res:# print("删除错误")if __name__ == '__main__':merge = Merge()相关文章:
SQLAlchemy 使用封装实例
类封装 database.py #! /usr/bin/env python # -*- coding: utf-8 -*-import sys import json import logging from datetime import datetimefrom core.utils import classlock, parse_bool from core.config import (MYSQL_HOST,MYSQL_PORT,MYSQL_USER,MYSQL_PASS,MYSQL_DA…...
Android Framework通信:Binder
文章目录 前言一、Linux传统跨进程通信原理二、Android Binder跨进程通信原理1、动态内核可加载模块2、内存映射3、Binder IPC 实现原理 三、Android Binder IPC 通信模型1、Client/Server/ServiceManager/驱动Binder与路由器之间的角色关系 2、Binder通信过程3、Binder通信中的…...
如何用精准测试来搞垮团队?
测试行业每年会冒出来一些新鲜词:混沌工程、精准测试、AI测试…… 这些新概念、新技术让我们感到很焦虑,逼着自己去学习和了解这些新玩意,担心哪一天被淘汰掉。 以至于给我这样的错觉,当「回归测试」、「精准测试」这两个词摆在一…...
暴力递归转动态规划(十)
题目 给定一个二维数组matrix[][],一个人必须从左上角出发,最终到达右下角,沿途只可以向下或者向右走,沿途的数字都累加就是距离累加和。返回最小距离累加和。 这道题中会采用压缩数组的算法来进行优化 暴力递归 暴力递归方法的整…...
深度学习-房价预测案例
1. 实现几个函数方便下载数据 import hashlib import os import tarfile import zipfile import requests#save DATA_HUB dict() DATA_URL http://d2l-data.s3-accelerate.amazonaws.com/def download(name, cache_diros.path.join(.., data)): #save"""下载…...
【26】c++设计模式——>命令模式
c命令模式 C的命令模式是一种行为模式,通过将请求封装成对象,以实现请求发送者和接受者的解耦。 在命令模式中,命令被封装成一个包含特定操作的对象,这个对象包含的执行该操作的方法,以及一些必要的参数。命令对象可以…...
ElasticSearch容器化从0到1实践(一)
背景 通过kubernetes集群聚合多个Elasticsearch集群碎片资源,提高运维效率。 介绍 Kubernetes Operator 是一种特定的应用控制器,通过 CRD(Custom Resource Definitions,自定义资源定义)扩展 Kubernetes API 的功能…...
【Vue面试题二十四】、Vue项目中有封装过axios吗?主要是封装哪方面的?
文章底部有个人公众号:热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享? 踩过的坑没必要让别人在再踩,自己复盘也能加深记忆。利己利人、所谓双赢。 面试官:Vue项目中有封装过axios…...
旅游票务商城小程序的作用是什么
随着环境放开,旅游行业恢复了以往的规模,本地游、外地游成为众多用户选择,而在旅游时,不少人会报名旅行团前往各风景热点游玩,对旅游票务经营者而言,市场高需求的同时也面临一些难题。 对旅游票务经营商家…...
LabVIEW在安装了其它的NI软件之后崩溃了
LabVIEW在安装了其它的NI软件之后崩溃了 在安装了其它的NI软件之后,一些原本安装好的或者新安装的软件由于缺少必要的DLL而崩溃掉了。例如,在这种情况下,Teststand可能会报下面的错误: RetrievingCOM class factory for compone…...
基于Java的个人健康管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding)有保障的售后福利 代码参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…...
nginx https的配置方法
文章目录 安装证书工具安装根证书生成域名证书配置转发 ssl的请求到http请求 安装证书工具 curl ‘http://pan.itshine.cn:5080/?explorer/share/fileOut&shareID64h6PiQQ&path%7BshareItemLink%3A64h6PiQQ%7D%2F%E5%B7%A5%E5%85%B7%2Fmkcert’ > ‘./mkcert’ c…...
使用WebDriver采样器将JMeter与Selenium集成
目录 第一步:在JMeter中添加Selenium / WebDriver插件 第二步:创建一条测试计划--添加线程组 第三步:下载 chromedriver.exe 第四步:在Web Driver 采样器中添加测试脚本 第五步:运行并且验证 注意: 第…...
flink教程
文章目录 来自于尚硅谷教程1. Flink概述1.1 特点1.2 与SparkStreaming对比 2. Flink部署2.1 集群角色2.2 部署模式2.3 Standalone运行模式2.3.1 本地会话模式部署2.3.2 应用模式 2.4 YARN运行模式2.4.1 会话模式部署2.4.2 应用模式部署 2.5 历史服务 3. 系统架构3.1 并行度3.2 …...
视频监控系统/安防视频平台EasyCVR广场视频细节优化
安防视频监控系统/视频云存储/安防监控EasyCVR视频汇聚平台基于云边端智能协同,支持海量视频的轻量化接入与汇聚、转码与处理、全网智能分发、视频集中存储等。安防视频汇聚平台EasyCVR拓展性强,视频能力丰富,可实现视频监控直播、视频轮播、…...
电脑上播放4K视频需要具备哪些条件?
在电视上播放 4K( 4096 2160 像素)视频是很简单的,但在电脑设备上播放 4K 视频并不容易。相反,它们有自己必须满足的硬件要求。 如果不满足要求,在电脑上打开 4K 分辨率文件或大型视频文件会导致卡顿、音频滞后以及更…...
测试除了点点点,还有哪些内容呢?
今天和一个网友讨论了一下关于互联网行业中测试的情况,希望能够了解现在的互联网行业主要的测试工作内容。小编根据以往的工作经历和经验情况,来做一个总结和整理。 1、岗位分类 现在的岗位划分主要是分为两大类:测试工程师 和 测试开发工程…...
HTTP的本质理解
HTTP是超文本传输协议,从协议、传输和超文本三个关键词进行进行分解。 协议关键词讲解 1.协议的第一个词是协,这个就表明需要至少两方参与到其中。 2.协议的第二个词是议,表明HTTP是规范和约定,需要大家共同遵守,也包…...
微信小程序获取公众号的文章
背景:我有一个《砂舞指南》的小程序,主要是分享砂舞最新动态等 最近做了一个小程序,想要一些固定的文章展示在小程序里面,比如《什么是砂舞》《玩砂舞注意点》等普及砂舞知识的文章 开发流程: 1、刚开始测试了 素材…...
【算法|动态规划No.20】leetcode416. 分割等和子集
个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【LeetCode】 🍔本专栏旨在提高自己算法能力的同时,记录一下自己的学习过程,希望…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...
使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
uniapp 开发ios, xcode 提交app store connect 和 testflight内测
uniapp 中配置 配置manifest 文档:manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号:4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...
在 Spring Boot 项目里,MYSQL中json类型字段使用
前言: 因为程序特殊需求导致,需要mysql数据库存储json类型数据,因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...
Axure 下拉框联动
实现选省、选完省之后选对应省份下的市区...
