为什么并发编程是高级技能
写爬虫、处理大数据、并发请求API——这些场景单线程跑不动,必须上并发。但Python的GIL让多线程在CPU密集型任务上几乎无法提速(IO密集型任务依然有效),asyncio又门槛高,进程池配置复杂……
这篇文章用MonkeyCode生成7种并发方案的实战代码,每种方案都有适用场景,帮你搞清楚什么时候用什么。
给MonkeyCode的统一Prompt模板
用Python实现XX功能,需要支持高并发。要求:
1. 完整可运行的代码,包含main入口
2. 每种方案对比优劣分析
3. 包含性能测试代码
4. 处理异常和超时
5. 资源控制(最大并发数)
武器1:ThreadPoolExecutor(线程池)
适用场景:IO密集型任务(爬虫、API请求、文件读写)
# concurrent_threading.py - MonkeyCode生成
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass
class CrawlResult:
    """Outcome of crawling one URL.

    When the request fails before any HTTP response arrives, ``status_code``
    is 0 and ``error`` describes the failure.
    """

    url: str  # the URL that was requested
    status_code: int  # HTTP status code, or 0 on failure
    title: str  # extracted page title ('' when unavailable)
    error: str = None  # failure description; None on success
def fetch_page(url: str, timeout: int = 10) -> CrawlResult:
    """Fetch a single page and extract its HTML <title>.

    Args:
        url: Page to request.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        A ``CrawlResult``. Network failures do not raise: they are reported
        with ``status_code=0`` and ``error`` set ('Timeout' for timeouts).
        ``title`` is always a ``str`` ('' when no title could be found).
    """
    try:
        resp = requests.get(url, timeout=timeout, headers={
            'User-Agent': 'Mozilla/5.0 (compatible; Bot/1.0)'
        })
    except requests.Timeout:
        return CrawlResult(url=url, status_code=0, title='', error='Timeout')
    except requests.RequestException as e:
        return CrawlResult(url=url, status_code=0, title='', error=str(e))

    title = ''
    if 'text/html' in resp.headers.get('Content-Type', ''):
        try:
            # bs4 is optional and imported lazily, so the crawler still works
            # (with a cruder title) when it is not installed.
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(resp.text, 'html.parser')
            # `.string` is None for an empty <title> or one with child nodes,
            # which would break later slicing such as `title[:50]`;
            # get_text() always returns a str.
            title = soup.title.get_text(strip=True) if soup.title else ''
        except Exception:
            # Best effort only: bs4 is missing or could not handle the markup.
            # Unlike a bare `except:`, this does not swallow KeyboardInterrupt.
            title = resp.text[:200]
    return CrawlResult(url=url, status_code=resp.status_code, title=title)
def batch_crawl(urls: List[str], max_workers: int = 10) -> List[CrawlResult]:
    """Crawl ``urls`` on a thread pool and return results in completion order.

    A line is printed per finished URL. A future that raises is converted
    into a ``CrawlResult`` with ``status_code=0`` and the exception text.
    """
    collected: List[CrawlResult] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the URL it is fetching.
        pending = {}
        for target in urls:
            pending[pool.submit(fetch_page, target)] = target

        for done in as_completed(pending):
            target = pending[done]
            try:
                outcome = done.result()
                collected.append(outcome)
                print(f"[{outcome.status_code}] {target}")
            except Exception as exc:
                print(f"[ERROR] {target}: {exc}")
                collected.append(CrawlResult(url=target, status_code=0, title='', error=str(exc)))
    return collected
def benchmark_crawl(urls: List[str], max_workers_list: List[int]):
    """Time ``batch_crawl`` over ``urls`` once per worker count and print stats.

    A URL counts as a success only when its status code is 200.
    """
    print("\n=== ThreadPoolExecutor Benchmark ===")
    total = len(urls)
    for workers in max_workers_list:
        t0 = time.time()
        outcome = batch_crawl(urls, max_workers=workers)
        duration = time.time() - t0

        ok = len([r for r in outcome if r.status_code == 200])
        bad = len(outcome) - ok
        print(f"\nWorkers={workers}: {duration:.2f}s | "
              f"Success={ok} | Failed={bad} | "
              f"RPS={total/duration:.1f}")
