当前位置: 首页 > news >正文

Python psycopg2 类库使用学习总结

实践环境

openGauss 6.0.0 TLS企业版

python3 .9.13

psycopg2 2.9.10

实践操作

# -*- coding:utf-8 -*-import psycopg2if __name__ == '__main__':    # 连接方式1# connection_str = 'host=192.168.88.139 port=15400 dbname=testdb user=testacc password=test1234#'# conn = psycopg2.connect(connection_str)# 连接方式2# 注意:# host: 如果未提供,默认为UNIX socket# port: 如果未提供,默认为5432# database 仅用于关键词参数,不能用于连接参数字符串中conn = psycopg2.connect(host='192.168.88.139', port=15400, database='testdb', user='testacc', password='test1234#')cursor = conn.cursor()cursor.execute('SELECT datname AS "Database", pg_catalog.pg_get_userbyid(datdba) AS "Owner" FROM pg_database')res = cursor.fetchall()print(res)  # 输出形如 [('template1', 'omm'), ('template0', 'omm'), ('testdb', 'omm'), ('postgres', 'omm')]# 删除表cursor.execute('DROP TABLE IF EXISTS t_user')#print(cursor.fetchall())  # 会报错:psycopg2.ProgrammingError: no results to fetchcreate_tb_sql = '''CREATE TABLE IF NOT EXISTS t_user (user_id    INT PRIMARY KEY,username   VARCHAR(50) NOT NULL UNIQUE,password   VARCHAR(60) NOT NULL,email      VARCHAR(100) UNIQUE) WITH (fillfactor = 85  -- 行存表填充因子(预留15%空间用于更新))'''cursor.execute(create_tb_sql)cursor.execute('commit') # 注意,如果没有这个,不会提交到数据库,即执行完上述语句后,数据库public模式下依然看不到数据表# 插入数据--使用 %s 占位符cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',(1, '赖某某', '123456', 'testemail1@163.com'))print(cursor.rowcount)  # 获取execute 产生记录数 输出:1cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',[2, '王某某', '123456', 'testemail2@163.com'])# 插入数据--使用 %(field_name)s 占位符cursor.execute('INSERT INTO t_user (user_id, username, password, email) VALUES (%(user_id)s, %(username)s, %(password)s, %(email)s)',{'user_id':3, 'username': '肖某某', 'password': '123456', 'email':'testemail3@163.com'})# 插入数据-插入多条cursor.executemany('INSERT INTO t_user (user_id, username, password, email) VALUES (%s, %s, %s, %s)',[(4, 'testacc1', '123456', 'testemail5@163.com'),(5, 'testacc2', '123456', 'testemail6@163.com')])# 最后提交cursor.execute('commit')  # 注意,如果没有这个,不会提交到数据库,即执行完上述语句后,数据库表中依然查不到对应数据cursor.execute('SELECT * FROM testdb.public.t_user')print(cursor.fetchall()) # 输出包含对应记录的list# cursor.execute("SELECT (%s % 2) = 0 AS even", (10,))  # WRONGcursor.execute("SELECT (%s %% 2) = 0 AS even", (10,))  # correctprint(cursor.fetchall()) # 输出:[(True,)]cursor.execute('SELECT * FROM t_user WHERE user_id=1')print(cursor.fetchone())  # 输出:(1, '赖某某', '123456', 'testemail1@163.com')print(cursor.fetchone())  # 输出:Nonecursor.execute('SELECT * FROM t_user')print(cursor.rowcount)  # 输出:5# 只取部分记录print(cursor.fetchmany(3))# 输出:[(1, '赖某某', '123456', 'testemail1@163.com'), (2, '林某某', '123456', 'testemail2@163.com'), (3, '王某某', '123456', 'testemail3@163.com')]cursor.execute('SELECT * FROM t_user WHERE user_id=10')res = cursor.fetchall()print(res) # 输出:[]cursor.close()conn.close()

问题

1、使用psycopg2 连接数据时遇到如下报错:

psycopg2.OperationalError: connection to server at "192.168.88.139", port 15400 failed: none of the server's SASL authentication mechanisms are supported

前提:

opengausspg_hba.conf 关键配置

# IPv4 local connections:
...
host    all    all    0.0.0.0/0    sha256

解决方法

1、编辑opengauss服务器postgresql.conf配置文件,修改password_encryption_type 为1

password_encryption_type = 1            #Password storage type, 0 is md5 for PG, 1 is sha256 + md5, 2 is sha256 only

2、重启数据库服务器

3、修改连接数据库所用用户的密码

类库封装

测试用数据表

