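Getting code to run is only the first step. Knowing whether it is actually running, and how well, is the real skill of running software in production. MonkeyCode helps you cover the whole path from logs => metrics => alerts.
The three pillars of observability
Observability
├── Logs    → What happened? (detailed records)
├── Metrics → What state is the system in? (numeric time series)
└── Traces  → What path did a request take? (call chains)
MonkeyCode can help you automate all three.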
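Layer 1: Logs
Anti-pattern: debugging with print()
```python
# Code like this is a disaster in production
print("User logged in")
print(f"Order created: {order_id}")  # no timestamp, no level, no context
```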
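Better pattern: structured logging
Let MonkeyCode refactor it for you:
```python
# Structured logs: searchable and alertable
import sys

import structlog

# Configure structlog
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pull in context bound via bind_contextvars()
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.format_exc_info,  # render exc_info=True as a traceback string
        # One JSON object per line, ready for ELK. During development,
        # structlog.dev.ConsoleRenderer() gives colored, human-friendly output instead.
        structlog.processors.JSONRenderer(),
    ],
    logger_factory=structlog.PrintLoggerFactory(file=sys.stderr),
)
log = structlog.get_logger()

# Usage (authenticate() stands in for your application's own auth function)
async def login(email: str, password: str):
    log.info("login.attempt", email=email, source="web")
    try:
        user = await authenticate(email, password)
        log.info("login.success", user_id=user.id, email=email)
        return user
    except Exception as e:
        log.error("login.failed", email=email, error=str(e), exc_info=True)
        raise
```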
Output (JSON, ready to ship to ELK):
```json
{"event": "login.success", "level": "info", "user_id": 42, "email": "user@example.com", "timestamp": "2024-06-01T10:30:00Z"}
```
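You may not be able to add structlog as a dependency. In that case, the same one-JSON-object-per-line output can be approximated with only the standard library. This is a swapped-in technique and not what MonkeyCode generates. `JsonFormatter`, `make_logger`, and the convention of passing structured fields through `extra={"fields": {...}}` are all hypothetical choices made for this sketch.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter: serializes each record as one JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
        }
        # Structured fields travel in a "fields" attribute set via extra= (assumed convention)
        payload.update(getattr(record, "fields", {}))
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=False)


def make_logger(stream) -> logging.Logger:
    """Return an 'app' logger that writes JSON lines to the given stream."""
    logger = logging.getLogger("app")
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]  # replace, so repeated calls do not duplicate output
    logger.propagate = False
    return logger


if __name__ == "__main__":
    import sys

    log = make_logger(sys.stderr)
    log.info("login.success", extra={"fields": {"user_id": 42, "email": "user@example.com"}})
```

Compared with structlog, you lose keyword-argument ergonomics and context binding. The output shape, which is what ELK cares about, stays the same.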
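Layer 2: Metrics
Metrics are numeric time-series data; they drive monitoring dashboards and alerts.
Exposing metrics from FastAPI with Prometheus
```python
# app/metrics.py
import time

from fastapi import FastAPI, Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest

# Metric definitions
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)
REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "Request duration in seconds",
    ["method", "endpoint"],
)
# Gauges hold values that go up and down; set them from your own code, e.g. ACTIVE_USERS.set(n)
ACTIVE_USERS = Gauge(
    "active_users",
    "Current active users",
)
DB_CONNECTION_POOL = Gauge(
    "db_connection_pool_size",
    "DB connection pool size",
)

# Middleware that records every request automatically
async def metrics_middleware(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start
    # Label by the route template (e.g. /users/{user_id}) rather than the raw path,
    # so IDs in URLs do not create an unbounded number of time series.
    # FastAPI stores the matched route in scope["route"]; fall back to the raw path.
    route = request.scope.get("route")
    endpoint = getattr(route, "path", request.url.path)
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=endpoint,
        status=response.status_code,
    ).inc()
    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=endpoint,
    ).observe(duration)
    return response

# /metrics endpoint (scraped by Prometheus)
def setup_metrics(app: FastAPI):
    # add_middleware() expects a middleware class; a plain async function
    # is registered through app.middleware("http") instead
    app.middleware("http")(metrics_middleware)

    @app.get("/metrics")
    async def metrics():
        return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
```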
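An `endpoint` label built from raw URLs such as `/users/42` creates one time series per ID, which can overwhelm Prometheus. Route templates avoid this. When only the raw path is available, a common fallback is to collapse ID-like path segments before using the path as a label. The sketch below shows that approach. `normalize_path` and its three patterns (integers, UUIDs, long hex strings) are hypothetical choices, so tune them to your own URL scheme.

```python
import re

# Segments that look like identifiers: plain integers, UUIDs, or long hex strings
_ID_PATTERNS = [
    re.compile(r"^\d+$"),
    re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"),
    re.compile(r"^[0-9a-fA-F]{24,}$"),
]


def normalize_path(path: str, placeholder: str = "{id}") -> str:
    """Replace ID-like path segments so the endpoint label stays low-cardinality."""
    segments = path.split("/")
    normalized = [
        placeholder if any(p.match(seg) for p in _ID_PATTERNS) else seg
        for seg in segments
    ]
    return "/".join(normalized)


# normalize_path("/users/42/orders/7") → "/users/{id}/orders/{id}"
```

A regex heuristic can misfire on word-like hex segments, so prefer the framework's route template whenever it exists.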
Instrumenting business metrics
```python
import structlog
from prometheus_client import Counter, Histogram

log = structlog.get_logger()

# Business metrics
ORDER_COUNT = Counter("orders_total", "Total orders", ["status"])
ORDER_VALUE = Histogram(
    "order_value",
    "Order value distribution",
    buckets=[10, 50, 100, 500, 1000, 5000],  # upper bounds; prometheus_client appends +Inf
)

# CreateOrderRequest and db stand in for your own request model and data layer
async def create_order(req: CreateOrderRequest, user_id: int):
    try:
        order = await db.create_order(req, user_id)
        ORDER_COUNT.labels(status="success").inc()
        ORDER_VALUE.observe(order.total_amount)
        log.info("order.created", order_id=order.id, user_id=user_id, amount=order.total_amount)
        return order
    except Exception as e:
        ORDER_COUNT.labels(status="failed").inc()
        log.error("order.create_failed", user_id=user_id, error=str(e))
        raise
```
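A Prometheus histogram does not keep individual observations. Each `_bucket` series counts, cumulatively, how many observations were less than or equal to its upper bound `le`, and the histogram also exposes `_sum` and `_count`. The pure-Python sketch below mimics that bookkeeping for the `ORDER_VALUE` bounds, to show what `order_value_bucket{le="100"}` would report. It illustrates the semantics only; it is not how prometheus_client stores data internally.

```python
import math
from bisect import bisect_left

BUCKETS = [10, 50, 100, 500, 1000, 5000, math.inf]  # ORDER_VALUE bounds plus +Inf


def cumulative_buckets(values, bounds=BUCKETS):
    """Return {upper_bound: number of values <= upper_bound}, like *_bucket{le=...}."""
    per_bucket = [0] * len(bounds)
    for v in values:
        # bisect_left finds the first bound >= v, i.e. the smallest bucket with v <= le
        per_bucket[bisect_left(bounds, v)] += 1
    result, running = {}, 0
    for bound, n in zip(bounds, per_bucket):
        running += n  # cumulative: each bucket includes all smaller ones
        result[bound] = running
    return result


orders = [5, 75, 75, 2000]
counts = cumulative_buckets(orders)
# counts[100] == 3 and counts[math.inf] == 4; _count would be len(orders), _sum would be sum(orders)
```

Because the buckets are cumulative, the `+Inf` bucket always equals `_count`. Choosing bounds that bracket realistic order values matters more than having many of them.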
Layer 3: Traces
In a microservice architecture a single request may pass through five services; without traces you are working blind.
Automatic instrumentation with OpenTelemetry
```python
# app/tracing.py
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_tracing(app: FastAPI, service_name: str = "my-service"):
    # Initialize the TracerProvider; service.name is how Jaeger groups spans
    provider = TracerProvider(resource=Resource.create({"service.name": service_name}))
    # Export to Jaeger over OTLP/gRPC (port 4317, plaintext inside the compose network)
    otlp_exporter = OTLPSpanExporter(endpoint="http://jaeger:4317", insecure=True)
    provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    trace.set_tracer_provider(provider)

    # Auto-instrumentation: FastAPI, httpx, SQLAlchemy
    FastAPIInstrumentor.instrument_app(app)  # one server span per incoming request
    HTTPXClientInstrumentor().instrument()   # outgoing calls carry the trace context
    SQLAlchemyInstrumentor().instrument()    # pass engine=... if the engine already exists

# Manual instrumentation (business logic). The FastAPI instrumentation already
# creates a span per HTTP request, so manual spans cover steps inside a request.
tracer = trace.get_tracer(__name__)

async def process_payment(order_id: int, amount: float):  # hypothetical business function
    with tracer.start_as_current_span("payment.process") as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("payment.amount", amount)
        ...  # call the payment provider here; this span nests under the request's server span
```
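The httpx instrumentation links services together by injecting the current trace context into outgoing request headers. With OpenTelemetry's default propagator, this is the W3C Trace Context `traceparent` header, formatted as `version-trace_id-parent_id-flags`. The downstream service's FastAPI instrumentation reads it back, which is how all five services end up in one trace.

The sketch below only parses the header to show its structure. `parse_traceparent` is a hypothetical helper and handles only the version `00` layout; in real code the OpenTelemetry propagator does this for you.

```python
import re
from typing import Optional

# version (2 hex) - trace-id (32 hex) - parent span id (16 hex) - trace flags (2 hex)
_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def parse_traceparent(header: str) -> Optional[dict]:
    """Split a W3C traceparent header into its fields; return None if invalid."""
    m = _TRACEPARENT.match(header.strip())
    if not m:
        return None
    version, trace_id, span_id, flags = m.groups()
    # Version ff is forbidden, and all-zero IDs are invalid
    if version == "ff" or trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    return {
        "trace_id": trace_id,
        "parent_span_id": span_id,
        "sampled": bool(int(flags, 16) & 0x01),  # lowest bit is the sampled flag
    }


example = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
ctx = parse_traceparent(example)
```

The `trace_id` is what you search for in the Jaeger UI. It stays the same across every service a request touches, while `parent_span_id` changes at each hop.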
Monitoring dashboards: deploying Grafana
MonkeyCode generates docker-compose.monitoring.yml:
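```yaml
# docker-compose.monitoring.yml
version: '3.8'
services:
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus/alerts.yml:/etc/prometheus/alerts.yml  # the alert rules
      - prom_data:/prometheus
    ports: ["9090:9090"]
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    extra_hosts:
      - "host.docker.internal:host-gateway"  # needed on Linux to reach the app on the host
  alertmanager:
    image: prom/alertmanager:latest
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
    ports: ["9093:9093"]
  grafana:
    image: grafana/grafana:latest
    ports: ["3000:3000"]
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin  # change this anywhere beyond local testing
    volumes:
      - grafana_data:/var/lib/grafana
      # This folder holds dashboard provider YAML; the provider's options.path
      # must point at the directory containing the dashboard JSON files
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
    depends_on: [prometheus]
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports: ["16686:16686", "4317:4317"]  # 16686 is the Jaeger UI, 4317 is OTLP gRPC
    environment:
      - COLLECTOR_OTLP_ENABLED=true
  node-exporter:
    image: prom/node-exporter:latest
    ports: ["9100:9100"]
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:latest
    ports: ["8080:8080"]
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:rw
      - /sys:/sys:ro
      - /var/lib/docker:/var/lib/docker:ro
volumes:
  prom_data:
  grafana_data:
```
Prometheus configuration (prometheus.yml):
```yaml
global:
  scrape_interval: 15s

rule_files:
  - /etc/prometheus/alerts.yml

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

scrape_configs:
  - job_name: 'my-service'
    static_configs:
      - targets: ['host.docker.internal:8000']  # your FastAPI service
    metrics_path: '/metrics'
  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']
  - job_name: 'cadvisor'
    static_configs:
      - targets: ['cadvisor:8080']
```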
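Before building Grafana panels, it helps to confirm that the service exposes what Prometheus is configured to scrape. `curl http://localhost:8000/metrics` returns the plain-text exposition format, with one `name{labels} value` sample per line. The sketch below is a minimal parser for checking that output in a smoke test. `parse_metrics` is hypothetical and handles only simple cases: no escaped quotes or commas inside label values, and no trailing timestamps.

```python
import re

# name{label="value",...} value   (labels optional; timestamps not handled)
_SAMPLE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)$')
_LABEL = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"')


def parse_metrics(text: str) -> dict:
    """Map (metric_name, frozenset of (label, value) pairs) -> float value."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip blank lines and HELP/TYPE comments
            continue
        m = _SAMPLE.match(line)
        if not m:
            continue
        name, raw_labels, value = m.groups()
        labels = frozenset(_LABEL.findall(raw_labels or ""))
        samples[(name, labels)] = float(value)  # float() also accepts "+Inf" and "NaN"
    return samples
```

A smoke test can then assert that `http_requests_total` appears with the expected `endpoint` values after a few requests. For anything beyond that, the official prometheus_client parser is the safer choice.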
Alert rules
```yaml
# prometheus/alerts.yml
groups:
  - name: app_alerts
    rules:
      # Error rate above 5% for 2 minutes
      - alert: HighErrorRate
        # "by (endpoint)" keeps the endpoint label so the description can use it
        expr: |
          sum by (endpoint) (rate(http_requests_total{status=~"5.."}[2m]))
            / sum by (endpoint) (rate(http_requests_total[2m])) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High error rate: {{ $value | humanizePercentage }}"
          description: "Error rate on endpoint {{ $labels.endpoint }} is above 5%"
      # P99 latency above 500 ms
      - alert: HighLatency
        expr: |
          histogram_quantile(0.99,
            sum(rate(http_request_duration_seconds_bucket[2m])) by (le)
          ) > 0.5
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High latency: P99 = {{ $value }}s"
      # Service down
      - alert: ServiceDown
        expr: up{job="my-service"} == 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "Service down: {{ $labels.instance }}"
```
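The HighLatency rule relies on `histogram_quantile`. This function estimates a quantile from cumulative bucket counts by assuming observations are spread evenly inside each bucket.

The sketch below follows the interpolation Prometheus uses for classic histograms, simplified: it takes a single, already rate()-ed set of buckets and skips Prometheus's monotonicity fixes. It shows why a reported P99 can only be as precise as the bucket boundaries.

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate quantile q from cumulative (upper_bound, count) buckets.

    Buckets must be sorted by upper bound and the last one must be +Inf,
    as in a Prometheus classic histogram.
    """
    if not 0.0 <= q <= 1.0:
        raise ValueError("q must be between 0 and 1")
    if len(buckets) < 2 or not math.isinf(buckets[-1][0]):
        raise ValueError("need at least one finite bucket plus a +Inf bucket")
    total = buckets[-1][1]
    if total == 0:
        return math.nan  # no observations in the window
    rank = q * total
    # First finite bucket whose cumulative count reaches the rank
    idx = next(
        (i for i, (_, count) in enumerate(buckets[:-1]) if count >= rank),
        len(buckets) - 1,
    )
    if idx == len(buckets) - 1:
        # The quantile lies in the +Inf bucket: report the highest finite bound
        return buckets[-2][0]
    upper, count = buckets[idx]
    if idx == 0:
        if upper <= 0:
            return upper
        lower, below = 0.0, 0.0  # the lowest bucket is assumed to start at 0
    else:
        lower, below = buckets[idx - 1]
    if count == below:
        return lower
    # Linear interpolation inside the bucket that contains the rank
    return lower + (upper - lower) * (rank - below) / (count - below)
```

`REQUEST_DURATION` uses prometheus_client's default buckets, which include a 0.5 s boundary, so the 500 ms threshold lines up with a bucket edge. If you change the buckets, keep a boundary at your alert threshold.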
Alert notifications: DingTalk / Feishu / Slack
```python
# app/alerts.py - alert webhook handling
import os

import httpx


def _describe(alert: dict) -> str:
    # Not every rule sets a description (HighLatency and ServiceDown only have a summary)
    annotations = alert.get("annotations", {})
    return annotations.get("description") or annotations.get("summary", "")


async def send_dingtalk_alert(alert: dict):
    """Send an alert through a DingTalk robot."""
    webhook = os.getenv("DINGTALK_WEBHOOK")
    if not webhook:
        return
    name = alert["labels"]["alertname"]
    # Build the markdown line by line so no indentation leaks into the message
    text = "\n\n".join([
        f"### {name}",
        f"**Severity**: {alert['labels'].get('severity', 'unknown')}",
        f"**Details**: {_describe(alert)}",
        f"**Time**: {alert['startsAt']}",
    ])
    message = {"msgtype": "markdown", "markdown": {"title": name, "text": text}}
    async with httpx.AsyncClient() as client:
        resp = await client.post(webhook, json=message)
        resp.raise_for_status()


async def send_feishu_alert(alert: dict):
    """Send an alert through a Feishu (Lark) robot."""
    webhook = os.getenv("FEISHU_WEBHOOK")
    if not webhook:
        return
    message = {
        "msg_type": "interactive",
        "card": {
            "config": {"wide_screen_mode": True},
            "header": {
                "title": {"tag": "plain_text", "content": alert["labels"]["alertname"]},
                "template": "red" if alert["labels"].get("severity") == "critical" else "orange",
            },
            "elements": [{
                "tag": "div",
                "text": {"tag": "lark_md", "content": _describe(alert)},
            }],
        },
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(webhook, json=message)
        resp.raise_for_status()
```
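DingTalk custom robots only accept messages once a security setting is configured, and signing is one of the options. With signing, an HMAC-SHA256 is computed over `timestamp + "\n" + secret`, keyed by the robot's secret. The Base64-encoded result and the millisecond timestamp are then appended to the webhook URL.

The sketch below follows that scheme as described in DingTalk's robot documentation. `sign_dingtalk_url` and a `DINGTALK_SECRET` environment variable are assumed names, so check the current docs before relying on it.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse
from typing import Optional


def sign_dingtalk_url(webhook: str, secret: str, timestamp_ms: Optional[int] = None) -> str:
    """Append DingTalk's timestamp and sign query parameters to a robot webhook URL."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)  # DingTalk expects milliseconds
    string_to_sign = f"{timestamp_ms}\n{secret}"
    digest = hmac.new(secret.encode("utf-8"), string_to_sign.encode("utf-8"), hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(digest))  # URL-safe Base64 signature
    separator = "&" if "?" in webhook else "?"
    return f"{webhook}{separator}timestamp={timestamp_ms}&sign={sign}"
```

In `send_dingtalk_alert`, you would post to `sign_dingtalk_url(webhook, os.environ["DINGTALK_SECRET"])` instead of the bare webhook. The signature is tied to the timestamp, so compute it per request rather than once at startup.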
Alertmanager configuration (routes alerts to the webhook):
```yaml
# alertmanager.yml
route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'webhook'
receivers:
  - name: 'webhook'
    webhook_configs:
      - url: 'http://my-service:8000/alerts/webhook'  # your service must expose this endpoint
        send_resolved: true
```
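Alertmanager does not call the sender functions one alert at a time. It POSTs a single JSON body to the configured URL, batched according to `group_by`, and the body's `alerts` field is a list. Each entry carries its own `status`, `labels`, `annotations`, and `startsAt`.

The `/alerts/webhook` handler itself is not shown in this setup. The sketch below is one possible core for it, assuming you want one notification per alert and a marker on resolved ones; `split_alerts` is a hypothetical helper.

```python
def split_alerts(payload: dict) -> list[dict]:
    """Turn one Alertmanager webhook payload into per-alert dicts for the senders."""
    messages = []
    for alert in payload.get("alerts", []):
        labels = dict(alert.get("labels", {}))  # copy so the payload is not mutated
        if alert.get("status") == "resolved":
            # With send_resolved: true, recoveries arrive in the same list
            labels["alertname"] = "[RESOLVED] " + labels.get("alertname", "unknown")
        messages.append({
            "labels": labels,
            "annotations": alert.get("annotations", {}),
            "startsAt": alert.get("startsAt", ""),
        })
    return messages
```

A FastAPI route at `/alerts/webhook` would read the JSON body, call `split_alerts`, and pass each result to `send_dingtalk_alert` or `send_feishu_alert`.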
Log aggregation: the ELK Stack
```yaml
# docker-compose.elk.yml
services:
  elasticsearch:
    image: elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false  # local testing only
      - ES_JAVA_OPTS=-Xms512m -Xmx512m  # cap the JVM heap on a dev machine
    ports: ["9200:9200"]
    volumes: ["es_data:/usr/share/elasticsearch/data"]
  logstash:
    image: logstash:8.12.0
    volumes: ["./logstash.conf:/usr/share/logstash/pipeline/logstash.conf"]
    ports: ["5044:5044"]  # logstash.conf must define a tcp input on this port
    depends_on: [elasticsearch]
  kibana:
    image: kibana:8.12.0
    ports: ["5601:5601"]
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on: [elasticsearch]
volumes:
  es_data:  # named volumes must be declared at the top level
```
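Shipping Python logs to ELK (straight to Logstash over TCP):
```python
# Configure structlog to render JSON and hand it to the stdlib logging module
import logging

import structlog
from logstash import TCPLogstashHandler  # from the python-logstash package
from structlog.stdlib import LoggerFactory

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.EventRenamer("message"),
        structlog.processors.JSONRenderer(),
    ],
    logger_factory=LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
)

# A stdlib logging handler ships each record to Logstash's TCP port.
# The structlog JSON arrives in the record's "message" field, so the Logstash
# pipeline needs a tcp input on port 5044 plus a json filter on "message".
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # the root logger defaults to WARNING and would drop info logs
logger.addHandler(TCPLogstashHandler("logstash", 5044, version=1))
```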
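If you would rather not depend on python-logstash, the standard library's `logging.handlers.SocketHandler` can be adapted instead. Its `emit()` sends whatever bytes `makePickle()` returns over TCP and reconnects with backoff on failure. Overriding `makePickle()` to emit one JSON object per line produces a stream that a Logstash `tcp` input with the `json_lines` codec can read.

This is a swapped-in alternative to the setup above, not part of it. `JsonLinesSocketHandler`, `attach_to_logstash`, and the field names are assumptions for this sketch.

```python
import json
import logging
import logging.handlers
from datetime import datetime, timezone


class JsonLinesSocketHandler(logging.handlers.SocketHandler):
    """Send each record over TCP as one UTF-8 JSON line instead of a pickle."""

    def makePickle(self, record: logging.LogRecord) -> bytes:
        doc = {
            "@timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname.lower(),
            "logger": record.name,
            "message": record.getMessage(),  # merges args into the message template
        }
        if record.exc_info:
            doc["exception"] = logging.Formatter().formatException(record.exc_info)
        return (json.dumps(doc, ensure_ascii=False) + "\n").encode("utf-8")


def attach_to_logstash(host: str = "logstash", port: int = 5044) -> None:
    """Route root-logger output at INFO and above to Logstash."""
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(JsonLinesSocketHandler(host, port))
```

The handler connects lazily on the first emitted record. An unreachable Logstash therefore does not block startup, and records logged while it is down are dropped.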
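MonkeyCode prompt template
My [FastAPI/Django/Flask] service needs production-grade monitoring. Please help me:
1. Configure structured logging (JSON format) with structlog
2. Integrate Prometheus metrics (request count, latency, business metrics)
3. Set up distributed tracing with OpenTelemetry + Jaeger
4. Generate docker-compose.monitoring.yml (Prometheus + Grafana + Jaeger)
5. Configure alert rules (error rate, latency, service down)
6. Send alert notifications to DingTalk/Feishu
7. Generate a Grafana dashboard JSON configuration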
The four golden signals of monitoring (from Google SRE)
| Signal | Meaning | Example PromQL |
|---|---|---|
| Latency | Time taken to serve a request | histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) |
| Traffic | Request volume | sum(rate(http_requests_total[5m])) |
| Errors | Share of failed requests | sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) |
| Saturation | Resource utilization (here: memory in use) | 1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes |
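The Errors row divides two `rate()` results. `rate()` turns a monotonically increasing counter into a per-second rate, and it treats any drop in value as a counter reset, such as a process restart.

The simplified sketch below reproduces that arithmetic for two series sampled at the same timestamps. Unlike Prometheus, it does not extrapolate to the edges of the window, so its numbers can differ slightly from real query results. `simple_rate`, `error_ratio`, and the sample data are illustrative.

```python
def simple_rate(samples: list[tuple[float, float]]) -> float:
    """Per-second increase of a counter from (timestamp_seconds, value) samples."""
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    increase = 0.0
    for (_, prev), (_, cur) in zip(samples, samples[1:]):
        # A drop means the counter restarted from 0 and has counted up to `cur` since
        increase += cur - prev if cur >= prev else cur
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed


def error_ratio(errors, total) -> float:
    """Errors per second divided by requests per second (0 when there is no traffic)."""
    total_rate = simple_rate(total)
    return simple_rate(errors) / total_rate if total_rate else 0.0


total = [(0, 100), (15, 130), (30, 160)]  # http_requests_total samples
errors = [(0, 5), (15, 8), (30, 11)]      # the status=~"5.." subset
ratio = error_ratio(errors, total)        # 0.2/s out of 2.0/s, about 0.1: above the 5% threshold
```

Dividing the two rates, rather than raw counter values, keeps the ratio meaningful across restarts: the lifetime totals would hide a recent spike.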
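Summary
Observability has three layers: logs (investigate problems) → metrics (watch trends) → traces (locate bottlenecks).
MonkeyCode can help you:
- Refactor print() debugging into structured logging (structlog)
- Instrument Prometheus metrics automatically
- Wire up OpenTelemetry distributed tracing through auto-instrumentation, with little hand-written code
- Generate the full set of monitoring, alerting, and notification configs
Remember: plan for monitoring when you write the first line of business code, not after a production incident forces you to.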
Source: https://www.cnblogs.com/jaryn/p/20224113