if __name__ == '__main__':
    # Benchmark URLs: httpbin's /delay/1 endpoint, fetched 20 times.
    test_urls = ['https://httpbin.org/delay/1' for _ in range(20)]
    benchmark_crawl(test_urls, [1, 5, 10, 20])

    # Real crawl example
    print("\n=== Real Crawl ===")
    real_urls = [
        'https://httpbin.org/html',
        'https://httpbin.org/json',
        'https://httpbin.org/uuid',
    ]
    results = batch_crawl(real_urls, max_workers=3)
    for r in results:
        # The title may be None (bs4's `.string` is None for an empty <title>),
        # so guard the slice instead of crashing on None[:50].
        print(f"URL: {r.url}, Status: {r.status_code}, Title: {(r.title or '')[:50]}")
优缺点:
- 优点:简单、IO密集型任务效果好、标准库无需安装
- 缺点:受GIL限制,CPU密集型任务无法通过多线程提速
武器2:asyncio + aiohttp(异步IO)
适用场景:大量并发HTTP请求、需要单线程处理超高并发(1000+连接)
# concurrent_async.py - MonkeyCode生成
import asyncio
import aiohttp
import time
from dataclasses import dataclass, field
from typing import List, Optional
import logging
# Root logging config: emit INFO and above. This module logs through its
# own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class AsyncResult:
    """Result of one asynchronous fetch.

    On failure ``status`` is 0 and ``error`` holds a message. ``data`` holds
    the decoded JSON body, or an empty dict when there is none.
    """

    url: str
    status: int
    data: dict = field(default_factory=lambda: {})  # fresh dict per instance
    error: Optional[str] = None
class AsyncCrawler:
    """Concurrent HTTP fetcher built on asyncio + aiohttp.

    At most ``max_concurrent`` requests are in flight at once (enforced by a
    semaphore), and all requests of one crawl share a single ClientSession.
    """

    def __init__(self, max_concurrent: int = 100, timeout: int = 30):
        """
        Args:
            max_concurrent: Upper bound on simultaneous requests.
            timeout: Total per-request timeout in seconds.
        """
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # Created lazily from inside a coroutine, i.e. while the event loop runs.
        self.semaphore: Optional[asyncio.Semaphore] = None
        # Not populated by this class; kept for backward compatibility.
        self.results: List[AsyncResult] = []

    async def fetch(self, session: aiohttp.ClientSession, url: str) -> AsyncResult:
        """Fetch ``url`` under the concurrency limit; errors are returned, not raised.

        Bodies whose Content-Type starts with ``application/json`` are decoded
        into ``data``. Timeouts yield ``error='Timeout'``; any other exception
        yields its message. In both cases ``status`` is 0.
        """
        if self.semaphore is None:
            # Lets fetch() be used directly, without going through crawl().
            self.semaphore = asyncio.Semaphore(self.max_concurrent)
        async with self.semaphore:
            try:
                async with session.get(url) as resp:
                    is_json = resp.headers.get('Content-Type', '').startswith('application/json')
                    data = await resp.json() if is_json else {}
                    return AsyncResult(url=url, status=resp.status, data=data)
            except asyncio.TimeoutError:
                return AsyncResult(url=url, status=0, error='Timeout')
            except Exception as e:  # aiohttp.ClientError, JSON decode errors, ...
                return AsyncResult(url=url, status=0, error=str(e))

    async def crawl(self, urls: List[str]) -> List[AsyncResult]:
        """Fetch all ``urls`` concurrently and return results in input order."""
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        # One session for every request so its connection pool is reused.
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            raw = await asyncio.gather(
                *(self.fetch(session, url) for url in urls),
                return_exceptions=True,
            )
        # fetch() already handles Exception, so anything left here (e.g. a
        # CancelledError, which is a BaseException) is unexpected; turn it
        # into an error result rather than leaking an exception object.
        return [
            AsyncResult(url=url, status=0, error=str(r)) if isinstance(r, BaseException) else r
            for url, r in zip(urls, raw)
        ]

    async def crawl_with_progress(self, urls: List[str]) -> List[AsyncResult]:
        """Like crawl(), but log progress after every 10 completed requests.

        All requests are scheduled up front, so they run concurrently (still
        capped by the semaphore). Awaiting each task right after creating it
        would serialize them. Results are returned in input order.
        """
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        total = len(urls)
        done = 0

        async def tracked(session: aiohttp.ClientSession, url: str) -> AsyncResult:
            nonlocal done  # safe: coroutines on one loop don't preempt each other
            result = await self.fetch(session, url)
            done += 1
            if done % 10 == 0:
                logger.info(f"Progress: {done}/{total}")
            return result

        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            return list(await asyncio.gather(*(tracked(session, url) for url in urls)))
