百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 优雅编程 > 正文

利用Python优雅地操作Oracle数据库

sinye56 2024-10-10 10:59 9 浏览 0 评论

需要源码可以直接联系

方案一:

直接套用脚本,需可以看懂一些脚本逻辑

改代码中 类名为 OracleConnector,它可以同时连接多个 Oracle 数据库,并提供执行增删改查操作的方法。

import cx_Oracle

class OracleConnector:
    def __init__(self, databases):
        self.connections = {}
        for db in databases:
            conn = cx_Oracle.connect(
                user=db['username'],
                password=db['password'],
                dsn=db['dsn'],
                encoding=db.get('encoding', 'UTF-8')
            )
            self.connections[db['name']] = conn

    def execute_query(self, database_name, query, params=None):
        with self.connections[database_name].cursor() as cursor:
            cursor.execute(query, params)
            result = cursor.fetchall()
        return result
    
    def execute_non_query(self, database_name, query, params=None):
        with self.connections[database_name].cursor() as cursor:
            cursor.execute(query, params)
            self.connections[database_name].commit()
            return cursor.rowcount
        
    def execute_query_all(self, databases, query, params=None):
        results = {}
        for db in databases:
            database_name = db['name']
            results[database_name] = self.execute_query(database_name, query, params)
        return results
    
    def execute_non_query_all(self, databases, query, params=None):
        results = {}
        for db in databases:
            database_name = db['name']
            try:
                res = self.execute_non_query(database_name, query, params)
                results[database_name] = res
            except cx_Oracle.DatabaseError as e:
                print(f"Error occurred when running query on {database_name}: {e}")
                self.connections[database_name].rollback()
        return results


以下是代码解释:

import cx_Oracle:导入 cx_Oracle 库,用于与 Oracle 数据库进行连接和交互。

class OracleConnector::定义一个名为 OracleConnector 的类。

def __init__(self, databases)::类的初始化方法,接受一个包含多个数据库信息的列表作为参数。

self.connections = {}:初始化一个字典,用于存储数据库连接。

for db in databases::遍历数据库列表中的每个数据库信息。

conn = cx_Oracle.connect(...):根据数据库信息创建数据库连接对象。user=db['username']:连接数据库的用户名。password=db['password']:连接数据库的密码。dsn=db['dsn']:Oracle 数据库的 DSN(数据源名称)。encoding=db.get('encoding', 'UTF-8'):可选的编码方式,默认为 UTF-8。

self.connections[db['name']] = conn:将数据库连接对象添加到字典中,以数据库名称作为键。

def execute_query(self, database_name, query, params=None)::执行查询语句的方法。database_name:数据库名称。query:要执行的查询语句。params=None:可选的查询参数。

with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。

cursor.execute(query, params):执行查询语句。

result = cursor.fetchall():获取查询结果集。

return result:返回查询结果。

def execute_non_query(self, database_name, query, params=None)::执行非查询语句的方法,比如插入、更新和删除操作。

database_name:数据库名称。

query:要执行的非查询语句。

params=None:可选的参数。

with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。

cursor.execute(query, params):执行非查询语句,比如插入、更新和删除操作。

self.connections[database_name].commit():提交事务,将修改操作永久保存到数据库中。

return cursor.rowcount:返回受影响的行数,即执行非查询操作后所影响的行数。

def execute_query_all(self, databases, query, params=None)::在多个数据库上执行相同的查询语句,并返回结果。

databases:要执行查询的数据库列表。

query:要执行的查询语句。

params=None:可选的查询参数。

results = {}:初始化一个字典,用于存储查询结果。

for db in databases::遍历数据库列表中的每个数据库。

database_name = db['name']:获取数据库名称。

results[database_name] = self.execute_query(database_name, query, params):执行查询,并将结果存储到字典中。

return results:返回包含查询结果的字典。

def execute_non_query_all(self, databases, query, params=None)::在多个数据库上执行相同的非查询语句,并返回结果。

databases:要执行非查询操作的数据库列表。

query:要执行的非查询语句。

params=None:可选的参数。

results = {}:初始化一个字典,用于存储操作结果。

for db in databases::遍历数据库列表中的每个数据库。

database_name = db['name']:获取数据库名称。

try::捕获可能发生的异常。

res = self.execute_non_query(database_name, query, params):执行非查询操作。

results[database_name] = res:将操作结果存储到字典中。

except cx_Oracle.DatabaseError as e::捕获 Oracle 数据库相关的异常。

print(f"Error occurred when running query on {database_name}: {e}"):打印出错信息。

self.connections[database_name].rollback():执行回滚操作,撤销之前的操作。

return results:返回包含操作结果的字典。

这个封装类使得可以同时连接多个 Oracle 数据库,并在它们上执行增删改查操作。而在执行非查询操作时,如果遇到错误,将会进行回滚操作,保证数据的一致性。


方案二:直接调用封装脚本(写用例,执行脚本即可)

