flask认证机制logging模块实战
如何在项目中实现日志的记录呢?在 Flask 中我们可以使用 Python 的内置 logging 模块来实现记录日志。你可以对照表格看看具体的日志级别分类,从上往下级别依次升高,最高的是 CRITICAL。
直接上代码
import logging
from logging.handlers import RotatingFileHandler
def setup_log(logger_name=None,log_file='logs/log',level = logging.INFO):# 设置日志的几录等级logging.basicConfig(level=level) # 控制台打印日志,调试debug等级# 穿件日志记录器,指明日志保存的路径、每个日志文件的最大大小、保存的日志文件个数上限file_log_handler = RotatingFileHandler(log_file,maxBytes=1024*1024*100,backupCount=10,encoding='utf-8')# 创建日志记录的格式 日志等级 输入日志信息文件名 行数 日志信息formatter = logging.Formatter('%(asctime)s - %(levelname)s %(filename)s: %(lineno)d %(message)s')# 为刚创建的日志记录器设置日志记录格式file_log_handler.setFormatter(formatter)# 为全局的日志工具对象( flask app 使用的)添加日志记录器logging.getLogger(logger_name).addHandler(file_log_handler)def setup_logger(logger_name,log_file,level= logging.INFO):"""%(asctime) 即日志记录时间,精确到毫秒%(levelname) 即日志级别%(filename) 即日志记录的python文件名%(functionname) 即触发日志记录的函数名%(lineno) 即触发日志代码记录的行号%(message) 这项及调用如app.logger.info('info log')时传入的参数,即message :param logger_name::param log_file::param level::return:"""log = logging.getLogger(logger_name)# 创建日志对象formatter = logging.Formatter('%(asctime)s : %(message)s')# 创建日志记录格式,即日志记录时间和日志内容file_handler = logging.FileHandler(log_file,mode='w',encoding='utf-8')# 创建日志记录器,将日志保存名为log_file的文件中,mode='w'表示每次运行程序时覆盖之前的日志file_handler.setFormatter(formatter)# 将日志格式设置为文件处理器的格式stream_handler = logging.StreamHandler()# 创建一个文件处理器,将日志输出到控制台stream_handler.setFormatter(formatter)# 将文件处理器设置为流处理器的格式log.setLevel(level)# 将你的日志级别设置为传入的参数levellog.addHandler(file_handler)log.addHandler(stream_handler)# 将文件处理器和流处理器添加到日志对象中
代码整体的流程和将日志文件写入文件是类似的,setup_logger 函数会把日志记录器的名称、记录文件的名称和记录级别作为参数。我们先创建日志对象,然后再设置日志格式。紧接着就是创建文件处理器和流处理器,最后将两个处理器添加到日志对象中。我们将日志内容输出到文件时,设置了参数 mode=‘w’,这表示以写入方式打开文件,这种模式会先清空文件中的内容,然后写入新的内容。如果写入的时候发现文件不存在,系统就会创建一个新文件。
主要区别对比
特性 | setup_log |
setup_logger |
---|---|---|
文件处理 | RotatingFileHandler(轮转) | FileHandler(覆盖) |
文件模式 | 追加模式 | 覆盖模式('w') |
日志格式 | 详细(含文件名、行号等) | 简化(只有时间和消息) |
输出目标 | 仅文件 | 文件 + 控制台 |
日志轮转 | 支持(100MB,10个备份) | 不支持 |
适用场景 | 生产环境长期运行 | 开发调试或短期任务 |
我用ai写了一个测试代码,还不错
import logging
import os
import sys# 导入同目录下的log_utils模块
from log_utils import setup_log, setup_loggerdef get_absolute_log_path(relative_path):"""将相对路径转换为绝对路径"""script_dir = os.path.dirname(os.path.abspath(__file__))return os.path.join(script_dir, relative_path)def test_basic_functionality():"""测试基本功能"""print("=== 基本功能测试 ===")# 使用绝对路径app_log_path = get_absolute_log_path('logs/app.log')dev_log_path = get_absolute_log_path('logs/dev.log')# 确保日志目录存在log_dir = os.path.dirname(app_log_path)if not os.path.exists(log_dir):os.makedirs(log_dir)print(f"创建日志目录: {log_dir}")# 测试setup_log函数print("1. 测试setup_log函数...")setup_log('app_logger', app_log_path, logging.DEBUG)app_logger = logging.getLogger('app_logger')app_logger.debug("调试信息 - 测试setup_log")app_logger.info("普通信息 - 用户登录成功")app_logger.warning("警告信息 - 内存使用率过高")app_logger.error("错误信息 - 数据库连接失败")print("setup_log测试完成")# 测试setup_logger函数print("2. 测试setup_logger函数...")setup_logger('dev_logger', dev_log_path, logging.DEBUG)dev_logger = logging.getLogger('dev_logger')dev_logger.debug("调试信息 - 测试setup_logger")dev_logger.info("普通信息 - API请求处理完成")dev_logger.warning("警告信息 - 响应时间过长")dev_logger.error("错误信息 - 文件不存在")print("setup_logger测试完成")def test_different_log_levels():"""测试不同的日志级别"""print("\n=== 不同日志级别测试 ===")# 测试INFO级别info_log_path = get_absolute_log_path('logs/info.log')setup_log('info_logger', info_log_path, logging.INFO)info_logger = logging.getLogger('info_logger')info_logger.debug("这条DEBUG信息不会被记录") # 不会记录info_logger.info("这条INFO信息会被记录")info_logger.warning("这条WARNING信息会被记录")print("INFO级别测试完成")# 测试ERROR级别error_log_path = get_absolute_log_path('logs/error.log')setup_logger('error_logger', error_log_path, logging.ERROR)error_logger = logging.getLogger('error_logger')error_logger.debug("DEBUG - 不会记录")error_logger.info("INFO - 不会记录")error_logger.warning("WARNING - 不会记录")error_logger.error("ERROR - 会被记录")print("ERROR级别测试完成")def test_log_format_differences():"""测试两个函数的日志格式区别"""print("\n=== 日志格式区别测试 ===")# setup_log的详细格式detailed_path = get_absolute_log_path('logs/detailed.log')setup_log('detailed', detailed_path)detailed_logger = logging.getLogger('detailed')detailed_logger.info("这是setup_log的日志格式")# setup_logger的简化格式simple_path = get_absolute_log_path('logs/simple.log')setup_logger('simple', simple_path)simple_logger = logging.getLogger('simple')simple_logger.info("这是setup_logger的日志格式")print("日志格式测试完成")def test_multiple_loggers():"""测试多个日志记录器同时工作"""print("\n=== 多日志记录器测试 ===")# 创建不同用途的日志记录器db_path = get_absolute_log_path('logs/db.log')api_path = get_absolute_log_path('logs/api.log')security_path = get_absolute_log_path('logs/security.log')setup_log('database', db_path, logging.INFO)setup_logger('api', api_path, logging.DEBUG)setup_log('security', security_path, logging.WARNING)db_logger = logging.getLogger('database')api_logger = logging.getLogger('api')security_logger = logging.getLogger('security')# 模拟数据库操作db_logger.info("数据库连接池初始化")db_logger.info("执行SQL: SELECT * FROM users WHERE id = 1")# 模拟API请求api_logger.debug("收到GET请求: /api/users")api_logger.info("用户认证通过")api_logger.warning("查询参数验证警告")# 模拟安全事件security_logger.warning("检测到多次登录失败尝试")security_logger.error("IP地址被封禁")print("多日志记录器测试完成")def test_error_with_traceback():"""测试错误堆栈跟踪"""print("\n=== 错误堆栈跟踪测试 ===")exception_path = get_absolute_log_path('logs/exception.log')setup_log('exception_logger', exception_path)exception_logger = logging.getLogger('exception_logger')def simulate_error():# 模拟一个会出错的分支data = {"key": "value"}return data["nonexistent_key"] # 这里会抛出KeyErrortry:simulate_error()except Exception as e:exception_logger.error("发生了一个异常", exc_info=True)exception_logger.error(f"错误详情: {str(e)}")print("错误堆栈跟踪测试完成")def test_real_world_scenario():"""模拟真实使用场景"""print("\n=== 真实场景模拟 ===")# 模拟用户服务user_service_path = get_absolute_log_path('logs/user_service.log')setup_log('user_service', user_service_path)user_logger = logging.getLogger('user_service')# 模拟用户注册流程def register_user(username, email):user_logger.info(f"开始用户注册流程 - 用户名: {username}, 邮箱: {email}")# 验证用户信息if not username or not email:user_logger.error("用户名或邮箱为空")return Falseuser_logger.debug("用户信息验证通过")# 检查用户是否已存在user_logger.info("检查用户是否已存在")# 模拟创建用户try:# 这里模拟可能出错的数据库操作if username == "admin":raise Exception("管理员用户已存在")user_logger.info(f"用户 {username} 注册成功")return Trueexcept Exception as e:user_logger.error(f"用户注册失败: {str(e)}", exc_info=True)return False# 测试用户注册register_user("john_doe", "john@example.com")register_user("admin", "admin@example.com") # 这个会失败register_user("", "test@example.com") # 这个也会失败print("真实场景模拟完成")def check_log_files():"""检查生成的日志文件"""print("\n=== 日志文件检查 ===")log_dir = get_absolute_log_path('logs')if os.path.exists(log_dir):files = os.listdir(log_dir)print(f"生成的日志文件 ({len(files)} 个):")for file in sorted(files):file_path = os.path.join(log_dir, file)file_size = os.path.getsize(file_path)print(f" - {file} ({file_size} 字节)")# 显示文件前几行内容try:with open(file_path, 'r', encoding='utf-8') as f:lines = f.readlines()[:2] # 只显示前2行for line in lines:print(f" {line.strip()}")except Exception as e:print(f" 读取文件错误: {e}")else:print("日志目录不存在")if __name__ == "__main__":print("开始测试 log_utils.py 中的日志函数")print("=" * 50)try:# 运行所有测试test_basic_functionality()test_different_log_levels()test_log_format_differences()test_multiple_loggers()test_error_with_traceback()test_real_world_scenario()# 检查生成的日志文件check_log_files()print("\n" + "=" * 50)print("所有测试完成!")print("\n总结:")print("1. setup_log - 使用RotatingFileHandler,支持日志轮转,格式详细")print("2. setup_logger - 使用FileHandler,每次覆盖,格式简化")print("3. 两个函数都支持控制台和文件输出")print("4. 日志文件保存在 api/utils/logs/ 目录下")except Exception as e:print(f"测试过程中发生错误: {e}")import tracebacktraceback.print_exc()
测试redis连接
224 yum provides redis225 yum install redis-6.2.19-1.el9_6 -y226 ls -l /usr/lib/systemd/system/redis.service227 systemctl daemon-reload228 systemctl start redis229 systemctl enable redis230 systemctl status redis231 vi mysql_service_control.sh232 ./mysql_service_control.sh233 vi mysql_service_control.sh234 ./mysql_service_control.sh235 mv mysql_service_control.sh mysql_redis_service_control.sh236 ll237 vim /etc/redis/redis.conf238 systemctl restart redis239 systemctl status redis240 ss -ntl241 redis-cli -h 192.168.117.200 -p 6379 -a 1234567890242 historyredis安装完成后还需要做后续的操作
修改redis.conf的配置文件
1、把protected-mode yes改为protected-mode no(在没有密码的情况下,关闭保护模式)
2、注释掉bind 127.0.0.1,前面加# (取消绑定本地地址),或者改为bind 0.0.0.0
3、把daemonize no改为daemonize yes (是否为进程守护,关闭ssh窗口后即是否在后台继续运行)
import redis
try:# 创建一个Redis连接r = redis.Redis(host='192.168.117.200',port=6379,password='1234567890',decode_responses=True #自动解码返回为字符串)# 测试连接response = r.ping()print(f"Redis连接成功: {response}")# 测试基本操作r.set('name', 'John')value = r.get('name')print(f"获取的name值为: {value}")# 获取服务器信息info = r.info()print(f"Redis服务器信息: {info}")print(f"Redis版本: {info['redis_version']}")except Exception as e:print(f"连接失败,发生错误: {str(e)}")