async def benchmark_async(urls: List[str], max_concurrent_list: List[int]):
    """Crawl ``urls`` once per concurrency limit and print timing/throughput.

    Only HTTP 200 responses count as successes.
    """
    print("\n=== Async Benchmark ===")
    n_urls = len(urls)
    for limit in max_concurrent_list:
        crawler = AsyncCrawler(max_concurrent=limit)
        t0 = time.time()
        outcome = await crawler.crawl(urls)
        dt = time.time() - t0
        ok = sum(r.status == 200 for r in outcome)
        print(f"MaxConcurrent={limit}: {dt:.2f}s | "
              f"Success={ok}/{n_urls} | "
              f"RPS={n_urls/dt:.1f}")
async def main():
    """Benchmark several concurrency limits, then run a small real API crawl."""
    # Workload for comparing asyncio against threading
    delayed = ['https://httpbin.org/delay/1'] * 50
    await benchmark_async(delayed, [10, 50, 100])

    print("\n=== Real API Test ===")
    crawler = AsyncCrawler(max_concurrent=20)
    uuid_urls = ['https://httpbin.org/uuid'] * 10
    for r in await crawler.crawl(uuid_urls):
        print(f"[{r.status}] {r.url}: {r.data}")


if __name__ == '__main__':
    asyncio.run(main())
优缺点:
- 优点:单线程超高并发、内存占用低(无线程开销)
- 缺点:需要aiohttp等异步库、代码比线程池复杂
武器3:ProcessPoolExecutor(进程池)
适用场景:CPU密集型任务(数据处理、图像处理、加密计算)
# concurrent_processing.py - MonkeyCode生成
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
import time
import hashlib
import json
from typing import List, Tuple
from dataclasses import dataclass
def cpu_intensive_task(n: int) -> dict:
    """CPU-bound demo workload: find the primes below ``n`` and hash them.

    Naive trial division is used on purpose so the task burns CPU time.

    Args:
        n: Exclusive upper bound of the prime search.

    Returns:
        dict with ``n``, ``prime_count``, ``last_prime`` (0 when there are no
        primes below ``n``) and ``hash``. The hash is the first 16 hex chars
        of the SHA-256 digest of the primes' decimal strings concatenated.
    """
    primes = []
    for i in range(2, n):
        # Trial division up to floor(sqrt(i)) is enough to prove primality.
        is_prime = True
        for j in range(2, int(i ** 0.5) + 1):
            if i % j == 0:
                is_prime = False
                break
        if is_prime:
            primes.append(i)

    # SHA-256 (not MD5) over the concatenated primes.
    combined = ''.join(map(str, primes))
    hash_val = hashlib.sha256(combined.encode()).hexdigest()
    return {
        'n': n,
        'prime_count': len(primes),
        'last_prime': primes[-1] if primes else 0,
        'hash': hash_val[:16],
    }