脚本实现封装后,只需要在Oracle.yaml文件中写用例即可,此后执行Oracle.py脚本即实现数据库的增删改查操作

目录介绍:


Oracle.yaml 编写基本数据库信息

Orcale:
  username: user
  password:  pwd
  ip: 127.0.0.1
  port: 1521
  sid: orcl


PublicConfig.py脚本: 配置读取信息,方便调用

import os
from Public_Utils.util_yaml import YamlReader

class YamlPath:
    def __init__(self):
        current = os.path.abspath(__file__)
        self.base_dir = os.path.dirname(os.path.dirname(current))
        self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml"

    def get_oracle_file(self):
        self._config_file = self._config_path + os.sep + "Oracle.yaml"
        return self._config_file

class ConfigYaml:
    def __init__(self):   #初始yaml读取配置文件
        self.oracle_config = YamlReader(YamlPath().get_oracle_file()).yaml_data()

    def get_oracle_yaml(self):
        return self.oracle_config['Orcale']

if __name__ == '__main__':
    pass


Oracle.py执行脚本

#  coding=utf-8
import cx_Oracle
import os
import json
from Public_Config.PublicConfig import ConfigYaml
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

class Oracle:

    def __init__(self):
        user = ConfigYaml().get_oracle_yaml()['username']
        pwd = ConfigYaml().get_oracle_yaml()['password']
        ip = ConfigYaml().get_oracle_yaml()['ip']
        port = ConfigYaml().get_oracle_yaml()['port']
        sid = ConfigYaml().get_oracle_yaml()['sid']
        self.connect = cx_Oracle.connect(str(user) + "/" + str(pwd) + "@" + str(ip) + ":" + str(port) + "/" + str(sid))
        # self.connect = cx_Oracle.connect(f'{user}/{pwd}@{ip}:{port}/{sid}')   # 这里的顺序是 用户名/密码@oracleserver的ip地址/数据库名字
        self.cursor = self.connect.cursor()   # 使用cursor()方法获取操作游标

    def GetData(self,sql):
        self.cursor.execute(sql)
        # 使用Rowfactory更改查询结果,更直观查看数据
        columns = [col[0] for col in self.cursor.description]
        self.cursor.rowfactory = lambda *args: dict(zip(columns, args))
        # fetchall()一次取完所有结果
        # fetchone()一次取一行结果
        data = self.cursor.fetchall()
        return data

    def select(self, sql):   #查询
        list = []
        self.cursor.execute(sql)            # 使用execute方法执行SQL语句
        result = self.cursor.fetchall()     # fetchall()一次取完所有结果,fetchone()一次取一行结果
        col_name = self.cursor.description
        for row in result:
            dict = {}
            for col in range(len(col_name)):
                key = col_name[col][0]
                value = row[col]
                dict[key] = value
            list.append(dict)
        js = json.dumps(list, ensure_ascii=False, indent=2, separators=(',', ':'))
        '''json.dumps() 是把python对象转换成json对象的一个过程,生成的是字符串
           json.dump() 是把python对象转换成json对象生成一个fp的文件流,和文件相关'''
        return js #将结果返回为一个字符串

    def selectlist(self,sql):
        list = []
        self.cursor.execute(sql)
        result = self.cursor.fetchall()
        col_name = self.cursor.description
        for row in result:
            dict = {}
            for col in range(len(col_name)):
                key = col_name[col][0]
                value = row[col]
                dict[key] = value
            list.append(dict)
        return list   #将结果以列表返回

    def disconnect(self):   #未连接
        self.cursor.close()
        self.connect.close()

    def insert(self, sql, list_param):   #插入
        try:
            self.cursor.executemany(sql, list_param)
            self.connect.commit()
            print("插入ok")
        except Exception as e:
            print(e)
        finally:
            self.disconnect()

    def update(self, sql):   #更新
        try:
            self.cursor.execute(sql)
            self.connect.commit()
        except Exception as e:
            print(e)
        finally:
            self.disconnect()

    def delete(self, sql):   #删除
        try:
            self.cursor.execute(sql)
            self.connect.commit()
            print("delete ok")
        except Exception as e:
            print(e)
        finally:
            self.disconnect()

if __name__ == '__main__':
    print(Oracle().select(sql="select * from cifaccount a where a.cif_account ='310400009590' "))
    pass


util_yaml.py

import os
import yaml

class YamlReader:
    #初始化,判断文件是否存在
    def __init__(self,yaml_file):
        if os.path.exists(yaml_file):
            self.yaml_file = yaml_file
        else:
            raise FileNotFoundError("yaml文件不存在")
        self._data = None
        self._data_all = None

    def yaml_data(self):  #yaml文件读取 --单个文档读取
        #第一次调用data,读取yaml文档,如果不是,直接返回之前保存的数据
        if not self._data:
            with open(self.yaml_file,'rb') as f:
                self._data = yaml.safe_load(f)
        return self._data

    def yaml_data_all(self):  #多个文档的读取
        if not self._data_all:
            with open(self.yaml_file,'rb') as f:
                self._data_all = yaml.safe_load_all(f)
        return self._data_all

