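Monitoring and Alerting with MonkeyCode: Building Production-Grade Observability from Scratch

Getting code to run is only the first step. Knowing whether it is still running, and how well, is the hard part of operating in production. MonkeyCode helps you wire up the whole path from logs to metrics to alerts.

The Three Pillars of Observability

Observability
├── Logs    → What happened? (detailed records)
├── Metrics → How is the system doing? (numeric time series)
└── Traces  → What path did the request take? (call chain)

MonkeyCode can help automate all three.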

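Layer 1: Logs

Anti-pattern: debugging by print

# This kind of code is a disaster in production
print("User logged in")
print(f"Order created: {order_id}")  # no timestamp, no level, no context

The right pattern: structured logging

Ask MonkeyCode to refactor it for you:

# Structured logs: searchable and alertable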
import structlog
import sys

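# Configure structlog
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.format_exc_info,  # render exc_info=True as a traceback string
        # JSON for production; swap in structlog.dev.ConsoleRenderer() for local development
        structlog.processors.JSONRenderer(),
    ],
    logger_factory=structlog.PrintLoggerFactory(file=sys.stderr)
)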

log = structlog.get_logger()

# Usage
async def login(email: str, password: str):
    log.info("login.attempt", email=email, source="web")
    try:
        user = await authenticate(email, password)
        log.info("login.success", user_id=user.id, email=email)
        return user
    except Exception as e:
        log.error("login.failed", email=email, error=str(e), exc_info=True)
        raise

Output (JSON, ready to ship to ELK):

{"event": "login.success", "level": "info", "user_id": 42, "email": "user@example.com", "timestamp": "2024-06-01T10:30:00Z"}
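If adding structlog is not an option, roughly the same one-JSON-object-per-line output can be had from the standard library alone. The sketch below is an illustration, not part of the original setup: `JsonFormatter`, `build_logger`, and the `extra={"fields": ...}` convention are hypothetical names chosen for this example.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
        }
        # Hypothetical convention: structured fields arrive via
        # logger.info("...", extra={"fields": {...}}).
        payload.update(getattr(record, "fields", {}))
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=False)


def build_logger(name: str = "app") -> logging.Logger:
    """Return a logger that writes JSON lines to stderr."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    log = build_logger()
    log.info("login.success", extra={"fields": {"user_id": 42}})
```

structlog remains the more ergonomic choice (context binding, processor chains); this only shows that the underlying idea is a formatter that emits machine-parseable fields instead of free text.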

Layer 2: Metrics

Metrics are numeric time-series data, the raw material for dashboards and alerts.

Exposing metrics with Prometheus + FastAPI

# app/metrics.py
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest
from fastapi import FastAPI, Request, Response
import time

# Metric definitions
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)
REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "Request duration",
    ["method", "endpoint"]
)
ACTIVE_USERS = Gauge(
    "active_users",
    "Current active users"
)
DB_CONNECTION_POOL = Gauge(
    "db_connection_pool_size",
    "DB connection pool size"
)

# Middleware that records every request
async def metrics_middleware(request: Request, call_next):
    start = time.perf_counter()  # monotonic clock, safe for measuring durations
    response = await call_next(request)
    duration = time.perf_counter() - start

    # Label with the matched route template (e.g. /users/{id}) rather than the
    # raw path, so the number of label values stays bounded.
    route = request.scope.get("route")
    endpoint = getattr(route, "path", "unmatched")

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=endpoint,
        status=response.status_code
    ).inc()

    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=endpoint
    ).observe(duration)

    return response

# /metrics endpoint (scraped by Prometheus)
def setup_metrics(app: FastAPI):
    # Function-style middleware is registered through app.middleware("http");
    # app.add_middleware() expects a middleware class.
    app.middleware("http")(metrics_middleware)

    @app.get("/metrics")
    async def metrics():
        return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
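The `endpoint` label is only useful if its set of values stays small: every distinct label value creates a new time series, so raw paths such as `/users/123` and `/users/124` can blow up storage. Where a route template is not available, one fallback is to collapse ID-like path segments before labelling. This is a minimal sketch; `normalize_path` and the two patterns it recognises (all-digit segments and canonical UUIDs) are assumptions for illustration.

```python
import re

# Segments that look like identifiers: all digits, or a canonical UUID.
_ID_SEGMENT = re.compile(
    r"^(\d+|[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}"
    r"-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})$"
)


def normalize_path(path: str) -> str:
    """Replace ID-like path segments with the placeholder {id}."""
    segments = path.split("/")
    return "/".join("{id}" if _ID_SEGMENT.match(s) else s for s in segments)


if __name__ == "__main__":
    print(normalize_path("/users/123/orders/9"))
```

The result of `normalize_path(request.url.path)` can then be passed as the `endpoint` label value.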

Instrumenting business metrics

from prometheus_client import Counter, Histogram

# Business metrics
ORDER_COUNT = Counter("orders_total", "Total orders", ["status"])
ORDER_VALUE = Histogram("order_value", "Order value distribution", buckets=[10, 50, 100, 500, 1000, 5000])

async def create_order(req: CreateOrderRequest, user_id: int):
    try:
        order = await db.create_order(req, user_id)
        ORDER_COUNT.labels(status="success").inc()
        ORDER_VALUE.observe(order.total_amount)
        log.info("order.created", order_id=order.id, user_id=user_id, amount=order.total_amount)
        return order
    except Exception as e:
        ORDER_COUNT.labels(status="failed").inc()
        log.error("order.create_failed", user_id=user_id, error=str(e))
        raise
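The `buckets` argument on `ORDER_VALUE` decides what questions the histogram can answer later, so it helps to see how observations are counted. Prometheus histograms expose cumulative buckets: each `le` ("less than or equal") bucket counts every observation at or below its upper bound, plus an implicit `+Inf` bucket. The class below is a simplified stand-in written for this explanation, not the `prometheus_client` implementation.

```python
import math
from bisect import bisect_left


class SimpleHistogram:
    """Toy histogram that mimics Prometheus-style cumulative buckets."""

    def __init__(self, buckets):
        # Upper bounds in ascending order, with the implicit +Inf bucket last.
        self.bounds = sorted(float(b) for b in buckets) + [math.inf]
        self.counts = [0] * len(self.bounds)  # per-bucket, not yet cumulative
        self.sum = 0.0

    def observe(self, value: float) -> None:
        # Index of the first bucket whose upper bound is >= value.
        idx = bisect_left(self.bounds, value)
        self.counts[idx] += 1
        self.sum += value

    def cumulative(self):
        """Return (upper_bound, cumulative_count) pairs, as exposed via `le`."""
        out, running = [], 0
        for bound, count in zip(self.bounds, self.counts):
            running += count
            out.append((bound, running))
        return out


if __name__ == "__main__":
    h = SimpleHistogram([10, 50, 100, 500, 1000, 5000])
    for amount in (8, 10, 42, 120, 9999):
        h.observe(amount)
    for bound, count in h.cumulative():
        print(f"le={bound}: {count}")
```

Because only bucket counts are stored, quantiles computed later are estimates whose precision depends on how closely the bounds bracket the values you care about.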

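Layer 3: Traces

In a microservice architecture a single request may pass through five services; without traces you are flying blind.

Auto-instrumentation with OpenTelemetry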

# app/tracing.py
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

def setup_tracing(app: FastAPI, service_name: str = "my-service"):
    # Initialise the TracerProvider
    trace.set_tracer_provider(TracerProvider(
        resource=Resource.create({"service.name": service_name})
    ))

    # Export over OTLP/gRPC to Jaeger (plaintext inside the compose network)
    otlp_exporter = OTLPSpanExporter(endpoint="http://jaeger:4317", insecure=True)
    span_processor = BatchSpanProcessor(otlp_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    # Auto-instrumentation: FastAPI, httpx, SQLAlchemy
    FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument()

    # Manual instrumentation (business logic).
    # FastAPIInstrumentor already creates a server span per request, so this
    # extra span mainly illustrates how to add your own attributes.
    @app.middleware("http")
    async def add_trace_context(request, call_next):
        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span("http_request") as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            response = await call_next(request)
            span.set_attribute("http.status_code", response.status_code)
            return response
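What lets spans from different services join into one trace is context propagation: each outgoing call carries the trace ID and the caller's span ID, by default in the W3C `traceparent` header (`version-traceid-spanid-flags`). The instrumentation libraries handle this for you; the sketch below only illustrates the mechanism. The helper names (`SpanContext`, `new_root`, `child_of`, `to_header`, `from_header`) are made up for this example, and it handles only the four-field version `00` layout.

```python
import re
import secrets
from typing import NamedTuple, Optional

# version(2 hex) - trace-id(32 hex) - parent span id(16 hex) - flags(2 hex)
_TRACEPARENT = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


class SpanContext(NamedTuple):
    trace_id: str
    span_id: str
    sampled: bool


def new_root() -> SpanContext:
    """Start a new trace at the edge of the system."""
    return SpanContext(secrets.token_hex(16), secrets.token_hex(8), True)


def child_of(parent: SpanContext) -> SpanContext:
    """A downstream span keeps the trace ID and gets a fresh span ID."""
    return SpanContext(parent.trace_id, secrets.token_hex(8), parent.sampled)


def to_header(ctx: SpanContext) -> str:
    flags = "01" if ctx.sampled else "00"
    return f"00-{ctx.trace_id}-{ctx.span_id}-{flags}"


def from_header(value: str) -> Optional[SpanContext]:
    """Parse a traceparent header; return None if it is malformed."""
    match = _TRACEPARENT.match(value.strip())
    if not match:
        return None
    version, trace_id, span_id, flags = match.groups()
    # All-zero IDs and version ff are invalid per the W3C spec.
    if version == "ff" or trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    return SpanContext(trace_id, span_id, bool(int(flags, 16) & 0x01))


if __name__ == "__main__":
    root = new_root()
    downstream = child_of(from_header(to_header(root)))
    print(to_header(root))
    print(to_header(downstream))
```

Both printed headers share the same trace ID, which is what the tracing backend uses to stitch the spans into one request timeline.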

Dashboards: Deploying Grafana

MonkeyCode generates docker-compose.monitoring.yml

version: '3.8'
services:
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prom_data:/prometheus
    ports: ["9090:9090"]
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      
  grafana:
    image: grafana/grafana:latest
    ports: ["3000:3000"]
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin  # demo only; change before exposing Grafana
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
    depends_on: [prometheus]
    
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports: ["16686:16686", "4317:4317"]  # 4317 is the OTLP gRPC port
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      
  node-exporter:
    image: prom/node-exporter:latest
    ports: ["9100:9100"]
    
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:latest
    ports: ["8080:8080"]
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:rw
      - /sys:/sys:ro
      - /var/lib/docker:/var/lib/docker:ro

volumes:
  prom_data:
  grafana_data:

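Prometheus configuration (prometheus.yml):

global:
  scrape_interval: 15s

# Load alert rules and forward firing alerts to Alertmanager.
# Assumptions: alerts.yml is mounted at this path and an Alertmanager
# container is reachable as alertmanager:9093. The compose file above
# defines neither, so add both before relying on alerts.
rule_files:
  - /etc/prometheus/alerts.yml

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

scrape_configs:
  - job_name: 'my-service'
    static_configs:
      - targets: ['host.docker.internal:8000']  # your FastAPI service
    metrics_path: '/metrics'

  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']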

Alert rule configuration

# prometheus/alerts.yml
groups:
  - name: app_alerts
    rules:
      # Error rate above 5% for 2 minutes
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[2m])) 
          / sum(rate(http_requests_total[2m])) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High error rate: {{ $value | humanizePercentage }}"
          # The expression aggregates across all endpoints, so no endpoint
          # label is available here.
          description: "Service-wide 5xx error rate has exceeded 5%"
          
      # P99 latency above 500ms
      - alert: HighLatency
        expr: |
          histogram_quantile(0.99, 
            sum(rate(http_request_duration_seconds_bucket[2m])) by (le)
          ) > 0.5
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High latency: P99 = {{ $value }}s"
          
      # Service is down
      - alert: ServiceDown
        expr: up{job="my-service"} == 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "Service down: {{ $labels.instance }}"
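To make the error-rate expression concrete: `rate()` turns an ever-increasing counter into a per-second rate over the window, and the alert compares the ratio of two such rates to 0.05. The sketch below is a deliberately simplified model. Real Prometheus also extrapolates to the window boundaries, and `simple_rate` and `error_ratio` are names invented for this illustration.

```python
from typing import Sequence, Tuple

Sample = Tuple[float, float]  # (unix timestamp in seconds, counter value)


def simple_rate(samples: Sequence[Sample]) -> float:
    """Per-second increase of a counter across the given samples."""
    if len(samples) < 2:
        return 0.0
    increase = 0.0
    prev = samples[0][1]
    for _, value in samples[1:]:
        # A drop means the counter was reset (e.g. process restart):
        # treat the new value as growth from zero.
        increase += value if value < prev else value - prev
        prev = value
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed if elapsed > 0 else 0.0


def error_ratio(errors: Sequence[Sample], total: Sequence[Sample]) -> float:
    """Share of requests that failed; 0.0 when there was no traffic."""
    total_rate = simple_rate(total)
    return simple_rate(errors) / total_rate if total_rate > 0 else 0.0


if __name__ == "__main__":
    total = [(0, 1000), (60, 1600), (120, 2200)]  # 10 requests per second
    errors = [(0, 10), (60, 46), (120, 82)]       # 0.6 errors per second
    ratio = error_ratio(errors, total)
    print(f"error ratio = {ratio:.2%}, above threshold: {ratio > 0.05}")
```

The `for: 2m` clause then adds a second condition on top of this: the comparison has to stay true for the whole duration before the alert moves from pending to firing.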

Alert notifications: DingTalk / Feishu / Slack

# app/alerts.py - forwards alerts to chat webhooks
import os

import httpx


def _detail(alert: dict) -> str:
    """Not every rule defines `description`; fall back to `summary`."""
    annotations = alert.get("annotations", {})
    return annotations.get("description") or annotations.get("summary", "")


async def send_dingtalk_alert(alert: dict):
    """Send an alert through a DingTalk bot."""
    webhook = os.getenv("DINGTALK_WEBHOOK")
    if not webhook:
        return

    labels = alert["labels"]
    # DingTalk markdown needs a blank line between paragraphs
    text = "\n\n".join([
        f"### {labels['alertname']}",
        f"**Severity**: {labels.get('severity', 'unknown')}",
        f"**Details**: {_detail(alert)}",
        f"**Time**: {alert.get('startsAt', '')}",
    ])
    message = {
        "msgtype": "markdown",
        "markdown": {"title": labels["alertname"], "text": text},
    }

    async with httpx.AsyncClient(timeout=5.0) as client:
        await client.post(webhook, json=message)

async def send_feishu_alert(alert: dict):
    """Send an alert through a Feishu bot."""
    webhook = os.getenv("FEISHU_WEBHOOK")
    if not webhook:
        return

    labels = alert["labels"]
    message = {
        "msg_type": "interactive",
        "card": {
            "config": {"wide_screen_mode": True},
            "header": {
                "title": {"tag": "plain_text", "content": labels["alertname"]},
                "template": "red" if labels.get("severity") == "critical" else "orange"
            },
            "elements": [{
                "tag": "div",
                "text": {
                    "tag": "lark_md",
                    "content": _detail(alert)
                }
            }]
        }
    }

    async with httpx.AsyncClient(timeout=5.0) as client:
        await client.post(webhook, json=message)

Alertmanager configuration (routes alerts to the webhook):

# alertmanager.yml
route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'webhook'
  
receivers:
  - name: 'webhook'
    webhook_configs:
      - url: 'http://my-service:8000/alerts/webhook'
        send_resolved: true
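The configuration above posts to `/alerts/webhook`, but the handler behind that URL is not shown. Alertmanager sends one JSON body per notification group, with the individual alerts in a list under `alerts`, each carrying its own `status`, `labels`, `annotations`, and `startsAt`. A handler therefore has to unpack that list before calling a per-alert sender. The function below is a framework-free sketch of that unpacking step; `summarize_alerts` is a hypothetical helper, not something the original service defines.

```python
from typing import Any, Dict, List


def summarize_alerts(payload: Dict[str, Any]) -> List[str]:
    """Turn an Alertmanager webhook body into one text line per alert."""
    lines = []
    for alert in payload.get("alerts", []):
        labels = alert.get("labels", {})
        annotations = alert.get("annotations", {})
        # Rules may define only `summary`, so do not assume `description`.
        detail = annotations.get("description") or annotations.get("summary", "")
        status = alert.get("status", "firing").upper()
        name = labels.get("alertname", "unknown")
        severity = labels.get("severity", "none")
        lines.append(f"[{status}] {name} ({severity}): {detail}")
    return lines


if __name__ == "__main__":
    example = {
        "status": "firing",
        "alerts": [
            {
                "status": "firing",
                "labels": {"alertname": "ServiceDown", "severity": "critical"},
                "annotations": {"summary": "Service down: app:8000"},
                "startsAt": "2024-06-01T10:30:00Z",
            }
        ],
    }
    for line in summarize_alerts(example):
        print(line)
```

In the FastAPI service, a POST route at `/alerts/webhook` would read the request JSON, loop over `payload["alerts"]` in the same way, and pass each alert to the chat-webhook senders. Because `send_resolved: true` is set, the handler also receives alerts with `status` set to `resolved`.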

Log aggregation: the ELK Stack

# docker-compose.elk.yml
services:
  elasticsearch:
    image: elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports: ["9200:9200"]
    volumes: ["es_data:/usr/share/elasticsearch/data"]
    
  logstash:
    image: logstash:8.12.0
    volumes: ["./logstash.conf:/usr/share/logstash/pipeline/logstash.conf"]
    ports: ["5044:5044"]
    depends_on: [elasticsearch]
    
  kibana:
    image: kibana:8.12.0
    ports: ["5601:5601"]
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on: [elasticsearch]

# Named volumes must be declared at the top level, or Compose rejects the file
volumes:
  es_data:

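Shipping Python logs to ELK (sent straight to Logstash over TCP):

# Render structlog events as JSON and hand them to the stdlib logging module
import logging

import structlog
from logstash import TCPLogstashHandler  # provided by the python-logstash package
from structlog.stdlib import LoggerFactory

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.EventRenamer("message"),
        # JSONRenderer lives in structlog.processors, not structlog.dev
        structlog.processors.JSONRenderer(),
    ],
    logger_factory=LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
)

# Attach a logging handler that forwards records to the Logstash TCP port.
# logstash.conf must define a matching tcp input with a JSON codec on port 5044.
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # the root logger defaults to WARNING, which would drop info events
logger.addHandler(TCPLogstashHandler("logstash", 5044, version=1))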

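MonkeyCode prompt template

My [FastAPI/Django/Flask] service needs production-grade monitoring. Please help me:
1. Configure structured logging with structlog (JSON output)
2. Integrate Prometheus metrics (request count, latency, business metrics)
3. Set up distributed tracing with OpenTelemetry + Jaeger
4. Generate docker-compose.monitoring.yml (Prometheus + Grafana + Jaeger)
5. Configure alert rules (error rate, latency, service down)
6. Hook up DingTalk/Feishu alert notifications
7. Generate the Grafana dashboard JSON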

The four golden signals (from Google's SRE book)

| Signal | Meaning | Example PromQL |
| --- | --- | --- |
| Latency | Time taken to serve a request | histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) |
| Traffic | Request volume | sum(rate(http_requests_total[5m])) |
| Errors | Share of requests that fail | sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) |
| Saturation | Resource utilisation | 1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes |
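The latency row relies on `histogram_quantile`, which does not see individual request durations, only cumulative bucket counts. It finds the bucket that contains the target rank and interpolates linearly inside it. The function below is a sketch of that idea under simplifying assumptions (positive bucket bounds, a final `+Inf` bucket, `0 < q <= 1`); it is written for illustration and is not Prometheus's actual code.

```python
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    `buckets` must be sorted by upper bound and end with the +Inf bucket.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    if not buckets or not math.isinf(buckets[-1][0]):
        return math.nan
    total = buckets[-1][1]
    if total == 0:
        return math.nan

    rank = q * total
    # First bucket whose cumulative count reaches the rank.
    idx = next(i for i, (_, count) in enumerate(buckets) if count >= rank)
    upper, count = buckets[idx]

    if math.isinf(upper):
        # The rank lies beyond the last finite bound: report that bound.
        return buckets[-2][0] if len(buckets) > 1 else math.nan

    lower = buckets[idx - 1][0] if idx > 0 else 0.0
    below = buckets[idx - 1][1] if idx > 0 else 0.0
    # Assume observations are spread evenly inside the bucket.
    return lower + (upper - lower) * (rank - below) / (count - below)


if __name__ == "__main__":
    latency = [(0.1, 50), (0.25, 80), (0.5, 95), (1.0, 99), (math.inf, 100)]
    print(histogram_quantile(0.9, latency))
```

Since the estimate can never be finer than the bucket it lands in, a 500 ms latency alert is only trustworthy if there is a bucket boundary at or near 0.5 seconds.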

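Summary

Observability has three layers: logs (diagnose problems) → metrics (watch trends) → traces (find bottlenecks)

MonkeyCode can help you:

  1. Refactor print-style debugging into structured logging (structlog)
  2. Instrument Prometheus metrics automatically
  3. Add OpenTelemetry distributed tracing without hand-written instrumentation
  4. Generate the full set of monitoring, alerting, and notification configs

Remember: think about monitoring when you write the first line of business code, not after the first production incident.

Source: https://www.cnblogs.com/jaryn/p/20224113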