def io_intensive_task(file_path: str) -> dict:
    """Read a UTF-8 text file and return simple line/word/char statistics.

    Undecodable bytes are dropped (``errors='ignore'``). If reading fails,
    ``{'file': file_path, 'error': message}`` is returned instead of raising.
    ``lines`` counts newline characters.
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as fh:
            text = fh.read()
        stats = {
            'file': file_path,
            'lines': text.count('\n'),
            'words': len(text.split()),
            'chars': len(text),
        }
    except Exception as exc:
        return {'file': file_path, 'error': str(exc)}
    return stats
def benchmark_cpu(n_list: List[int], workers_list: List[int]):
    """Compare serial runs of ``cpu_intensive_task`` with ProcessPoolExecutor runs.

    For each ``n`` in ``n_list``, the task runs 4 times serially as a
    baseline. It then runs 4 times through a process pool for each worker
    count in ``workers_list``. Wall-clock times (including pool start-up)
    and speedups are printed.
    """
    repeats = 4
    print("\n=== CPU Task Benchmark (ProcessPool) ===")
    for n in n_list:
        print(f"\n--- n={n} ---")

        # Serial baseline
        start = time.time()
        for _ in range(repeats):
            cpu_intensive_task(n)
        single_time = time.time() - start
        print(f"Single process: {single_time:.2f}s")

        # Same workload on pools of different sizes
        for workers in workers_list:
            start = time.time()
            with ProcessPoolExecutor(max_workers=workers) as executor:
                futures = [executor.submit(cpu_intensive_task, n) for _ in range(repeats)]
                for future in as_completed(futures):
                    future.result()  # re-raise any exception from a worker
            elapsed = time.time() - start
            speedup = single_time / elapsed if elapsed > 0 else float('inf')
            print(f" Workers={workers}: {elapsed:.2f}s (Speedup: {speedup:.1f}x)")
def _mr_mapper(item: dict) -> Tuple[str, int]:
    """Map step: extract ``(province, amount)`` from one record.

    Defined at module level because ProcessPoolExecutor pickles callables by
    reference, and nested functions cannot be pickled.
    """
    return item.get('province', 'Unknown'), item.get('amount', 0)


def _mr_reducer(key: str, values: List[int]) -> dict:
    """Reduce step: aggregate every amount recorded for one province."""
    total = sum(values)
    return {
        'province': key,
        'total_amount': total,
        'count': len(values),
        'avg_amount': total / len(values),
    }


def map_reduce_example(data: List[dict]) -> List[dict]:
    """MapReduce-style aggregation of ``data`` on process pools.

    Args:
        data: Records with optional ``province`` (default 'Unknown') and
            ``amount`` (default 0) keys.

    Returns:
        One summary dict per province (``province``, ``total_amount``,
        ``count``, ``avg_amount``), in first-seen order. Empty input gives [].
    """
    # Map phase, in parallel
    with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
        mapped = list(executor.map(_mr_mapper, data))

    # Shuffle: group amounts by key (dicts keep insertion order)
    groups = {}
    for key, value in mapped:
        groups.setdefault(key, []).append(value)

    # Reduce phase. Executors have no starmap(); Executor.map takes one
    # iterable per positional argument instead.
    with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
        results = list(executor.map(_mr_reducer, groups.keys(), groups.values()))
    return results
if __name__ == '__main__':
    # Serial vs. process-pool timing of the CPU-bound task
    benchmark_cpu([5000, 10000], [2, 4, mp.cpu_count()])

    print("\n=== MapReduce Example ===")
    sample_data = [
        {'province': '广东', 'amount': 100},
        {'province': '北京', 'amount': 200},
        {'province': '广东', 'amount': 150},
        {'province': '上海', 'amount': 300},
        {'province': '北京', 'amount': 250},
    ]
    summary = map_reduce_example(sample_data)
    # Largest total first (reverse=True keeps the sort stable, like a negated key)
    for row in sorted(summary, key=lambda rec: rec['total_amount'], reverse=True):
        print(f"{row['province']}: Total={row['total_amount']}, Avg={row['avg_amount']:.1f}")
优缺点:
- 优点:突破GIL限制、真正并行、多核利用
- 缺点:进程创建开销大、数据通信需要序列化
武器4:生产者-消费者模式
适用场景:任务生成速度和消费速度不匹配(如爬虫 + 解析 + 存储)
# concurrent_pipeline.py - MonkeyCode生成
import threading
import queue
import time
from dataclasses import dataclass
from typing import Optional, Callable
import logging
# Root logging config: emit INFO and above. Producer, Consumer and Pipeline
# log through this module's named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Task:
    """A unit of work handed from the producer to the consumers."""

    task_id: int  # sequential id assigned by the producer
    url: str  # URL the consumer should process
    priority: int = 0  # stored on the task; nothing in this module reads it
@dataclass
class Result:
    """Output a consumer produces for one Task."""

    task_id: int  # id of the Task this result belongs to
    data: dict  # payload built by the consumer
    error: Optional[str] = None  # error message, if processing failed
class Producer(threading.Thread):
    """Producer thread: wraps each URL in a Task and puts it on ``task_queue``.

    After the last task it enqueues ``num_consumers`` ``None`` stop signals,
    one per consumer. The default of 3 keeps the behavior of callers that do
    not pass it.

    Args:
        task_queue: Queue shared with the consumers.
        urls: URLs to turn into Tasks, in order.
        num_consumers: Number of ``None`` stop signals to send.
    """

    def __init__(self, task_queue: queue.Queue, urls: list, num_consumers: int = 3):
        super().__init__(daemon=True)
        self.task_queue = task_queue
        self.urls = urls
        self.num_consumers = num_consumers
        self.tasks_generated = 0

    def run(self):
        for url in self.urls:
            task = Task(task_id=self.tasks_generated, url=url, priority=1)
            self.task_queue.put(task)
            self.tasks_generated += 1
            logger.info(f"[Producer] Generated task {task.task_id}: {task.url}")
            time.sleep(0.1)  # simulate the interval between new tasks
        # Stop signals: one per consumer, instead of a hard-coded count.
        for _ in range(self.num_consumers):
            self.task_queue.put(None)
class Consumer(threading.Thread):
    """Consumer thread: takes Tasks from ``task_queue`` and emits Results.

    It stops on a ``None`` stop signal, which it puts back so the next
    consumer sees it too. It also stops after 5 s without receiving anything.
    Errors while handling a task are logged and the loop keeps going.
    """

    def __init__(self, consumer_id: int, task_queue: queue.Queue, result_queue: queue.Queue):
        super().__init__(daemon=True)
        self.consumer_id = consumer_id
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.tasks_processed = 0

    def _process(self, task):
        """Handle one task: simulate work, then publish a Result."""
        logger.info(f"[Consumer-{self.consumer_id}] Processing {task.url}")
        time.sleep(0.5)  # simulated processing time
        self.result_queue.put(Result(
            task_id=task.task_id,
            data={'url': task.url, 'processed_by': self.consumer_id},
        ))
        self.tasks_processed += 1

    def run(self):
        while True:
            try:
                task = self.task_queue.get(timeout=5)
                if task is not None:
                    self._process(task)
                    continue
                # Stop signal: hand it on to the next consumer, then exit.
                self.task_queue.put(None)
                break
            except queue.Empty:
                break
            except Exception as e:
                logger.error(f"[Consumer-{self.consumer_id}] Error: {e}")
class Pipeline:
    """Wires one Producer to ``num_consumers`` Consumer threads.

    Args:
        num_consumers: Number of consumer threads to start.
        max_queue_size: Bound of the task queue; the producer blocks when it is full.
    """

    def __init__(self, num_consumers: int = 3, max_queue_size: int = 100):
        # Plain FIFO queue. A PriorityQueue cannot be used here: it compares
        # queued items with `<`, but Task is a plain @dataclass with no
        # ordering, and the None stop signal cannot be compared with a Task.
        # Queuing a second item would raise TypeError in the producer thread.
        self.task_queue = queue.Queue(maxsize=max_queue_size)
        self.result_queue = queue.Queue()
        self.num_consumers = num_consumers
        self.producer: Optional[Producer] = None
        self.consumers: list = []

    def run(self, urls: list):
        """Process ``urls`` through the pipeline and return Results sorted by task_id."""
        logger.info(f"Starting pipeline with {self.num_consumers} consumers")

        self.producer = Producer(self.task_queue, urls)
        self.producer.start()

        self.consumers = [
            Consumer(i, self.task_queue, self.result_queue)
            for i in range(self.num_consumers)
        ]
        for c in self.consumers:
            c.start()

        # Wait for every thread to finish before draining results.
        self.producer.join()
        for c in self.consumers:
            c.join()

        results = []
        while not self.result_queue.empty():
            results.append(self.result_queue.get_nowait())
        return sorted(results, key=lambda r: r.task_id)
def main():
    """Run a 3-consumer pipeline over 20 dummy URLs and log a short summary."""
    pipeline = Pipeline(num_consumers=3)
    item_urls = [f'http://example.com/item/{n}' for n in range(20)]

    t0 = time.time()
    done = pipeline.run(item_urls)
    logger.info(f"\nCompleted {len(done)} tasks in {time.time() - t0:.2f}s")

    # Show only the first five results
    for res in done[:5]:
        logger.info(f" Task {res.task_id}: {res.data}")


if __name__ == '__main__':
    main()
武器5:令牌桶 + 滑动窗口(限流)
适用场景:API限流控制(每秒N次请求)
# concurrent_rate_limit.py - MonkeyCode生成
import time
import threading
import requests
from collections import deque
from typing import Callable, Any
class TokenBucket:
    """Thread-safe token-bucket rate limiter.

    Tokens accrue continuously at ``rate`` per second, up to ``capacity``.
    Each ``acquire`` consumes tokens, so bursts of up to ``capacity`` are
    allowed while the long-run rate is bounded by ``rate``.
    """

    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: Tokens added per second.
            capacity: Maximum tokens the bucket holds; it starts full.
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        # Monotonic clock, so wall-clock adjustments cannot skew the refill.
        self.last_update = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        """Add the tokens earned since the last refill. Caller must hold ``self.lock``."""
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_update) * self.rate)
        self.last_update = now

    def acquire(self, tokens: int = 1, timeout: float = None) -> bool:
        """Consume ``tokens`` tokens, waiting for them to accrue if needed.

        Args:
            tokens: Number of tokens to consume.
            timeout: Maximum seconds to wait. ``None`` waits indefinitely;
                ``0`` means "do not wait".

        Returns:
            True once the tokens are taken. False if the timeout expires, or
            if the request can never succeed (``tokens > capacity``).
        """
        if tokens > self.capacity:
            return False
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            with self.lock:
                # Refill on every attempt; otherwise a waiter never sees new tokens.
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                # Time until enough tokens will have accrued (poll if rate <= 0).
                wait = (tokens - self.tokens) / self.rate if self.rate > 0 else 0.01
            # Sleep with the lock released so other threads are not blocked.
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                wait = min(wait, remaining)
            time.sleep(wait)
class SlidingWindow:
    """Thread-safe sliding-window-log limiter.

    Admits at most ``max_requests`` requests within any window of
    ``window_seconds`` seconds.
    """

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()  # timestamps of admitted requests, oldest first
        self.lock = threading.Lock()

    def is_allowed(self) -> bool:
        """Admit and record a request if the window has room; otherwise return False."""
        with self.lock:
            now = time.time()
            cutoff = now - self.window_seconds
            # Evict timestamps that have slid out of the window.
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()
            if len(self.requests) >= self.max_requests:
                return False
            self.requests.append(now)
            return True

    def wait_and_acquire(self):
        """Block, polling every 0.1 s, until ``is_allowed()`` admits the request."""
        while True:
            if self.is_allowed():
                return
            time.sleep(0.1)
def rate_limited_request(url: str, limiter: TokenBucket, session: requests.Session,
                         request_timeout: float = 10) -> dict:
    """GET ``url`` through ``session`` once ``limiter`` grants a token.

    Args:
        url: Target URL.
        limiter: Token bucket consulted first; waits up to 5 s for a token.
        session: Shared ``requests.Session``, so connections can be reused.
        request_timeout: Seconds passed to ``session.get``. Without a timeout,
            requests can block forever on an unresponsive server.

    Returns:
        ``{'status': code, 'url': url}`` after a request, or
        ``{'error': ...}`` if no token was obtained within 5 s.
        Exceptions raised by ``session.get`` propagate.
    """
    if not limiter.acquire(timeout=5):
        return {'error': 'Rate limit exceeded, timeout waiting for token'}
    resp = session.get(url, timeout=request_timeout)
    return {'status': resp.status_code, 'url': url}
def main():
    """Send rate-limited requests sequentially and report the achieved rate."""
    rate, capacity = 10, 20  # 10 tokens per second, bucket holds 20
    limiter = TokenBucket(rate=rate, capacity=capacity)
    urls = ['https://httpbin.org/uuid' for _ in range(30)]
    n = len(urls)

    results = []
    # Context manager closes the session's pooled connections.
    with requests.Session() as session:
        start = time.time()
        for url in urls:
            results.append(rate_limited_request(url, limiter, session))
        elapsed = time.time() - start

    # The bucket starts full, so the first `capacity` requests may go out as a
    # burst. Only the remaining ones must wait for tokens at `rate` per second,
    # which gives a lower bound on the total time.
    expected_min = max(0.0, (n - capacity) / rate)
    print(f"Completed {n} requests in {elapsed:.2f}s")
    print(f"Expected (>= {expected_min:.1f}s with an initial burst of {capacity}), "
          f"Actual: {elapsed:.2f}s")
    print(f"Rate: {n/elapsed:.1f} req/s")


if __name__ == '__main__':
    main()
武器6:并发数据结构
适用场景:多线程安全的数据收集
# concurrent_datastructures.py - MonkeyCode生成
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import List, Dict
from collections import defaultdict
import time
@dataclass
class ConcurrentCounter:
    """Thread-safe multi-key counter; every operation runs under one lock."""

    _lock: threading.Lock = field(default_factory=threading.Lock)
    _counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int))

    def increment(self, key: str, value: int = 1):
        """Add ``value`` (default 1) to the count stored for ``key``."""
        with self._lock:
            self._counts[key] = self._counts[key] + value

    def get(self, key: str) -> int:
        """Return the count for ``key``; unknown keys read as 0 and are not created."""
        with self._lock:
            return self._counts[key] if key in self._counts else 0

    def get_all(self) -> Dict[str, int]:
        """Return a plain-dict snapshot of every count."""
        with self._lock:
            return {k: v for k, v in self._counts.items()}
class ConcurrentList:
    """List wrapper whose append/extend/read operations are serialized by a lock."""

    def __init__(self):
        self._items: List = []
        self._lock = threading.Lock()

    def append(self, item):
        """Append one item atomically."""
        with self._lock:
            self._items.append(item)

    def extend(self, items):
        """Append every element of ``items`` while holding the lock."""
        with self._lock:
            self._items.extend(items)

    def get_all(self) -> List:
        """Return a shallow copy, so callers can't mutate the internal list."""
        with self._lock:
            return self._items[:]

    def __len__(self):
        with self._lock:
            return len(self._items)