相关推荐

CTO偷偷传我的系统性能优化十大绝招(万字干货)

上篇引言:取与舍软件设计开发某种意义上是“取”与“舍”的艺术。关于性能方面,就像建筑设计成抗震9度需要额外的成本一样,高性能软件系统也意味着更高的实现成本,有时候与其他质量属性甚至会冲突,比如安全性、...

提升效率!VMware虚拟机性能优化十大实用技巧

我40岁,干跨境婚恋中介的。为服务各国用户,常得弄英语、日语、俄语系统环境,VMware虚拟机帮了不少忙。用久了发现优化下性能,效率能更高。今儿就来聊聊优化技巧和同类软件。一、VMware虚拟...

低延迟场景下的性能优化实践

本文摘录自「全球C++及系统软件技术大会」ScottMeyers曾说到过,如果你不在乎性能,为什么要在C++这里,而不去隔壁的Pythonroom呢?今天我们就从“低延迟的概述”、“低延迟系...

Linux性能调优之内存负载调优的一些笔记

写在前面整理一些Linux内存调优的笔记,分享给小伙伴博文没有涉及的Demo,理论方法偏多,可以用作内存调优入门博文内容涉及:Linux内存管理的基本理论寻找内存泄露的进程内存交换空间调优不同方式的...

优化性能套路:带你战胜这只后段程序员的拦路虎

来源|极客时间《卖桃者说》作者|池建强编辑|成敏你好,这里是卖桃者说。今天给大家推荐一篇文章,来自倪朋飞老师的专栏《Linux性能优化实战》,文章主要讲的是优化性能的套路,这几乎是每个后端程序员...

SK海力士CXL优化解决方案已成功搭载于Linux:带宽提升30%,性能提升12%以上

SK海力士宣布,已将用于优化CXL(ComputeExpressLink)存储器运行的自研软件异构存储器软件开发套件(HMSDK)中主要功能成功搭载于全球最大的开源操作系统Linux上,不但提升了...

Linux内核优化:提升系统性能的秘诀

Linux内核优化:提升系统性能的艺术在深入Linux内核优化的世界之前,让我们先来理解一下内核优化的重要性。Linux内核是操作系统的核心,负责管理系统资源和控制硬件。一个经过精心优化的内核可以显著...

Linux系统性能优化:七个实战经验

Linux系统的性能是指操作系统完成任务的有效性、稳定性和响应速度。Linux系统管理员可能经常会遇到系统不稳定、响应速度慢等问题,例如在Linux上搭建了一个web服务,经常出现网页无法打开、打开速...

腾讯面试:linux内存性能优化总结

【1】内存映射Linux内核给每个进程都提供了一个独立且连续的虚拟地址空间,以便进程可以方便地访问虚拟内存;虚拟地址空间的内部又被分为内核空间和用户空间两部分,不同字长的处理器,地址空间的范围也不同...

Linux文件系统性能调优《参数优化详解》

由于各种的I/O负载情形各异,Linux系统中文件系统的缺省配置一般来说都比较中庸,强调普遍适用性。然而在特定应用下,这种配置往往在I/O性能方面不能达到最优。因此,如果应用对I/O性能要求较高,除...

Nginx 性能优化(吐血总结)

一、性能优化考虑点当我需要进行性能优化时,说明我们服务器无法满足日益增长的业务。性能优化是一个比较大的课题,需要从以下几个方面进行探讨当前系统结构瓶颈了解业务模式性能与安全1、当前系统结构瓶颈首先需要...

Linux问题分析与性能优化

排查顺序整体情况:top/htop/atop命令查看进程/线程、CPU、内存使用情况,CPU使用情况;dstat2查看CPU、磁盘IO、网络IO、换页、中断、切换,系统I/O状态;vmstat2查...

大神级产品:手机装 Linux 运行 Docker 如此简单

本内容来源于@什么值得买APP,观点仅代表作者本人|作者:灵昱Termux作为一个强大的Android终端模拟器,能够运行多种Linux环境。然而,直接在Termux上运行Docker并不可行,需要...

新手必须掌握的Linux命令

Shell就是终端程序的统称,它充当了人与内核(硬件)之间的翻译官,用户把一些命令“告诉”终端程序,它就会调用相应的程序服务去完成某些工作。现在包括红帽系统在内的许多主流Linux系统默认使用的终端是...

Linux 系统常用的 30 个系统环境变量全解析

在Linux系统中,环境变量起着至关重要的作用,它们犹如隐藏在系统背后的“魔法指令”,掌控着诸多程序的运行路径、配置信息等关键要素。尤其在shell脚本编写时,巧妙运用环境变量,能让脚本如虎...

取消回复欢迎 发表评论: