📋 Overview
Learning objectives:
- Understand the core concepts of the Prometheus monitoring system and how to deploy it
- Define and collect custom business metrics
- Build Grafana dashboards for visualization
- Create alerting rules and a notification pipeline
- Analyze and resolve performance bottlenecks
- Understand monitoring best practices
Tech stack:
- Prometheus (metric collection and storage)
- Grafana (dashboards and visualization)
- AlertManager (alert management)
- Python client (metric exposition)
- FastAPI (application integration)
- Docker (containerized deployment)
Estimated reading time: 70 minutes
Prerequisites:
- Familiarity with time-series database concepts
- Experience with Docker-based deployment
- Working knowledge of FastAPI
- Understanding of the workflow execution engine (see parts 4-5)
- Understanding of the logging system design (see part 17)
🎯 Business Scenarios
Why do we need a performance monitoring system?
In a workflow automation platform, performance monitoring is what keeps the system stable and the user experience predictable. Without monitoring you are effectively flying blind.
Scenario 1: Degraded system performance
Problem:
- Workflow execution times suddenly increase
- Users complain that the system is slow
- Nobody knows which component is at fault

Traditional approach:
- Dig through logs (hard to analyze at this volume)
- Test manually (the problem cannot be reproduced)
- Guess from experience (slow and unreliable)

With monitoring:
- Real-time performance charts
- Automatic anomaly detection
- Precise bottleneck identification
- Historical trend analysis

Scenario 2: Abnormal resource usage
Problem:
- Database connection pool exhausted
- Redis memory usage too high
- Celery queues backing up
- CPU/memory spikes

With Prometheus monitoring:
- Connection pool usage: 85% → alert
- Redis memory: 4GB/8GB → normal
- Celery queue length: 1000+ → alert
- CPU usage: 90% → alert

Scenario 3: Abnormal business metrics
Requirements:
- Monitor workflow execution success rate
- Track node execution duration
- Measure user activity
- Analyze the distribution of error types

Grafana dashboard:
┌────────────────────────────────────────────────────┐
│ Workflow success rate: 97.5% ↓ (yesterday 98.2%)   │
│ Average execution time: 2.3s ↑ (yesterday 1.8s)    │
│ Active users: 1,234 ↑ (yesterday 1,100)            │
│ Error rate: 2.5% ↑ (yesterday 1.8%)                │
└────────────────────────────────────────────────────┘
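Each of these dashboard numbers maps onto a PromQL query over the metrics defined later in this article. As a minimal sketch (assuming Prometheus is reachable at http://localhost:9090 and the workflow_executions_total counter from the implementation section), the success rate could be pulled like this:

```python
# Hypothetical helper: query the Prometheus HTTP API for the workflow success rate.
# Assumes Prometheus runs at localhost:9090 and workflow_executions_total is exposed.
import requests

PROMETHEUS_URL = "http://localhost:9090"  # assumption, adjust to your deployment

def query_success_rate(window: str = "1d") -> float:
    """Success rate = successful executions / all executions over the given window."""
    promql = (
        f'sum(rate(workflow_executions_total{{status="success"}}[{window}])) '
        f'/ sum(rate(workflow_executions_total[{window}]))'
    )
    resp = requests.get(
        f"{PROMETHEUS_URL}/api/v1/query", params={"query": promql}, timeout=5
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    # An empty result simply means there were no samples in the window.
    return float(result[0]["value"][1]) if result else 0.0

if __name__ == "__main__":
    print(f"Workflow success rate (last 24h): {query_success_rate():.2%}")
```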
Industry solutions compared

| Solution | Strengths | Weaknesses | Cost | Best fit |
|---|---|---|---|---|
| Prometheus + Grafana | Open source, rich ecosystem, flexible and powerful | You operate it yourself | Low | Small to mid-sized projects |
| DataDog | Comprehensive features, easy to use, SaaS | Expensive | High | Large enterprises |
| New Relic | Strong APM, deep tracing | Expensive, steep learning curve | High | Enterprise applications |
| CloudWatch | AWS-native, good integration | AWS only, limited features | Medium | AWS environments |
| In-house monitoring | Fully customizable | High development and maintenance cost | High | Special requirements |
🏗️ Architecture
Overall architecture
```mermaid
graph TB
    subgraph "Application layer"
        APP[FastAPI app]
        WORKER[Celery Worker]
        ENGINE[Workflow engine]
    end
    subgraph "Metric exposition"
        APP_METRICS["/metrics endpoint"]
        WORKER_METRICS[Worker metrics]
        CUSTOM_METRICS[Custom metrics]
    end
    subgraph "Prometheus"
        PROM[Prometheus Server]
        SCRAPE[Scraping]
        TSDB[(Time-series DB)]
        RULES[Alerting rules]
    end
    subgraph "AlertManager"
        AM[AlertManager]
        ROUTE[Routing rules]
        NOTIFY[Notification channels]
    end
    subgraph "Grafana"
        GRAFANA[Grafana Server]
        DASHBOARD[Dashboards]
        PANEL[Panels]
    end
    subgraph "Notification channels"
        EMAIL[Email]
        SLACK[Slack]
        WEBHOOK[Webhook]
        SMS[SMS]
    end

    APP --> APP_METRICS
    WORKER --> WORKER_METRICS
    ENGINE --> CUSTOM_METRICS
    APP_METRICS --> SCRAPE
    WORKER_METRICS --> SCRAPE
    CUSTOM_METRICS --> SCRAPE
    SCRAPE --> PROM
    PROM --> TSDB
    PROM --> RULES
    RULES --> AM
    AM --> ROUTE
    ROUTE --> NOTIFY
    NOTIFY --> EMAIL
    NOTIFY --> SLACK
    NOTIFY --> WEBHOOK
    NOTIFY --> SMS
    TSDB --> GRAFANA
    GRAFANA --> DASHBOARD
    DASHBOARD --> PANEL

    style PROM fill:#E6522C
    style GRAFANA fill:#F46800
    style AM fill:#00D1B2
    style TSDB fill:#3B82F6
```

Core modules
1. Metric collection layer
- Application metrics: HTTP requests, response times, error rates
- System metrics: CPU, memory, disk, network
- Business metrics: workflow executions, node performance, user behavior
- Middleware metrics: database, Redis, Celery (see the metric-type sketch below)
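Before the full implementation, here is a minimal sketch of the four prometheus_client metric types these categories map onto. Metric and label names here are illustrative only, not the ones used later in the article:

```python
# Illustrative only: the four prometheus_client metric types used throughout this article.
from prometheus_client import Counter, Gauge, Histogram, Summary

# Counter: monotonically increasing (requests served, errors seen)
requests_served = Counter("demo_requests_total", "Requests served", ["endpoint"])

# Gauge: can go up and down (in-flight requests, queue depth, memory)
queue_depth = Gauge("demo_queue_depth", "Items waiting in the queue")

# Histogram: observations bucketed for quantile estimation (latency, payload size)
request_latency = Histogram("demo_request_latency_seconds", "Request latency",
                            buckets=(0.05, 0.1, 0.5, 1.0, 5.0))

# Summary: client-side averages and counts (less common with Prometheus)
payload_size = Summary("demo_payload_bytes", "Payload size in bytes")

requests_served.labels(endpoint="/api/workflows").inc()
queue_depth.set(42)
request_latency.observe(0.23)
payload_size.observe(512)
```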
2. Prometheus Server
- Scraping: periodically pulls each service's /metrics endpoint
- Time-series storage: efficient storage of time-series data
- Query engine: the PromQL query language (see the query sketch below)
- Alert evaluation: continuously evaluates alerting rules
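PromQL is usually run from Grafana or the Prometheus UI, but it is also exposed over the HTTP API, which is convenient for ad-hoc scripts. A minimal sketch (assuming Prometheus at localhost:9090 and the http_request_duration_seconds histogram defined later):

```python
# Hypothetical script: fetch the p95 HTTP latency via Prometheus' HTTP API.
import requests

PROM = "http://localhost:9090"  # assumption, adjust to your deployment

def p95_latency() -> float:
    promql = (
        "histogram_quantile(0.95, "
        "sum(rate(http_request_duration_seconds_bucket[5m])) by (le))"
    )
    data = requests.get(
        f"{PROM}/api/v1/query", params={"query": promql}, timeout=5
    ).json()
    result = data["data"]["result"]
    return float(result[0]["value"][1]) if result else float("nan")

print(f"p95 latency over the last 5 minutes: {p95_latency():.3f}s")
```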
3. AlertManager
- Alert grouping: merges related alerts
- Alert routing: dispatches alerts to receivers based on labels
- Alert inhibition: prevents alert storms
- Notification delivery: multiple channels (see the webhook sketch below)
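Besides email and Slack, AlertManager can POST alerts to an arbitrary webhook. The AlertManager configuration later in this article points one receiver at http://api:8000/api/alerts/webhook; a minimal FastAPI handler for that endpoint could look like the sketch below. The payload fields are the standard AlertManager webhook format, but the handler itself is an assumption and not part of the original code:

```python
# Hypothetical receiver for AlertManager webhook notifications.
# Matches the webhook_configs URL used in alertmanager.yml (http://api:8000/api/alerts/webhook).
import logging
from fastapi import APIRouter, Request

logger = logging.getLogger("alerts")
router = APIRouter()

@router.post("/api/alerts/webhook")
async def alertmanager_webhook(request: Request):
    """Receive grouped alerts pushed by AlertManager and log/forward them."""
    payload = await request.json()
    for alert in payload.get("alerts", []):
        logger.warning(
            "[%s] %s - %s",
            alert.get("status"),                           # "firing" or "resolved"
            alert.get("labels", {}).get("alertname"),
            alert.get("annotations", {}).get("summary", ""),
        )
    # Returning 2xx tells AlertManager the notification was accepted.
    return {"received": len(payload.get("alerts", []))}
```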
4. Grafana
- Data source management: connects to Prometheus (see the provisioning sketch below)
- Dashboard design: visual configuration
- Panels: multiple chart types
- Alert integration: visual alerting
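Data sources can be provisioned from files (as the Docker Compose setup below does) or created through Grafana's HTTP API. A minimal sketch of the API route; the credentials match the GF_SECURITY_ADMIN_* values used in the Compose file, everything else is an assumption:

```python
# Hypothetical: register Prometheus as a Grafana data source via the HTTP API.
import requests

GRAFANA = "http://localhost:3000"
AUTH = ("admin", "admin123")  # matches GF_SECURITY_ADMIN_* in docker-compose.monitoring.yml

datasource = {
    "name": "Prometheus",
    "type": "prometheus",
    "url": "http://prometheus:9090",  # service name on the monitoring network
    "access": "proxy",                # Grafana server proxies the queries
    "isDefault": True,
}

resp = requests.post(f"{GRAFANA}/api/datasources", json=datasource, auth=AUTH, timeout=10)
print(resp.status_code, resp.json())
```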
Data flow
```mermaid
sequenceDiagram
    participant App as FastAPI app
    participant Metrics as /metrics endpoint
    participant Prom as Prometheus
    participant TSDB as Time-series DB
    participant Rules as Alerting rules
    participant AM as AlertManager
    participant Grafana as Grafana
    participant User as User

    App->>Metrics: Record metrics
    Note over App,Metrics: Counter, Gauge, Histogram

    loop Every 15 seconds
        Prom->>Metrics: Scrape metrics
        Metrics-->>Prom: Return metric data
        Prom->>TSDB: Store time series
    end

    Prom->>Rules: Evaluate alerting rules
    alt Alert fires
        Rules->>AM: Send alert
        AM->>AM: Group / route / inhibit
        AM->>User: Send notification
    end

    User->>Grafana: Open dashboard
    Grafana->>Prom: Query metrics
    Prom->>TSDB: Read data
    TSDB-->>Prom: Return data
    Prom-->>Grafana: Return query result
    Grafana-->>User: Render charts
```

💻 Code Implementation
1. Prometheus metric definitions
# monitoring/metrics.py from prometheus_client import Counter, Gauge, Histogram, Summary from prometheus_client import CollectorRegistry, generate_latest from typing import Dict, Any import time from functools import wraps # 创建注册表 registry = CollectorRegistry() # ============================================================================ # HTTP请求指标 # ============================================================================ # 请求总数(Counter:只增不减) http_requests_total = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'], registry=registry ) # 请求延迟(Histogram:分布统计) http_request_duration_seconds = Histogram( 'http_request_duration_seconds', 'HTTP request latency', ['method', 'endpoint'], buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0), registry=registry ) # 活跃请求数(Gauge:可增可减) http_requests_in_progress = Gauge( 'http_requests_in_progress', 'HTTP requests in progress', ['method', 'endpoint'], registry=registry ) # ============================================================================ # 工作流执行指标 # ============================================================================ # 工作流执行总数 workflow_executions_total = Counter( 'workflow_executions_total', 'Total workflow executions', ['workflow_id', 'status'], registry=registry ) # 工作流执行时长 workflow_execution_duration_seconds = Histogram( 'workflow_execution_duration_seconds', 'Workflow execution duration', ['workflow_id'], buckets=(1, 5, 10, 30, 60, 120, 300, 600), registry=registry ) # 正在执行的工作流数 workflow_executions_in_progress = Gauge( 'workflow_executions_in_progress', 'Workflows currently executing', registry=registry ) # 工作流执行成功率(自定义计算) workflow_success_rate = Gauge( 'workflow_success_rate', 'Workflow success rate', ['workflow_id'], registry=registry ) # ============================================================================ # 节点执行指标 # ============================================================================ # 节点执行总数 node_executions_total = Counter( 'node_executions_total', 'Total node executions', ['node_type', 'status'], registry=registry ) # 节点执行时长 node_execution_duration_seconds = Histogram( 'node_execution_duration_seconds', 'Node execution duration', ['node_type'], buckets=(0.1, 0.5, 1, 2, 5, 10, 30), registry=registry ) # ============================================================================ # 数据库指标 # ============================================================================ # 数据库连接池 db_connections_total = Gauge( 'db_connections_total', 'Total database connections', registry=registry ) db_connections_in_use = Gauge( 'db_connections_in_use', 'Database connections in use', registry=registry ) db_connections_idle = Gauge( 'db_connections_idle', 'Idle database connections', registry=registry ) # 数据库查询 db_query_duration_seconds = Histogram( 'db_query_duration_seconds', 'Database query duration', ['operation'], buckets=(0.001, 0.01, 0.05, 0.1, 0.5, 1.0), registry=registry ) # ============================================================================ # Redis指标 # ============================================================================ # Redis操作 redis_operations_total = Counter( 'redis_operations_total', 'Total Redis operations', ['operation', 'status'], registry=registry ) redis_operation_duration_seconds = Histogram( 'redis_operation_duration_seconds', 'Redis operation duration', ['operation'], buckets=(0.001, 0.005, 0.01, 0.05, 0.1), registry=registry ) # Redis内存使用 redis_memory_used_bytes = Gauge( 'redis_memory_used_bytes', 'Redis memory used in bytes', registry=registry ) # 
============================================================================ # Celery指标 # ============================================================================ # Celery任务 celery_tasks_total = Counter( 'celery_tasks_total', 'Total Celery tasks', ['task_name', 'status'], registry=registry ) celery_task_duration_seconds = Histogram( 'celery_task_duration_seconds', 'Celery task duration', ['task_name'], buckets=(1, 5, 10, 30, 60, 300), registry=registry ) # Celery队列长度 celery_queue_length = Gauge( 'celery_queue_length', 'Celery queue length', ['queue_name'], registry=registry ) # ============================================================================ # 业务指标 # ============================================================================ # 活跃用户数 active_users = Gauge( 'active_users', 'Number of active users', registry=registry ) # 用户操作 user_operations_total = Counter( 'user_operations_total', 'Total user operations', ['operation_type'], registry=registry ) # 错误率 error_rate = Gauge( 'error_rate', 'Error rate', ['error_type'], registry=registry ) # ============================================================================ # 装饰器:自动记录指标 # ============================================================================ def track_request_metrics(func): """HTTP请求指标跟踪装饰器""" @wraps(func) async def wrapper(*args, **kwargs): method = kwargs.get('request').method if 'request' in kwargs else 'UNKNOWN' endpoint = func.__name__ # 增加活跃请求数 http_requests_in_progress.labels(method=method, endpoint=endpoint).inc() start_time = time.time() status = 'success' try: result = await func(*args, **kwargs) return result except Exception as e: status = 'error' raise finally: # 记录请求时长 duration = time.time() - start_time http_request_duration_seconds.labels( method=method, endpoint=endpoint ).observe(duration) # 增加请求总数 http_requests_total.labels( method=method, endpoint=endpoint, status=status ).inc() # 减少活跃请求数 http_requests_in_progress.labels(method=method, endpoint=endpoint).dec() return wrapper def track_workflow_metrics(func): """工作流执行指标跟踪装饰器""" @wraps(func) async def wrapper(workflow_id: str, *args, **kwargs): # 增加正在执行的工作流数 workflow_executions_in_progress.inc() start_time = time.time() status = 'success' try: result = await func(workflow_id, *args, **kwargs) return result except Exception as e: status = 'failure' raise finally: # 记录执行时长 duration = time.time() - start_time workflow_execution_duration_seconds.labels( workflow_id=workflow_id ).observe(duration) # 增加执行总数 workflow_executions_total.labels( workflow_id=workflow_id, status=status ).inc() # 减少正在执行的工作流数 workflow_executions_in_progress.dec() # 更新成功率(简化计算) total = workflow_executions_total.labels( workflow_id=workflow_id, status='success' )._value.get() + workflow_executions_total.labels( workflow_id=workflow_id, status='failure' )._value.get() success = workflow_executions_total.labels( workflow_id=workflow_id, status='success' )._value.get() if total > 0: workflow_success_rate.labels(workflow_id=workflow_id).set( success / total ) return wrapper def track_node_metrics(node_type: str): """节点执行指标跟踪装饰器""" def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() status = 'success' try: result = await func(*args, **kwargs) return result except Exception as e: status = 'failure' raise finally: # 记录执行时长 duration = time.time() - start_time node_execution_duration_seconds.labels( node_type=node_type ).observe(duration) # 增加执行总数 node_executions_total.labels( node_type=node_type, status=status ).inc() return wrapper return 
decorator

# ============================================================================
# Metric export
# ============================================================================
def get_metrics() -> bytes:
    """Return all registered metrics in Prometheus text exposition format."""
    return generate_latest(registry)
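One caveat worth flagging before wiring this into FastAPI: a custom CollectorRegistry like the one above lives inside a single process. If the API runs under Gunicorn/Uvicorn with several workers, each worker keeps its own counters, so /metrics only reports whichever worker happened to serve the scrape. prometheus_client ships a multiprocess mode for this case; a minimal sketch (the multiproc directory path is an assumption):

```python
# Sketch: aggregate metrics across worker processes with prometheus_client's multiprocess mode.
# Requires the PROMETHEUS_MULTIPROC_DIR environment variable to point at a writable directory
# (e.g. /tmp/prometheus_multiproc) before any metrics are created.
from prometheus_client import CollectorRegistry, generate_latest, multiprocess

def get_multiprocess_metrics() -> bytes:
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)  # merges the per-process metric files
    return generate_latest(registry)
```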
2. FastAPI integration

```python
# api/monitoring.py
from datetime import datetime

from fastapi import APIRouter, Response

from monitoring.metrics import (
    get_metrics,
    track_request_metrics,
    db_connections_total,
    db_connections_in_use,
    db_connections_idle,
    redis_memory_used_bytes,
    celery_queue_length,
    active_users
)
from database import engine
from redis_client import redis
from celery_app import celery_app

router = APIRouter()


@router.get("/metrics")
async def metrics():
    """
    Prometheus metrics endpoint.

    Returns metric data in the Prometheus text exposition format.
    """
    # Update database connection pool metrics
    pool = engine.pool
    db_connections_total.set(pool.size())
    db_connections_in_use.set(pool.checkedout())
    db_connections_idle.set(pool.size() - pool.checkedout())

    # Update Redis memory metrics
    redis_info = await redis.info('memory')
    redis_memory_used_bytes.set(redis_info['used_memory'])

    # Update Celery queue lengths
    inspect = celery_app.control.inspect()
    active_tasks = inspect.active()
    if active_tasks:
        for queue_name, tasks in active_tasks.items():
            celery_queue_length.labels(queue_name=queue_name).set(len(tasks))

    # Update active user count (placeholder value;
    # in production this should come from Redis or the database)
    active_users.set(1234)

    # Return the metrics
    metrics_data = get_metrics()
    return Response(content=metrics_data, media_type="text/plain")


@router.get("/health")
@track_request_metrics
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat()
    }
```
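For the Prometheus scrape configuration later in this article to work, this router has to be mounted under the /api prefix, because the scrape job uses metrics_path: '/api/metrics'. A minimal sketch of the wiring, assuming the main application module is named main.py:

```python
# Hypothetical main.py excerpt: mount the monitoring router so that
# /api/metrics and /api/health are exposed, matching prometheus.yml.
from fastapi import FastAPI

from api.monitoring import router as monitoring_router

app = FastAPI(title="QuantumFlow API")
app.include_router(monitoring_router, prefix="/api", tags=["monitoring"])
```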
3. Workflow engine integration

```python
# engine/workflow_engine.py
import networkx as nx

from monitoring.metrics import (
    track_workflow_metrics,
    track_node_metrics,
    workflow_executions_in_progress,
    node_executions_total
)


class WorkflowEngine:
    """Workflow execution engine with monitoring integrated."""

    @track_workflow_metrics
    async def execute_workflow(self, workflow_id: str):
        """
        Execute a workflow.

        Execution metrics are recorded automatically by the decorator.
        """
        workflow = await self.load_workflow(workflow_id)
        dag = self.build_dag(workflow)
        execution_order = list(nx.topological_sort(dag))

        results = {}
        for node_id in execution_order:
            node = dag.nodes[node_id]['data']
            results[node_id] = await self.execute_node(node, results)

        return results

    async def execute_node(self, node, context):
        """Execute a node, dispatching to type-specific (and type-instrumented) handlers."""
        node_type = node.type

        if node_type == 'http_request':
            return await self._execute_http_node(node, context)
        elif node_type == 'database':
            return await self._execute_db_node(node, context)
        # ... other node types

    @track_node_metrics('http_request')
    async def _execute_http_node(self, node, context):
        """Execute an HTTP request node."""
        # HTTP request logic goes here
        pass

    @track_node_metrics('database')
    async def _execute_db_node(self, node, context):
        """Execute a database node."""
        # Database operation logic goes here
        pass
```

4. Docker Compose configuration
# docker-compose.monitoring.yml version: '3.8' services: # Prometheus服务 prometheus: image: prom/prometheus:latest container_name: quantumflow-prometheus ports: - "9090:9090" volumes: - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml - ./monitoring/rules:/etc/prometheus/rules - prometheus-data:/prometheus command: - '--config.file=/etc/prometheus/prometheus.yml' - '--storage.tsdb.path=/prometheus' - '--web.console.libraries=/usr/share/prometheus/console_libraries' - '--web.console.templates=/usr/share/prometheus/consoles' - '--storage.tsdb.retention.time=30d' networks: - monitoring restart: unless-stopped # Grafana服务 grafana: image: grafana/grafana:latest container_name: quantumflow-grafana ports: - "3000:3000" environment: - GF_SECURITY_ADMIN_USER=admin - GF_SECURITY_ADMIN_PASSWORD=admin123 - GF_USERS_ALLOW_SIGN_UP=false volumes: - ./monitoring/grafana/provisioning:/etc/grafana/provisioning - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards - grafana-data:/var/lib/grafana networks: - monitoring restart: unless-stopped depends_on: - prometheus # AlertManager服务 alertmanager: image: prom/alertmanager:latest container_name: quantumflow-alertmanager ports: - "9093:9093" volumes: - ./monitoring/alertmanager.yml:/etc/alertmanager/alertmanager.yml - alertmanager-data:/alertmanager command: - '--config.file=/etc/alertmanager/alertmanager.yml' - '--storage.path=/alertmanager' networks: - monitoring restart: unless-stopped # Node Exporter(系统指标) node-exporter: image: prom/node-exporter:latest container_name: quantumflow-node-exporter ports: - "9100:9100" command: - '--path.procfs=/host/proc' - '--path.sysfs=/host/sys' - '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)' volumes: - /proc:/host/proc:ro - /sys:/host/sys:ro - /:/rootfs:ro networks: - monitoring restart: unless-stopped volumes: prometheus-data: grafana-data: alertmanager-data: networks: monitoring: driver: bridge5. Prometheus配置
```yaml
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    cluster: 'quantumflow-production'
    environment: 'production'

# Alerting rule files
rule_files:
  - '/etc/prometheus/rules/*.yml'

# AlertManager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

# Scrape configuration
scrape_configs:
  # Prometheus itself
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  # FastAPI application
  - job_name: 'quantumflow-api'
    static_configs:
      - targets: ['api:8000']
    metrics_path: '/api/metrics'
    scrape_interval: 10s

  # Celery worker
  - job_name: 'quantumflow-worker'
    static_configs:
      - targets: ['worker:9091']
    scrape_interval: 10s

  # Node Exporter (system metrics)
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

  # PostgreSQL Exporter
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  # Redis Exporter
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']
```

6. Alerting rule configuration
# monitoring/rules/alerts.yml groups: - name: quantumflow_alerts interval: 30s rules: # ======================================== # 系统级告警 # ======================================== # CPU使用率过高 - alert: HighCPUUsage expr: 100 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) > 80 for: 5m labels: severity: warning category: system annotations: summary: "CPU使用率过高" description: "实例 {{ $labels.instance }} CPU使用率为 {{ $value }}%" # 内存使用率过高 - alert: HighMemoryUsage expr: (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100 > 85 for: 5m labels: severity: warning category: system annotations: summary: "内存使用率过高" description: "实例 {{ $labels.instance }} 内存使用率为 {{ $value }}%" # 磁盘空间不足 - alert: LowDiskSpace expr: (1 - (node_filesystem_avail_bytes / node_filesystem_size_bytes)) * 100 > 85 for: 10m labels: severity: critical category: system annotations: summary: "磁盘空间不足" description: "实例 {{ $labels.instance }} 磁盘使用率为 {{ $value }}%" # ======================================== # 应用级告警 # ======================================== # HTTP错误率过高 - alert: HighHTTPErrorRate expr: | ( sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) ) * 100 > 5 for: 5m labels: severity: critical category: application annotations: summary: "HTTP错误率过高" description: "HTTP 5xx错误率为 {{ $value }}%" # 请求延迟过高 - alert: HighRequestLatency expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2 for: 5m labels: severity: warning category: application annotations: summary: "请求延迟过高" description: "95分位延迟为 {{ $value }}秒" # 活跃请求数过多 - alert: TooManyActiveRequests expr: sum(http_requests_in_progress) > 100 for: 5m labels: severity: warning category: application annotations: summary: "活跃请求数过多" description: "当前活跃请求数为 {{ $value }}" # ======================================== # 工作流告警 # ======================================== # 工作流执行失败率过高 - alert: HighWorkflowFailureRate expr: | ( sum(rate(workflow_executions_total{status="failure"}[10m])) / sum(rate(workflow_executions_total[10m])) ) * 100 > 10 for: 10m labels: severity: critical category: workflow annotations: summary: "工作流失败率过高" description: "工作流失败率为 {{ $value }}%" # 工作流执行时长过长 - alert: SlowWorkflowExecution expr: histogram_quantile(0.95, rate(workflow_execution_duration_seconds_bucket[10m])) > 300 for: 10m labels: severity: warning category: workflow annotations: summary: "工作流执行时长过长" description: "95分位执行时长为 {{ $value }}秒" # 正在执行的工作流过多 - alert: TooManyActiveWorkflows expr: workflow_executions_in_progress > 50 for: 5m labels: severity: warning category: workflow annotations: summary: "正在执行的工作流过多" description: "当前正在执行 {{ $value }} 个工作流" # ======================================== # 数据库告警 # ======================================== # 数据库连接池耗尽 - alert: DatabaseConnectionPoolExhausted expr: (db_connections_in_use / db_connections_total) * 100 > 90 for: 5m labels: severity: critical category: database annotations: summary: "数据库连接池即将耗尽" description: "连接池使用率为 {{ $value }}%" # 数据库查询慢 - alert: SlowDatabaseQueries expr: histogram_quantile(0.95, rate(db_query_duration_seconds_bucket[5m])) > 1 for: 5m labels: severity: warning category: database annotations: summary: "数据库查询慢" description: "95分位查询时长为 {{ $value }}秒" # ======================================== # Redis告警 # ======================================== # Redis内存使用率过高 - alert: HighRedisMemoryUsage expr: (redis_memory_used_bytes / redis_memory_max_bytes) * 100 > 85 for: 5m labels: severity: warning category: redis annotations: summary: "Redis内存使用率过高" 
description: "Redis内存使用率为 {{ $value }}%" # ======================================== # Celery告警 # ======================================== # Celery队列堆积 - alert: CeleryQueueBacklog expr: celery_queue_length > 1000 for: 10m labels: severity: warning category: celery annotations: summary: "Celery队列堆积" description: "队列 {{ $labels.queue_name }} 长度为 {{ $value }}" # Celery任务失败率过高 - alert: HighCeleryTaskFailureRate expr: | ( sum(rate(celery_tasks_total{status="failure"}[10m])) / sum(rate(celery_tasks_total[10m])) ) * 100 > 10 for: 10m labels: severity: critical category: celery annotations: summary: "Celery任务失败率过高" description: "任务失败率为 {{ $value }}%"7. AlertManager配置
# monitoring/alertmanager.yml global: resolve_timeout: 5m smtp_smarthost: 'smtp.gmail.com:587' smtp_from: 'alerts@quantumflow.com' smtp_auth_username: 'alerts@quantumflow.com' smtp_auth_password: 'your-password' # 告警路由 route: group_by: ['alertname', 'cluster', 'service'] group_wait: 10s group_interval: 10s repeat_interval: 12h receiver: 'default' routes: # 系统级告警 -> 运维团队 - match: category: system receiver: 'ops-team' continue: true # 应用级告警 -> 开发团队 - match: category: application receiver: 'dev-team' continue: true # 工作流告警 -> 产品团队 - match: category: workflow receiver: 'product-team' continue: true # 严重告警 -> 所有人 - match: severity: critical receiver: 'all-teams' # 告警接收器 receivers: - name: 'default' email_configs: - to: 'team@quantumflow.com' - name: 'ops-team' email_configs: - to: 'ops@quantumflow.com' slack_configs: - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL' channel: '#ops-alerts' title: '运维告警' text: '{{ range .Alerts }}{{ .Annotations.summary }}\n{{ end }}' - name: 'dev-team' email_configs: - to: 'dev@quantumflow.com' slack_configs: - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL' channel: '#dev-alerts' title: '开发告警' - name: 'product-team' email_configs: - to: 'product@quantumflow.com' slack_configs: - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL' channel: '#product-alerts' title: '产品告警' - name: 'all-teams' email_configs: - to: 'team@quantumflow.com' slack_configs: - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL' channel: '#critical-alerts' title: '🚨 严重告警' webhook_configs: - url: 'http://api:8000/api/alerts/webhook' # 告警抑制规则 inhibit_rules: # 如果有严重告警,抑制警告级别的相同告警 - source_match: severity: 'critical' target_match: severity: 'warning' equal: ['alertname', 'instance']8. Grafana仪表盘配置
{ "dashboard": { "id": null, "uid": "quantumflow-overview", "title": "QuantumFlow 系统概览", "tags": ["quantumflow", "overview"], "timezone": "browser", "schemaVersion": 16, "version": 1, "refresh": "30s", "panels": [ { "id": 1, "title": "HTTP请求速率", "type": "graph", "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}, "targets": [ { "expr": "sum(rate(http_requests_total[5m])) by (status)", "legendFormat": "{{ status }}", "refId": "A" } ], "yaxes": [ {"format": "reqps", "label": "请求/秒"}, {"format": "short"} ] }, { "id": 2, "title": "HTTP请求延迟", "type": "graph", "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}, "targets": [ { "expr": "histogram_quantile(0.50, rate(http_request_duration_seconds_bucket[5m]))", "legendFormat": "P50", "refId": "A" }, { "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))", "legendFormat": "P95", "refId": "B" }, { "expr": "histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))", "legendFormat": "P99", "refId": "C" } ], "yaxes": [ {"format": "s", "label": "延迟"}, {"format": "short"} ] }, { "id": 3, "title": "工作流执行成功率", "type": "stat", "gridPos": {"h": 4, "w": 6, "x": 0, "y": 8}, "targets": [ { "expr": "avg(workflow_success_rate) * 100", "refId": "A" } ], "options": { "graphMode": "area", "colorMode": "value", "unit": "percent", "thresholds": { "mode": "absolute", "steps": [ {"value": 0, "color": "red"}, {"value": 90, "color": "yellow"}, {"value": 95, "color": "green"} ] } } }, { "id": 4, "title": "活跃工作流数", "type": "stat", "gridPos": {"h": 4, "w": 6, "x": 6, "y": 8}, "targets": [ { "expr": "workflow_executions_in_progress", "refId": "A" } ], "options": { "graphMode": "area", "colorMode": "value", "unit": "short" } }, { "id": 5, "title": "数据库连接池", "type": "gauge", "gridPos": {"h": 4, "w": 6, "x": 12, "y": 8}, "targets": [ { "expr": "(db_connections_in_use / db_connections_total) * 100", "refId": "A" } ], "options": { "showThresholdLabels": false, "showThresholdMarkers": true, "unit": "percent", "thresholds": { "mode": "absolute", "steps": [ {"value": 0, "color": "green"}, {"value": 70, "color": "yellow"}, {"value": 90, "color": "red"} ] } } }, { "id": 6, "title": "Celery队列长度", "type": "stat", "gridPos": {"h": 4, "w": 6, "x": 18, "y": 8}, "targets": [ { "expr": "sum(celery_queue_length)", "refId": "A" } ], "options": { "graphMode": "area", "colorMode": "value", "unit": "short", "thresholds": { "mode": "absolute", "steps": [ {"value": 0, "color": "green"}, {"value": 500, "color": "yellow"}, {"value": 1000, "color": "red"} ] } } }, { "id": 7, "title": "节点执行时长分布", "type": "heatmap", "gridPos": {"h": 8, "w": 24, "x": 0, "y": 12}, "targets": [ { "expr": "sum(rate(node_execution_duration_seconds_bucket[5m])) by (le, node_type)", "format": "heatmap", "refId": "A" } ] }, { "id": 8, "title": "系统资源使用", "type": "graph", "gridPos": {"h": 8, "w": 12, "x": 0, "y": 20}, "targets": [ { "expr": "100 - (avg(rate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)", "legendFormat": "CPU使用率", "refId": "A" }, { "expr": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100", "legendFormat": "内存使用率", "refId": "B" } ], "yaxes": [ {"format": "percent", "label": "使用率"}, {"format": "short"} ] }, { "id": 9, "title": "错误类型分布", "type": "piechart", "gridPos": {"h": 8, "w": 12, "x": 12, "y": 20}, "targets": [ { "expr": "sum(rate(node_executions_total{status=\"failure\"}[1h])) by (node_type)", "refId": "A" } ] } ] } }🧪 测试验证
Performance test script
```python
# tests/performance_test.py
import asyncio
import statistics
import time

import aiohttp


async def test_api_performance(
    url: str,
    num_requests: int = 1000,
    concurrency: int = 10
):
    """
    API performance test.

    Args:
        url: URL to test
        num_requests: total number of requests
        concurrency: number of concurrent requests per batch
    """

    async def make_request(session, request_id):
        start_time = time.time()
        try:
            async with session.get(url) as response:
                await response.text()
                duration = time.time() - start_time
                return {
                    'request_id': request_id,
                    'status': response.status,
                    'duration': duration,
                    'success': response.status == 200
                }
        except Exception as e:
            duration = time.time() - start_time
            return {
                'request_id': request_id,
                'status': 0,
                'duration': duration,
                'success': False,
                'error': str(e)
            }

    # Create a session and send requests in batches
    async with aiohttp.ClientSession() as session:
        results = []
        for batch_start in range(0, num_requests, concurrency):
            batch_end = min(batch_start + concurrency, num_requests)
            batch_tasks = [
                make_request(session, i)
                for i in range(batch_start, batch_end)
            ]
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)

    # Summarize the results
    durations = [r['duration'] for r in results]
    success_count = sum(1 for r in results if r['success'])

    print("\nPerformance test results:")
    print(f"Total requests: {num_requests}")
    print(f"Successful: {success_count}")
    print(f"Failed: {num_requests - success_count}")
    print(f"Success rate: {success_count / num_requests * 100:.2f}%")
    print("\nLatency statistics:")
    print(f"Min: {min(durations):.3f}s")
    print(f"Max: {max(durations):.3f}s")
    print(f"Mean: {statistics.mean(durations):.3f}s")
    print(f"Median: {statistics.median(durations):.3f}s")
    print(f"P95: {statistics.quantiles(durations, n=20)[18]:.3f}s")
    print(f"P99: {statistics.quantiles(durations, n=100)[98]:.3f}s")


if __name__ == "__main__":
    asyncio.run(test_api_performance(
        url="http://localhost:8000/api/workflows",
        num_requests=1000,
        concurrency=50
    ))
```
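It is worth cross-checking the load test against what the application itself reports. A minimal sketch (assuming the /api/metrics endpoint from the FastAPI integration above) that scrapes the endpoint and parses the exposition format with prometheus_client's parser:

```python
# Sketch: scrape the app's own metrics after a load test and print request counts.
# Assumes the monitoring router is mounted at /api/metrics as configured earlier.
import requests
from prometheus_client.parser import text_string_to_metric_families

text = requests.get("http://localhost:8000/api/metrics", timeout=5).text

for family in text_string_to_metric_families(text):
    for sample in family.samples:
        if sample.name == "http_requests_total":
            print(sample.labels, sample.value)
```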
📊 Performance Optimization
1. Optimizing metric collection
```python
# Reduce lock contention by batching counter updates
import threading

from prometheus_client import Counter


class BatchedCounter:
    """Counter wrapper that buffers increments and flushes them in batches."""

    def __init__(self, counter: Counter, batch_size: int = 100):
        self.counter = counter
        self.batch_size = batch_size
        self.buffer = {}
        self.lock = threading.Lock()

    def inc(self, labels: dict, value: float = 1):
        """Buffer an increment; flush once the buffer is large enough."""
        key = tuple(sorted(labels.items()))
        with self.lock:
            self.buffer[key] = self.buffer.get(key, 0) + value
            if len(self.buffer) >= self.batch_size:
                self._flush()

    def _flush(self):
        """Apply the buffered increments to the underlying Prometheus counter."""
        for key, value in self.buffer.items():
            labels = dict(key)
            self.counter.labels(**labels).inc(value)
        self.buffer.clear()
```
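Usage is a drop-in change at the call sites. A short sketch — and remember to flush whatever is still buffered on shutdown, otherwise the tail of the data is lost:

```python
# Sketch: wrap an existing counter and record increments through the buffer.
from monitoring.metrics import http_requests_total  # counter defined earlier in this article

batched_requests = BatchedCounter(http_requests_total, batch_size=100)
batched_requests.inc({"method": "GET", "endpoint": "health_check", "status": "success"})

# On shutdown (e.g. in a FastAPI shutdown handler), push the remaining buffered values:
with batched_requests.lock:
    batched_requests._flush()
```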
2. Optimizing Prometheus queries

```promql
# Expensive query (recomputed from raw samples on every dashboard refresh)
sum(rate(http_requests_total[5m])) by (endpoint)

# Optimized (pre-computed by a recording rule)
sum(http_requests:rate5m) by (endpoint)
```

Recording rule configuration:
```yaml
# monitoring/rules/recording.yml
groups:
  - name: http_metrics
    interval: 30s
    rules:
      # Pre-compute the 5-minute request rate
      - record: http_requests:rate5m
        expr: rate(http_requests_total[5m])

      # Pre-compute the error rate
      - record: http_requests:error_rate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
          /
          sum(rate(http_requests_total[5m]))
```
{ "targets": [ { "expr": "http_requests:rate5m", "interval": "30s", "intervalFactor": 2, "step": 60 } ] }🔍 深入探讨
🔍 Deep Dive
1. Building a custom exporter
```python
# exporters/workflow_exporter.py
import asyncio
import time

from prometheus_client import start_http_server, Gauge

from database import get_db
from models import Workflow, Execution

# Metric definitions
workflow_count = Gauge('workflow_count', 'Total workflows', ['status'])
execution_count = Gauge('execution_count', 'Total executions', ['status'])


async def collect_metrics():
    """Collect business metrics from the database."""
    async for db in get_db():
        # Count workflows
        active_workflows = await db.execute(
            "SELECT COUNT(*) FROM workflows WHERE status = 'active'"
        )
        workflow_count.labels(status='active').set(active_workflows.scalar())

        # Count executions
        success_executions = await db.execute(
            "SELECT COUNT(*) FROM executions WHERE status = 'success'"
        )
        execution_count.labels(status='success').set(success_executions.scalar())


if __name__ == '__main__':
    # Expose the metrics on a dedicated HTTP port
    start_http_server(9091)

    # Collect metrics periodically
    while True:
        asyncio.run(collect_metrics())
        time.sleep(60)
```
2. Distributed tracing integration

```python
# OpenTelemetry integration
from opentelemetry import trace
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider

# Configure tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Configure metrics
reader = PrometheusMetricReader()
provider = MeterProvider(metric_readers=[reader])


@tracer.start_as_current_span("execute_workflow")
async def execute_workflow(workflow_id: str):
    """Workflow execution with tracing attached."""
    span = trace.get_current_span()
    span.set_attribute("workflow.id", workflow_id)

    # Execution logic
    result = await _do_execute(workflow_id)

    span.set_attribute("workflow.status", result.status)
    return result
```
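For HTTP-level spans it is usually easier to let the FastAPI instrumentation do the work instead of decorating handlers by hand. A minimal sketch, assuming the opentelemetry-instrumentation-fastapi package is installed:

```python
# Sketch: automatic HTTP span creation for every FastAPI request.
# Assumes the opentelemetry-instrumentation-fastapi package is installed
# and that `app` is the FastAPI instance created in main.py (hypothetical import path).
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from main import app

FastAPIInstrumentor.instrument_app(app)
```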
3. Cost optimization

```yaml
# Prometheus data retention strategy
global:
  scrape_interval: 15s

# Scrape different targets at different frequencies, and store only what is needed
scrape_configs:
  - job_name: 'high-frequency'
    scrape_interval: 5s
    # Keep only the http_* series from this job (note: relabeling filters which
    # series get stored; overall retention is still governed by the server's
    # --storage.tsdb.retention.time flag)
    metric_relabel_configs:
      - source_labels: [__name__]
        regex: 'http_.*'
        action: keep

  - job_name: 'low-frequency'
    scrape_interval: 60s
    # Kept for the full 30-day server retention window
```
📚 References
Official documentation:
- [Prometheus Documentation](https://prometheus.io/docs