class ThreadSafeCache:
    """Thread-safe LRU cache with O(1) ``get``/``set``.

    Entries live in an OrderedDict kept in recency order, least recently used
    first. ``get`` returns None for missing keys, so a stored None value looks
    the same as a miss.
    """

    def __init__(self, maxsize: int = 1000):
        from collections import OrderedDict  # local import keeps this block self-contained

        self.maxsize = maxsize
        self._lock = threading.RLock()
        # move_to_end() and popitem(last=False) are O(1). Tracking order in a
        # list needs O(n) remove()/pop(0) on every access.
        self._cache = OrderedDict()

    def get(self, key: str):
        """Return the value for ``key`` and mark it most recently used, or None."""
        with self._lock:
            try:
                self._cache.move_to_end(key)
            except KeyError:
                return None
            return self._cache[key]

    def set(self, key: str, value):
        """Insert or update ``key``, evicting the LRU entry when the cache is full.

        With ``maxsize <= 0`` nothing is stored.
        """
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            elif len(self._cache) >= self.maxsize:
                if self.maxsize <= 0:
                    return
                self._cache.popitem(last=False)  # evict the least recently used
            self._cache[key] = value
def benchmark_concurrent():
    """Drive the concurrent containers from 8 threads and print sanity checks."""
    counter = ConcurrentCounter()
    items = ConcurrentList()

    def worker(start, end):
        for n in range(start, end):
            counter.increment('total')
            counter.increment(f'worker_{n % 4}')
            items.append(n)

    # 8 threads, each handling a disjoint range of 1000 numbers
    chunk = 1000
    with ThreadPoolExecutor(max_workers=8) as executor:
        jobs = [executor.submit(worker, k * chunk, (k + 1) * chunk) for k in range(8)]
        for job in jobs:
            job.result()

    print(f"Counter total: {counter.get('total')}")
    print(f"Counter worker_0: {counter.get('worker_0')}")
    print(f"List length: {len(items)}")

    # LRU cache: 150 inserts into a 100-slot cache
    cache = ThreadSafeCache(maxsize=100)
    for i in range(150):
        cache.set(f'key_{i}', f'value_{i}')

    hits = len([i for i in range(50, 150) if cache.get(f'key_{i}')])
    print(f"Cache hits (50-150): {hits}/100")

    # The oldest 50 keys should have been evicted
    misses = sum(cache.get(f'key_{i}') is None for i in range(50))
    print(f"Cache misses (0-49): {misses}/50 (should be ~50 evictions)")