CREATE TABLE IF NOT EXISTS test (id    INT PRIMARY KEY,log_message   VARCHAR(60) NOT NULL
) WITH (fillfactor = 85  -- 行存表填充因子(预留15%空间用于更新)
)
# -*- coding:utf-8 -*-import re
import traceback
import psycopg2
from utils.log import loggerclass PostgreSQLCli:def __init__(self, db_name='', db_host='', port=3306, user='', password='', connect_timeout=15):try:self.dbconn = Noneself.host = db_hostself.port = portself.user = userself.passwd = passwordself.db_name = db_nameself.connect_timeout = connect_timeoutself.connect_config = {'host': self.host, 'port': self.port, 'user': self.user, 'password': self.passwd, 'database': self.db_name}self.__connect_database()logger.debug('初始化数据库连接成功(数据库:%s)' % self.db_name)except Exception as e:raise Exception('初始化数据库(%s)连接失败:%s' % (self.db_name, traceback.format_exc()))def __connect_database(self):self.dbconn = psycopg2.connect(**self.connect_config)def insert(self, query, params=None):'''插入单条数据示例::query "INSERT INTO test (x) VALUES(%(x)s)":params {'x': 100}:query "INSERT INTO test (x) VALUES(%s)":params [100] 或者(100,)'''try:db_cursor = self.dbconn.cursor()except Exception:logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())self.__connect_database()db_cursor = self.dbconn.cursor()try:db_cursor.execute(query, params)db_cursor.execute('commit')db_cursor.close()except Exception:db_cursor.execute('rollback')db_cursor.close()raise Exception(f'执行数据库插入操作({query})失败:{traceback.format_exc()}')def insert_many(self, query, params=None):'''插入多条数据示例::query "INSERT INTO test (x) VALUES(%(x)s)":params [{'x': 100}, {'x': 101}]:query "INSERT INTO test (x) VALUES(%s)":params [[100], [101]] 或者[(100,), (101, )]'''try:db_cursor = self.dbconn.cursor()except Exception:logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())self.__connect_database()db_cursor = self.dbconn.cursor()try:db_cursor.executemany(query, params)db_cursor.execute('commit')db_cursor.close()except Exception:db_cursor.execute('rollback')db_cursor.close()raise Exception(f'执行数据库批量插入操作({query})失败:{traceback.format_exc()}')def delete(self, query, params=None):'''例子::query DELETE FROM test WHERE id = %(id)s':params {'id': 1}当然,也可以把参数放到query中 DELETE FROM test WHERE id = 2'''try:db_cursor = self.dbconn.cursor()except Exception:logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())self.__connect_database()db_cursor = self.dbconn.cursor()try:db_cursor.execute(query, params)db_cursor.execute('commit')db_cursor.close()except Exception:db_cursor.execute('rollback')db_cursor.close()raise Exception(f'执行数据库删除操作({query})失败:{traceback.format_exc()}')def update(self, query, params=None):'''例子::query "UPDATE test SET log_message=%(log_message)s WHERE id = %(id)s":params {'log_message':'log message', 'id': 2}当然,也可以把参数放到query中 UPDATE test SET log_message='log message' WHERE id = 2'''try:db_cursor = self.dbconn.cursor()except Exception:logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())self.__connect_database()db_cursor = self.dbconn.cursor()try:db_cursor.execute(query, params)db_cursor.execute('commit')db_cursor.close()except Exception:db_cursor.execute('rollback')db_cursor.close()raise Exception(f'执行数据库更新操作({query})失败:{traceback.format_exc()}')def select(self, query, params=None):'''查询结果最多只包含一条记录示例:查询获取获取id为2的记录:query  'SELECT * FROM test WHERE id = %(id)s':param  {'id': 2}当然,也可以把参数放到query中 SELECT * FROM test WHERE id = 2'''result = []try:db_cursor = self.dbconn.cursor()except Exception:logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库' % traceback.format_exc())self.__connect_database()db_cursor = self.dbconn.cursor()try:db_cursor.execute(query, params)query_result = db_cursor.fetchall()if query_result:result = query_result[0]db_cursor.close()except Exception:db_cursor.close()raise Exception(f'执行数据库查询操作({query})失败:{traceback.format_exc()}')return resultdef select_many(self, query, params=None):'''查询,查询结果包含多条记录'''try:db_cursor = self.dbconn.cursor()except Exception:logger.error('获取数据库游标失败:%s,正在尝试重新连接数据库(%s)' % (traceback.format_exc(), self.db_name))self.__connect_database()db_cursor = self.dbconn.cursor()try:db_cursor.execute(query, params)query_result = db_cursor.fetchall()db_cursor.close()except Exception:db_cursor.close()raise Exception(f'执行数据库查询操作({query})失败:{traceback.format_exc()}')return query_resultdef close(self):if self.dbconn:self.dbconn.close()self.dbconn = Nonedef __del__(self):self.close()# 测试
if __name__ == '__main__':db_cli = PostgreSQLCli(db_name='testdb', db_host='192.168.88.139', port=15400, user='testacc', password='test1234#')db_cli.insert('INSERT INTO test (id, log_message) VALUES(%s, %s)', [1, 'test message 1'])db_cli.insert_many('INSERT INTO test (id, log_message) VALUES(%s, %s)', [[2, 'test message 2'],[3, 'test message 3']])res = db_cli.select_many('SELECT * FROM test')print(res) #输出:[(1, 'test message 1'), (2, 'test message 2'), (3, 'test message 3')]res = db_cli.select('SELECT * FROM test WHERE id = %(id)s', {'id': 2})print(res) # 输出:(2, 'test message 2')res = db_cli.select('SELECT * FROM test WHERE id = 2')print(res)db_cli.update("UPDATE test SET log_message = %(log_message)s WHERE id = %(id)s", {'log_message':'log_message %s' % datetime.now().strftime('%Y%m%d%H%M%S'), 'id': 2})res = db_cli.select('SELECT * FROM test WHERE id = 2')print(res)db_cli.update("UPDATE test SET log_message = '%s' WHERE id = 2" % ('log_message %s' % datetime.now().strftime('%Y%m%d%H%M%S')))res = db_cli.select('SELECT * FROM test WHERE id = 2')print(res)db_cli.delete('DELETE FROM test WHERE id IN (%(id)s, %(id2)s)', {'id': 2, 'id2': 3})res = db_cli.select_many('SELECT * FROM test')print(res)

注意:误区

当前驱动版本下验证,使用类似以下代码,尝试切换当前数据库至目标数据库test_db,然后获取获取test_db数据库中所有表

db_cli.select('USE `test_db`')
tables = db_cli.select('SHOW TABLES')  

实际执行结果,db_cli.select('SHOW TABLES')总是返回初始化连接时连接的数据库的中的表。解决方法如下:

tables = db_cli.select('SHOW TABLES FROM test_db')  

注意:如果表名中存在特殊字符比如 / 时,表名需要加双引号,否则会报错

db_cli.select('SELECT * FROM db_name.schema_name."agent/defualt");
db_cli.select('SELECT * FROM "agent/defualt");

参考链接

https://github.com/psycopg/psycopg2

https://www.psycopg.org/docs/usage.html

http://www.hskmm.com/?act=detail&tid=29401

相关文章:

  • [GenAI] RAG架构演进
  • 24NOIP游记——彼时彼刻
  • 嵌入式-C++面经1
  • 合并区间 - MKT
  • 如何防止员工向第三方 AI 泄露数据?滤海 AI DLP 全方位技术防护方案解析
  • 20232322 2025-2026-1 《网络与系统攻防技术》实验一实验报告
  • 实验1 现代c++编程初体验
  • 冬天快乐
  • P2441M 见过的 tricks
  • 企业大数据战略定位
  • OpenAI加码个性化消费AI技术布局
  • 线性回归 C++ 实现
  • 内存分区
  • Spring Data JPA学习笔记
  • P1112 波浪数 题解
  • 20232411 2025-2026-1 《网络与系统攻防技术》实验一实验报告
  • 使用 Pascal 实现英文数字验证码识别系统
  • PWN手的成长之路-15-jarvisoj_level2_x64
  • 2025.10.12——1绿
  • 价值博弈场的工程实现:构建数字文明的价值免疫系统——声明Ai生成
  • 基于 Rust 的英文数字验证码识别系统设计与实现
  • 2025年两联供室内机厂家最新权威推荐榜:技术实力与市场口碑
  • 2025武汉商铺装修防水厂家最新权威推荐榜:专业施工与品质保
  • 2025铝合金微弧氧化厂家权威推荐榜:表面处理技术实力深度解
  • 2025杉木木方厂家最新权威推荐榜:优质木材与稳定供应口碑之
  • 2025年厂房保养厂家最新权威推荐榜:专业维护与成本控制优选
  • 使用C语言实现重写stm32的启动文件
  • 2025中医师承权威推荐榜:名师带徒与临床实践深度解析
  • 让我们开始 CSS 的学习之旅
  • 2025液压无损扒胎机厂家权威推荐榜:高效无损与耐用性能深度