参考一个知名博主,本文章非原创,仅供自我学习
问题
生产和工作
- 很多开发者对Python的
logging模块用得相当随意。要么把所有日志都一股脑儿地往控制台和文件里塞,导致生产环境日志文件膨胀得飞快,缺失对关键信息的检索。 - 要么就是自定义的日志器(Logger)完全失灵,调试的时候死活看不到输出
问题的根源,往往不在于代码逻辑有多复杂,而在于对日志模块最核心的两个设计理念——层次结构(Hierarchy)和传播属性(propagate)——理解得不够透彻。
摘要
本文从实践者的角度,详解Logger层次结构与propagate的5个典型应用场景,比如如何在Django中优雅地分离不同应用的日志,如何在Flask里为第三方库的日志“降噪”,以及如何在CI/CD 流水线中构建清晰、可追溯的日志收集链路。
学习效果:你会发现,合理运用logger.propagate这个看似简单的开关,能瞬间让你的日志从混乱走向有序,从负担变成利器。
1 Logger的家族树与消息传递链
在动手解决具体问题之前,我们得先统一认知。Python的logging模块设计得非常巧妙,它借鉴了包管理的命名空间思想。每个Logger都有一个用点号分隔的名字
如‘app’、‘app.services’、‘app.services.user’。这不仅仅是为了好看,它构建了一棵清晰的“家族树”。
根Logger (名为‘’,也叫root) ├── ‘app’ │ ├── ‘app.api’ │ │ └── ‘app.api.v1’ │ └── ‘app.models’ └── ‘third_party’ └── ‘third_party.requests’在这棵树里,‘app.api.v1’是‘app.api’的儿子,‘app.api’又是‘app’的儿子。这种层级关系直接决定了日志消息的默认流动方向:自下而上。
1.1消息传递
当一个Logger(例如‘app.api.v1’孙子)收到一条日志记录请求时,它会先用自己的处理器(Handler)处理这条消息。然后,一个关键属性propagate开始发挥作用。如果propagate=True(默认值),这条消息就会像接力棒一样,被传递给它的父Logger(‘app.api’),父Logger处理完,如果它的propagate也是True,就再传给它的父Logger(‘app’),以此类推,直到根Logger。如果中途任何一个Logger的propagate=False,传递链就在那里终止。这个不难理解
注意:这里的“处理”指的是Logger上绑定的Handler执行输出操作。父Logger完全不知道消息来自哪个子Logger,它只是按照自己的配置处理接收到的每一条消息。
举例 和 实验平台
Python3 在线工具 | 菜鸟工具
为了直观对比,我们来看一个最简单的代码实验:
import logging # 1. 配置根Logger:输出到控制台,格式带[ROOT]前缀 root_logger = logging.getLogger()# 不传参数相当于是 获取一个全局的根logger root_handler = logging.StreamHandler() # 负责决定往哪发,这个是把他发到控制台 root_handler.setFormatter(logging.Formatter('[ROOT] %(message)s')) #设置消息格式 root_logger.addHandler(root_handler) #交给执行者 root_logger.setLevel(logging.INFO) # 根Logger只关心INFO及以上级别 # 2. 配置父Logger ‘parent’:输出到控制台,格式带[PARENT]前缀 parent_logger = logging.getLogger('parent') parent_handler = logging.StreamHandler() parent_handler.setFormatter(logging.Formatter('[PARENT] %(message)s')) parent_logger.addHandler(parent_handler) parent_logger.setLevel(logging.DEBUG) # parent Logger更敏感,DEBUG级别也记录 # 3. 获取子Logger ‘parent.child’ child_logger = logging.getLogger('parent.child') # 注意:我们没有给child_logger添加任何Handler! print("场景一:propagate为默认值True时") child_logger.warning("一条来自子模块的警告信息") # 输出: # [PARENT] 一条来自子模块的警告信息 # [ROOT] 一条来自子模块的警告信息 print("\n场景二:设置propagate=False时") child_logger.propagate = False child_logger.warning("另一条警告信息") # 输出:(无任何输出,因为child自己没有Handler,且传播被阻断)返回结果
1.2 结论
这个实验揭示了几个关键点:
- 继承性:子Logger(
‘parent.child’)即使没有显式设置级别(Level),在需要记录消息时,也会沿着家族树向上查找,直到找到一个设置了级别的祖先Logger,并采用那个级别。这里它采用了parent_logger的DEBUG级别,因此WARNING消息得以记录。 - 传播的威力:默认情况下,消息会层层上传,被路径上的每一个Logger处理。这就是为什么一条日志可能出现在多个输出目的地(比如控制台和文件)。
- propagate=False的隔离作用:一旦设置,就切断了向上的传递链。如果这个Logger自己没配Handler,那日志就等于被“静默丢弃”了,这是一个常见的坑。
理解了这个基础机制,我们就能游刃有余地应对复杂场景了。也可以将上面的代码作为1个模板来指导我们日后代码的编写。
2. 场景一:在Django多应用项目中实现日志隔离与聚合
这个说实话我还没有学dj,Django项目通常由多个应用(app)组成,
比如user、order、payment。在微服务架构流行之前,我们常常在一个Django项目里维护所有这些功能。日志管理的挑战在于:如何让每个应用的日志既能独立输出到专属文件便于排查,又能将错误级别以上的日志统一汇总到一个总文件供监控系统采集?
错误做法是给每个模块的Logger都添加FileHandler指向同一个文件,这会导致写入冲突或日志错乱。正确思路是利用Logger层次结构和propagate的精细控制。
假设我们的Django项目结构如下:
myproject/ ├── myproject/ # 项目根目录 ├── logs/ # 日志目录 │ ├── app_user.log │ ├── app_order.log │ ├── app_payment.log │ └── errors.log # 所有应用的错误日志汇总 ├── user/ # 用户应用 ├── order/ # 订单应用 └── payment/ # 支付应用我们可以在Django的settings.py中这样配置LOGGING字典:
# settings.py LOGGING = { 'version': 1, 'disable_existing_loggers': False, # 关键!保留第三方库的Logger 'formatters': {...}, # 定义格式器,此处省略 'handlers': { # 为每个应用创建独立的文件处理器 'user_file': { 'level': 'DEBUG', 'class': 'logging.handlers.RotatingFileHandler', 'filename': 'logs/app_user.log', 'formatter': 'verbose', 'maxBytes': 1024*1024*10, # 10MB 'backupCount': 5, }, 'order_file': {...}, # 类似配置,指向logs/app_order.log 'payment_file': {...}, # 类似配置,指向logs/app_payment.log # 统一的错误日志处理器 'error_file': { 'level': 'WARNING', # 只收集WARNING及以上级别 'class': 'logging.handlers.RotatingFileHandler', 'filename': 'logs/errors.log', 'formatter': 'simple', }, 'console': {...}, }, 'loggers': { # 配置 user 应用的Logger 'user': { 'handlers': ['user_file', 'error_file', 'console'], # 输出到专属文件和错误总文件 'level': 'DEBUG', 'propagate': False, # 核心!阻止向更高级别(如django、root)传播,避免重复 }, # 配置 order 应用的Logger 'order': { 'handlers': ['order_file', 'error_file', 'console'], 'level': 'DEBUG', 'propagate': False, }, 'payment': { 'handlers': ['payment_file', 'error_file', 'console'], 'level': 'DEBUG', 'propagate': False, }, # 处理Django框架自身的日志 'django': { 'handlers': ['console'], 'level': 'INFO', 'propagate': False, }, }, }在这个配置里,精髓在于每个应用Logger(如‘user’)的propagate被设置为False。这意味着:
user.views模块的Logger(它是‘user’的子Logger)产生的日志,会由‘user’这个Logger配置的handlers(user_file,error_file,console)来处理。- 处理完毕后,日志不会继续向上传递给
‘django’或根Logger,从而完美避免了在django或根Logger的handlers里再次输出同样的内容,造成重复或混乱。
带来的好处是立竿见影的:
- 排查效率提升:当支付接口出问题时,你直接打开
logs/app_payment.log,里面全是支付相关的DEBUG/INFO日志,上下文清晰,没有用户模块的无关信息干扰。 - 监控聚焦:运维同事只需要监控
logs/errors.log这一个文件,就能看到全项目所有应用产生的WARNING和ERROR,便于设置告警。 - 职责清晰:各个应用的日志配置独立,修改
user应用的日志级别或格式,完全不会影响order应用
3. 场景二:为Flask应用中的第三方库日志“降噪”
Flask项目经常集成大量第三方库,比如requests(用于HTTP请求)、sqlalchemy(ORM)、celery(异步任务)。这些库在调试时可能会输出非常冗长的DEBUG信息,比如requests会打印出每个HTTP请求和响应的全部头部和体部,sqlalchemy会输出每条生成的SQL语句。
在开发环境这很有用,但在生产环境,这些信息不仅无用,还会严重干扰我们查看自己业务逻辑的日志,并迅速撑爆磁盘。我们的目标是:在生产环境,让这些第三方库“闭嘴”或只报告错误;在开发环境,则可以按需开启它们的详细输出。
这需要结合Logger层次结构和环境变量来实现动态配置。第三方库的Logger通常以其导入路径命名,例如:
‘requests’‘sqlalchemy.engine’‘celery’
策略是:将第三方库的Logger级别设高(如WARNING),并阻止其日志传播到我们的根Logger,防止它们污染我们的主日志流。同时,为我们自己的应用Logger配置独立的、级别更低的Handler。
# logging_config.py import logging import os from logging.handlers import RotatingFileHandler def setup_logging(): # 判断环境 is_production = os.getenv('FLASK_ENV') == 'production' # 创建根Logger,并配置一个基础的控制台Handler(可选) root_logger = logging.getLogger() root_logger.setLevel(logging.WARNING) # 根Logger只处理WARNING及以上 # 创建我们应用的主Logger app_logger = logging.getLogger('my_flask_app') app_logger.setLevel(logging.DEBUG if not is_production else logging.INFO) # 应用Logger的处理器:控制台和文件 console_handler = logging.StreamHandler() file_handler = RotatingFileHandler('app.log', maxBytes=10*1024*1024, backupCount=5) # 为不同环境设置不同的格式和级别 if not is_production: console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setLevel(logging.DEBUG) else: console_formatter = logging.Formatter('%(levelname)s: %(message)s') console_handler.setLevel(logging.INFO) file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s') file_handler.setFormatter(file_formatter) console_handler.setFormatter(console_formatter) app_logger.addHandler(file_handler) app_logger.addHandler(console_handler) # 关键步骤:配置第三方库Logger third_party_loggers = ['requests', 'sqlalchemy.engine', 'urllib3', 'celery'] for logger_name in third_party_loggers: lib_logger = logging.getLogger(logger_name) if is_production: lib_logger.setLevel(logging.WARNING) # 生产环境:只显示警告和错误 else: lib_logger.setLevel(logging.INFO) # 开发环境:显示基本信息,关闭DEBUG噪音 lib_logger.propagate = False # 核心!阻止其日志向上传播到我们的app_logger或root_logger # 可以选择不给它们添加Handler,让它们“静默”,或者添加一个单独的调试文件Handler if not is_production: # 开发环境下,可以将第三方库的DEBUG日志重定向到单独文件,不与业务日志混在一起 debug_file_handler = RotatingFileHandler(f'logs/{logger_name}.debug.log', maxBytes=5*1024*1024, backupCount=2) debug_file_handler.setLevel(logging.DEBUG) debug_file_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s')) lib_logger.addHandler(debug_file_handler) # 最后,将我们应用的Logger的传播也关闭,实现完全自治 app_logger.propagate = False # 在Flask应用工厂函数中调用 from flask import Flask def create_app(): app = Flask(__name__) setup_logging() # 初始化日志配置 # ... 其他初始化 return app通过将requests等Logger的propagate设为False,我们实现了:
- 生产环境降噪:
requests库内部的DEBUG/INFO日志不会出现在我们的app.log或控制台,只有WARNING和ERROR会通过它自己的(未添加的)Handler处理,实际上就是被忽略了,除非我们特意为它添加一个Handler。 - 开发环境精细控制:我们可以将第三方库的详细日志导到单独的文件(如
requests.debug.log),既满足了调试需求,又保证了主业务日志app.log的整洁。 - 清晰的责任边界:
my_flask_app这个Logger只处理我们自己的业务代码日志,配置独立,不受任何第三方库日志行为的影响。
4. 场景三:在异步任务框架(Celery )中构建清晰的日志链路
Celery的日志有点特殊,因为工作进程(Worker)是独立于Web应用启动的。一个常见的痛点是:在Celery Task中打的日志,不知道跑哪个Worker上去了,或者和Web请求的日志混在一起,难以追踪一个异步任务的完整生命周期。
我们的目标是:让每个Celery任务产生的所有日志,无论在哪台机器、哪个Worker进程上执行,都能通过一个唯一的标识(如Task ID)关联起来,并最终汇集到同一个地方(如中心化的日志系统)。
这需要分两步走:
- 在Task内部实现日志上下文注入:为每个任务执行初始化一个独立的Logger或为日志记录添加上下文。
- 控制日志传播,避免Celery Worker根Logger的干扰。
首先,我们可以创建一个自定义的日志过滤器(Filter)来注入Task ID:
# filters.py import logging from celery import current_task class TaskIDFilter(logging.Filter): """一个日志过滤器,为Celery任务日志添加task_id字段""" def filter(self, record): try: # 尝试从当前执行的Celery任务中获取ID record.task_id = current_task.request.id if current_task else 'N/A' except Exception: record.task_id = 'N/A' return True # 始终允许此日志记录通过然后,在Celery的配置中(celery.py或应用工厂函数)设置日志:
# celery.py from celery import Celery from logging.config import dictConfig import os def setup_celery_logging(): log_level = 'DEBUG' if os.getenv('ENV') == 'development' else 'INFO' config = { 'version': 1, 'disable_existing_loggers': False, 'filters': { 'task_id_filter': { '()': 'your_project.filters.TaskIDFilter', # 指向自定义过滤器 } }, 'formatters': { 'task_formatter': { 'format': '%(asctime)s [%(task_id)s] [%(name)s] %(levelname)s: %(message)s' }, 'simple': { 'format': '%(levelname)s: %(message)s' } }, 'handlers': { 'task_file': { 'level': log_level, 'class': 'logging.handlers.RotatingFileHandler', 'filename': '/var/log/celery/tasks.log', 'formatter': 'task_formatter', 'filters': ['task_id_filter'], # 应用过滤器 'maxBytes': 50*1024*1024, 'backupCount': 10, }, 'error_file': { 'level': 'WARNING', 'class': 'logging.handlers.RotatingFileHandler', 'filename': '/var/log/celery/errors.log', 'formatter': 'simple', }, 'console': { 'level': log_level, 'class': 'logging.StreamHandler', 'formatter': 'simple', }, }, 'loggers': { # 这是我们业务Task使用的Logger命名空间 'your_project.tasks': { 'handlers': ['task_file', 'error_file', 'console'], 'level': log_level, 'propagate': False, # 核心!不传播给Celery的根Logger }, # 处理Celery框架自身的日志 'celery': { 'handlers': ['console', 'error_file'], # Celery框架日志不进入task_file,避免混淆 'level': 'INFO', 'propagate': False, }, # 可选:让所有以‘your_project’开头的模块日志都走这个配置 'your_project': { 'handlers': ['task_file', 'error_file', 'console'], 'level': log_level, 'propagate': False, }, }, } dictConfig(config) # 创建Celery应用 app = Celery('your_project') setup_celery_logging() # ... 其他Celery配置在具体的Task模块中,我们这样使用:
# tasks/email_tasks.py import logging from your_project.celery import app # 获取本模块的Logger,它会是‘your_project.tasks.email_tasks’ logger = logging.getLogger(__name__) @app.task(bind=True) # bind=True 使任务能访问self(即任务上下文) def send_welcome_email(self, user_id): # 此时,current_task.request.id 就是 self.request.id logger.info(f"开始为用户 {user_id} 发送欢迎邮件") # ... 发送邮件逻辑 logger.info("邮件发送任务完成")当这个任务执行时,日志文件/var/log/celery/tasks.log中会出现如下格式的行:
当这个任务执行时,日志文件/var/log/celery/tasks.log中会出现如下格式的行:这个方案的优势在于:
- 端到端追踪:通过
task_id,你可以在日志系统中轻松过滤出同一个任务的所有日志,无论它被重试了多少次,在哪个Worker上运行。 - 日志分离:通过将
‘your_project.tasks’的propagate设为False,我们确保了业务日志只被我们自定义的task_fileHandler处理,不会与celery框架自身的日志(如Worker启动、心跳信息)混在一起。celery框架的日志被单独配置到了console和error_file。 - 配置独立:Celery Worker的日志配置与Web应用完全解耦,可以独立管理和滚动。
5. 场景四:在CI/CD流水线中实现分级日志收集与测试报告生成
在持续集成/持续部署(CI/CD)流水线中,日志扮演着双重角色:一是帮助开发者快速定位构建、测试失败的原因;二是作为可分析的资产,用于生成测试报告、计算代码覆盖率等。混乱的日志输出会让日志收集器(如Fluentd、Logstash)难以解析,也让生成的报告杂乱无章。
典型需求是:
- 构建阶段:需要详细日志(DEBUG级别)来排查依赖安装、编译错误。
- 单元测试阶段:需要将测试框架(如pytest)的输出、被测代码的日志、以及最终的测试结果(通过/失败)清晰地分离。
- 部署阶段:只需要INFO和WARNING级别的关键信息。
我们可以通过环境变量动态设置Logger的级别和propagate属性,并将不同阶段的日志导向不同的“管道”。以下是一个基于GitLab CI的.gitlab-ci.yml示例片段,配合Python脚本实现:
首先,在项目的日志工具模块中,我们提供一个灵活的配置函数:
# utils/logging_ci.py import logging import sys import os def setup_ci_logging(phase='test'): """ 为CI/CD流水线设置日志 :param phase: ‘build‘, ‘test‘, ‘deploy‘ """ root_logger = logging.getLogger() # 先清除可能存在的旧Handler,避免重复 for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) # 根据阶段设置根Logger级别 if phase == 'build': root_logger.setLevel(logging.DEBUG) elif phase == 'test': root_logger.setLevel(logging.INFO) # 测试阶段,根Logger收集INFO及以上 else: # deploy root_logger.setLevel(logging.WARNING) # 创建一个流处理器,输出到标准输出(CI Runner会捕获) console_handler = logging.StreamHandler(sys.stdout) if phase == 'test': # 测试阶段,我们使用一个能很好被解析的格式(例如JSON) import json class JsonFormatter(logging.Formatter): def format(self, record): log_record = { 'timestamp': self.formatTime(record), 'phase': phase, 'name': record.name, 'level': record.levelname, 'message': record.getMessage(), } if hasattr(record, 'test_id'): log_record['test_id'] = record.test_id return json.dumps(log_record) console_handler.setFormatter(JsonFormatter()) else: console_handler.setFormatter(logging.Formatter('%(levelname)s - %(name)s - %(message)s')) root_logger.addHandler(console_handler) # **关键配置:处理测试框架和业务代码日志** # 1. 获取pytest的Logger,并阻止其DEBUG日志传播到根Logger,避免控制台过于嘈杂 pytest_logger = logging.getLogger('pytest') if phase == 'test': # 在测试阶段,我们可能希望pytest的详细日志输出到单独文件,而不是控制台 pytest_logger.setLevel(logging.DEBUG) pytest_logger.propagate = False # 阻止传播到root # 可以给pytest添加一个FileHandler,将详细日志写入文件,供深度调试使用 pytest_file_handler = logging.FileHandler(f'pytest_{os.getpid()}.log') pytest_file_handler.setLevel(logging.DEBUG) pytest_file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) pytest_logger.addHandler(pytest_file_handler) else: pytest_logger.setLevel(logging.WARNING) pytest_logger.propagate = False # 2. 我们业务代码的Logger,在测试阶段也控制其传播 app_logger = logging.getLogger('my_app') if phase == 'test': app_logger.setLevel(logging.DEBUG) app_logger.propagate = False # 业务日志不干扰pytest的JSON输出 # 为业务日志也添加一个文件Handler,便于和测试结果关联 app_file_handler = logging.FileHandler(f'app_test_{os.getpid()}.log') app_file_handler.setLevel(logging.DEBUG) app_logger.addHandler(app_file_handler) # 在其他阶段,业务日志会正常传播到根Logger,由console_handler处理然后,在GitLab CI的配置中调用
# .gitlab-ci.yml variables: LOG_PHASE: $CI_JOB_STAGE # GitLab CI预定义变量,值为 ‘build‘, ‘test‘, ‘deploy‘ 等 stages: - build - test - deploy build-job: stage: build script: - python -c "from utils.logging_ci import setup_ci_logging; setup_ci_logging(phase='$LOG_PHASE')" - echo "开始安装依赖..." - pip install -r requirements.txt # 这里的pip输出会被我们的DEBUG级别日志捕获 - echo "构建完成。" unit-test-job: stage: test script: - python -c "from utils.logging_ci import setup_ci_logging; setup_ci_logging(phase='$LOG_PHASE')" - echo "开始运行单元测试..." - pytest --tb=short --junitxml=report.xml 2>&1 | tee pytest_output.log # pytest的输出会被我们配置的pytest_logger捕获到文件,而JSON格式的业务日志输出到控制台 artifacts: reports: junit: report.xml # GitLab能解析JUnit格式的测试报告 paths: - pytest_*.log # 收集pytest的详细日志 - app_test_*.log # 收集业务代码的测试日志 expire_in: 1 week deploy-job: stage: deploy script: - python -c "from utils.logging_ci import setup_ci_logging; setup_ci_logging(phase='$LOG_PHASE')" - echo "开始部署,仅显示WARNING及以上日志..." - ansible-playbook deploy.yml # Ansible的输出会被我们的日志系统过滤在这个流程中,propagate=False起到了日志路由的作用:
- 在
test阶段,pytestLogger和my_appLogger的日志被限制在各自的文件Handler中,不会向上传播干扰根Logger的JSON格式输出。这样,CI Runner控制台看到的是整洁的、结构化的JSON日志流,便于后续的日志收集器(如ELK Stack)直接摄取和索引。 - 根Logger的JSON输出,包含了阶段(
phase)、Logger名(name)、级别(level)等标准字段,可以轻松地与report.xml(JUnit测试报告)中的测试用例关联起来,实现日志与测试结果的联动分析。
6. 场景五:构建微服务间可追溯的分布式日志上下文
在微服务架构下,一个用户请求可能穿越A、B、C多个服务。当出现问题时,你需要像刑侦专家一样,把散落在各服务日志中的相关条目“串”起来,还原完整的调用链。这光靠propagate不够,需要引入分布式追踪的概念,但propagate可以帮助我们在单个服务内,将追踪上下文(如Trace ID)无损地传递下去。
假设我们使用OpenTelemetry或类似机制生成一个trace_id,并通过HTTP头在服务间传递。在每个服务内部,我们需要将这个trace_id注入到每一条相关的日志中,无论这条日志来自主程序、工具函数还是深层嵌套的库。
挑战在于:你不可能去修改每个打印日志的语句。解决方案是使用日志过滤器(Filter)和Logger适配器(LoggerAdapter),并结合propagate确保上下文能覆盖所有需要的地方。
首先,创建一个能存储追踪上下文的类,并提供一个过滤器:
然后,在服务启动时(如FastAPI 的中间件、Flask的before_request),配置日志并设置trace_id:
# app.py (FastAPI示例) from fastapi import FastAPI, Request import logging from tracing import set_trace_id, TraceIDFilter from contextlib import asynccontextmanager @asynccontextmanager async def lifespan(app: FastAPI): # 启动时配置日志 setup_logging() yield # 关闭时清理 def setup_logging(): """应用级别的日志配置""" root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) # 移除默认Handler(如果有) for hdlr in root_logger.handlers[:]: root_logger.removeHandler(hdlr) # 创建控制台Handler,格式中包含trace_id console_handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s [%(trace_id)s] [%(name)s] %(levelname)s: %(message)s') console_handler.setFormatter(formatter) # 创建文件Handler,用于持久化 file_handler = logging.handlers.RotatingFileHandler('service.log', maxBytes=10*1024*1024, backupCount=5) file_handler.setFormatter(formatter) # 将过滤器添加到Handler,而不是Logger,这样更灵活 trace_filter = TraceIDFilter() console_handler.addFilter(trace_filter) file_handler.addFilter(trace_filter) root_logger.addHandler(console_handler) root_logger.addHandler(file_handler) # **关键配置:为特定第三方库Logger设置propagate=True,让它们也能带上trace_id** # 例如,数据库ORM库的日志对我们追踪SQL很有用 sqlalchemy_logger = logging.getLogger('sqlalchemy.engine') sqlalchemy_logger.setLevel(logging.WARNING) # 生产环境只记录WARNING以上 sqlalchemy_logger.propagate = True # 允许传播到根Logger,从而使用根Logger的Handler和Filter # 对于我们自己的业务模块,也确保它们传播 app_logger = logging.getLogger('my_service') app_logger.setLevel(logging.DEBUG) app_logger.propagate = True # 业务日志传播到根Logger # 对于一些我们不想收集的、非常底层的库,可以关闭传播以减少噪音 urllib3_logger = logging.getLogger('urllib3') urllib3_logger.setLevel(logging.WARNING) urllib3_logger.propagate = False # 不传播,除非我们单独为它添加带trace_id的Handler app = FastAPI(lifespan=lifespan) @app.middleware("http") async def add_trace_id(request: Request, call_next): # 从请求头获取或生成trace_id trace_id = request.headers.get('X-Trace-ID', None) trace_id = set_trace_id(trace_id) # 设置到上下文 response = await call_next(request) # 将trace_id添加到响应头,方便下游服务使用 response.headers['X-Trace-ID'] = trace_id return response @app.get("/") async def read_root(): # 业务代码中正常使用logging即可 logger = logging.getLogger(__name__) # 名称会是 ‘my_service.app‘ logger.info("处理根路径请求") # ... 一些数据库操作,sqlalchemy的日志也会带上相同的trace_id return {"message": "Hello World"}在这个配置下,当一个携带X-Trace-ID: abc123的请求进入服务:
- 中间件将
abc123设置到上下文变量。 - 根Logger的两个Handler都附加了
TraceIDFilter,该过滤器会为每一条经过的LogRecord添加trace_id字段。 - 由于
sqlalchemy.engineLogger的propagate=True,它产生的WARNING级别SQL日志会向上传播到根Logger,从而被根Logger的Handler处理,并自动注入trace_id。 - 我们业务代码的Logger(如
my_service.app)也因为propagate=True,其日志会传播到根Logger并带上trace_id。
最终,在service.log和控制台中,你会看到:
2023-10-27 15:45:01,123 [abc123] [my_service.app] INFO: 处理根路径请求 2023-10-27 15:45:01,124 [abc123] [sqlalchemy.engine] WARNING: 慢查询警告: SELECT * FROM large_table ...这样一来,无论日志来自框架、第三方库还是业务代码,只要它们最终通过根Logger的Handler输出,就会自动携带统一的trace_id。在日志聚合系统(如Loki、ELK)中,你只需搜索trace_id:abc123,就能一次性拉出这个请求在当前服务内涉及的所有日志条目,为分布式排查提供了坚实的基础。而propagate的灵活设置,让你可以精确控制哪些组件的日志能享受到这种自动注入上下文的便利,哪些则需要被隔离或静默处理。