if __name__ == '__main__':
    benchmark_concurrent()
武器7:协程 + 进程池混合模式
适用场景:既需要高并发IO,又有CPU密集型处理
# concurrent_hybrid.py - MonkeyCode生成
import asyncio
import threading
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time
import aiohttp
def cpu_task(n: int) -> int:
    """CPU-bound toy workload: return 0² + 1² + … + (n-1)²."""
    total = 0
    for k in range(n):
        total += k * k
    return total
async def fetch_and_process(urls: list, cpu_workers: int = 4):
    """Hybrid pipeline: fetch JSON concurrently, then run CPU work on processes.

    Args:
        urls: JSON endpoints to fetch. An empty list falls back to ten
            requests to 'https://httpbin.org/json' (the demo workload), so
            callers passing ``[]`` keep their behavior.
        cpu_workers: Size of the process pool used for the CPU stage.

    Returns:
        ``{'fetched': <responses fetched>, 'processed': <CPU tasks completed>}``.
    """
    targets = list(urls) or ['https://httpbin.org/json'] * 10

    async def fetch(session, url):
        async with session.get(url) as resp:
            return await resp.json()

    # Stage 1: concurrent I/O on the event loop
    async with aiohttp.ClientSession() as session:
        fetched = await asyncio.gather(*(fetch(session, u) for u in targets))

    # Stage 2: CPU-bound work in worker processes. run_in_executor keeps the
    # event loop responsive; calling Future.result() here would block it.
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=cpu_workers) as executor:
        processed = await asyncio.gather(
            *(loop.run_in_executor(executor, cpu_task, 10000) for _ in range(20))
        )

    return {'fetched': len(fetched), 'processed': len(processed)}
async def main():
    """Time the hybrid fetch-and-process demo and print its summary."""
    t0 = time.time()
    summary = await fetch_and_process([])
    print(f"Hybrid approach: {time.time() - t0:.2f}s")
    print(f"Results: {summary}")


if __name__ == '__main__':
    asyncio.run(main())
7种武器对比总结
| 武器 | 适用场景 | 并发数 | 复杂度 | GIL |
|---|---|---|---|---|
| ThreadPoolExecutor | IO密集(爬虫/API) | 中等(10-100) | 低 | 受限 |
| asyncio+aiohttp | 超高并发IO | 高(1000+) | 中 | 不受影响 |
| ProcessPoolExecutor | CPU密集(计算) | 低(核心数) | 低 | 不受影响 |
| 生产者-消费者 | 任务队列 | 可扩展 | 中 | 受限 |
| 令牌桶/滑动窗口 | API限流 | 任意 | 低 | 不受影响 |
| 并发数据结构 | 多线程数据共享 | 可扩展 | 中 | 受限 |
| 协程+进程池混合 | 混合负载 | 高 | 高 | 部分突破 |
用MonkeyCode生成代码时,明确告诉它你的场景,它会选择合适的方案。
文章摘自:https://www.cnblogs.com/jaryn/p/20218912
