MonkeyCode批量操作:Python并发编程的7种武器实战

为什么并发编程是高级技能

写爬虫、处理大数据、并发请求API——这些场景单线程跑不动,必须上并发。但Python的GIL让多线程在CPU密集型任务上几乎无法提速(IO密集型任务仍然有效),asyncio又门槛高,进程池配置复杂……

这篇文章用MonkeyCode生成7种并发方案的实战代码,每种方案都有适用场景,帮你搞清楚什么时候用什么。

给MonkeyCode的统一Prompt模板

用Python实现XX功能,需要支持高并发。要求:
1. 完整可运行的代码,包含main入口
2. 每种方案对比优劣分析
3. 包含性能测试代码
4. 处理异常和超时
5. 资源控制(最大并发数)

武器1:ThreadPoolExecutor(线程池)

适用场景:IO密集型任务(爬虫、API请求、文件读写)

# concurrent_threading.py - MonkeyCode生成
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import requests

@dataclass
class CrawlResult:
    """Outcome of fetching one URL.

    ``status_code`` is the HTTP status, or 0 when the request failed before a
    response was obtained; in that case ``error`` holds the reason. ``error``
    stays ``None`` for a successful fetch.
    """
    url: str
    status_code: int
    title: str
    # Optional: None is the default and means "no error".
    error: Optional[str] = None

def fetch_page(url: str, timeout: int = 10) -> CrawlResult:
    """Fetch a single page and extract its HTML <title>.

    Args:
        url: Page to request with ``requests.get``.
        timeout: Seconds passed to ``requests.get(timeout=...)``.

    Returns:
        A CrawlResult whose ``title`` is always a ``str``:
        - for ``text/html`` responses, the <title> text ('' if there is no
          <title> or it has no single string child);
        - the first 200 characters of the body if BeautifulSoup cannot be
          imported or parsing fails;
        - '' for non-HTML responses.
        Timeouts and other ``requests`` errors are reported through
        ``status_code=0`` and ``error`` instead of being raised.
    """
    try:
        resp = requests.get(url, timeout=timeout, headers={
            'User-Agent': 'Mozilla/5.0 (compatible; Bot/1.0)'
        })
        title = ''
        if 'text/html' in resp.headers.get('Content-Type', ''):
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(resp.text, 'html.parser')
                # Tag.string is None when <title> is empty or has nested
                # children; normalize so callers can always slice the title.
                title = (soup.title.string or '') if soup.title else ''
            except Exception:
                # Best effort: bs4 not installed (ImportError) or the parse
                # failed. A bare ``except:`` would also swallow
                # KeyboardInterrupt / SystemExit.
                title = resp.text[:200]

        return CrawlResult(url=url, status_code=resp.status_code, title=title)
    except requests.Timeout:
        return CrawlResult(url=url, status_code=0, title='', error='Timeout')
    except requests.RequestException as e:
        return CrawlResult(url=url, status_code=0, title='', error=str(e))

def batch_crawl(urls: List[str], max_workers: int = 10) -> List[CrawlResult]:
    """Crawl ``urls`` on a thread pool and return results in completion order.

    Each finished URL is printed as ``[status] url``. If a worker raises, the
    exception is printed as ``[ERROR] url: exc`` and recorded as a
    ``status_code=0`` CrawlResult carrying the message.
    """
    collected: List[CrawlResult] = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to its URL so failures can be attributed.
        pending = {pool.submit(fetch_page, target): target for target in urls}

        for done in as_completed(pending):
            target = pending[done]
            try:
                outcome = done.result()
                collected.append(outcome)
                print(f"[{outcome.status_code}] {target}")
            except Exception as exc:
                print(f"[ERROR] {target}: {exc}")
                collected.append(
                    CrawlResult(url=target, status_code=0, title='', error=str(exc))
                )

    return collected

def benchmark_crawl(urls: List[str], max_workers_list: List[int]):
    """Time ``batch_crawl(urls)`` once for each thread count in ``max_workers_list``.

    For each run, prints elapsed seconds, the number of HTTP 200 results
    (Success), all other results (Failed), and throughput in requests/second.
    """
    print("\n=== ThreadPoolExecutor Benchmark ===")

    for max_workers in max_workers_list:
        # perf_counter is the monotonic, high-resolution clock meant for
        # measuring durations; time.time() can jump with wall-clock changes.
        start = time.perf_counter()
        results = batch_crawl(urls, max_workers=max_workers)
        elapsed = time.perf_counter() - start

        success = sum(1 for r in results if r.status_code == 200)
        failed = len(results) - success
        rps = len(urls) / elapsed if elapsed > 0 else float('inf')

        print(f"\nWorkers={max_workers}: {elapsed:.2f}s | "
              f"Success={success} | Failed={failed} | "
              f"RPS={rps:.1f}")

if __name__ == '__main__':
    # Benchmark: 20 requests to httpbin's /delay/1 endpoint (presumably a
    # ~1s server-side delay), run with increasing thread counts.
    test_urls = ['https://httpbin.org/delay/1'] * 20

    benchmark_crawl(test_urls, [1, 5, 10, 20])

    # Real crawl example
    print("\n=== Real Crawl ===")
    real_urls = [
        'https://httpbin.org/html',
        'https://httpbin.org/json',
        'https://httpbin.org/uuid',
    ]
    results = batch_crawl(real_urls, max_workers=3)
    for r in results:
        # Guard against a None title so slicing can never raise TypeError.
        print(f"URL: {r.url}, Status: {r.status_code}, Title: {(r.title or '')[:50]}")

优缺点

  • 优点:简单、IO密集型任务效果好、标准库无需安装
  • 缺点:GIL限制,CPU密集型任务无效

武器2:asyncio + aiohttp(异步IO)

适用场景:大量并发HTTP请求、需要单线程处理超高并发(1000+连接)

# concurrent_async.py - MonkeyCode生成
import asyncio
import aiohttp
import time
from dataclasses import dataclass, field
from typing import List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class AsyncResult:
    """Outcome of fetching one URL with AsyncCrawler."""
    url: str
    status: int  # HTTP status code, or 0 when the request failed (see ``error``)
    data: dict = field(default_factory=dict)  # decoded JSON body; {} for non-JSON responses
    error: Optional[str] = None  # failure description; None when the request succeeded

class AsyncCrawler:
    """Concurrent HTTP fetcher built on aiohttp with one shared ClientSession per crawl.

    Args:
        max_concurrent: Upper bound on requests in flight at once, enforced
            with an ``asyncio.Semaphore``.
        timeout: Total timeout in seconds for each request
            (``aiohttp.ClientTimeout(total=timeout)``).
    """

    def __init__(self, max_concurrent: int = 100, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # Created inside crawl()/crawl_with_progress(), i.e. while the event
        # loop is running, rather than here.
        self.semaphore: Optional[asyncio.Semaphore] = None
        # Not written by any method of this class; kept for compatibility.
        self.results: List[AsyncResult] = []

    async def fetch(self, session: aiohttp.ClientSession, url: str) -> AsyncResult:
        """GET ``url`` under the concurrency limit and wrap the outcome.

        JSON responses (Content-Type starting with ``application/json``) are
        decoded into ``data``; other responses leave ``data`` empty. Timeouts
        and any other ``Exception`` are returned as ``status=0`` results with
        ``error`` set, never raised.
        """
        async with self.semaphore:  # cap the number of in-flight requests
            try:
                async with session.get(url) as resp:
                    is_json = resp.headers.get('Content-Type', '').startswith('application/json')
                    data = await resp.json() if is_json else {}
                    return AsyncResult(url=url, status=resp.status, data=data)
            except asyncio.TimeoutError:
                return AsyncResult(url=url, status=0, error='Timeout')
            except aiohttp.ClientError as e:
                return AsyncResult(url=url, status=0, error=str(e))
            except Exception as e:
                return AsyncResult(url=url, status=0, error=str(e))

    async def crawl(self, urls: List[str]) -> List[AsyncResult]:
        """Fetch all ``urls`` concurrently; results are returned in input order."""
        self.semaphore = asyncio.Semaphore(self.max_concurrent)

        # One ClientSession (and its connection pool) shared by every request.
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            tasks = [self.fetch(session, url) for url in urls]

            # return_exceptions=True: an unexpected exception from one task is
            # returned as a value instead of aborting the whole gather.
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    results[i] = AsyncResult(url=urls[i], status=0, error=str(result))

            return results

    async def crawl_with_progress(self, urls: List[str]) -> List[AsyncResult]:
        """Like crawl(), but logs ``Progress: done/total`` every 10 completions.

        Results are returned in input order.
        """
        self.semaphore = asyncio.Semaphore(self.max_concurrent)

        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            # Schedule every request before awaiting any of them; awaiting each
            # task immediately after creating it would run them one at a time.
            tasks = [asyncio.create_task(self.fetch(session, url)) for url in urls]

            completed = 0
            for next_done in asyncio.as_completed(tasks):
                await next_done
                completed += 1
                if completed % 10 == 0:
                    logger.info(f"Progress: {completed}/{len(urls)}")

        # Every task has finished; read results back in the original URL order.
        return [task.result() for task in tasks]

async def benchmark_async(urls: List[str], max_concurrent_list: List[int]):
    """Time ``AsyncCrawler.crawl(urls)`` once for each concurrency limit.

    Prints elapsed seconds, how many responses had status 200, and throughput
    in requests/second.
    """
    print("\n=== Async Benchmark ===")

    for max_concurrent in max_concurrent_list:
        crawler = AsyncCrawler(max_concurrent=max_concurrent)

        # perf_counter is monotonic and high-resolution, unlike time.time().
        start = time.perf_counter()
        results = await crawler.crawl(urls)
        elapsed = time.perf_counter() - start

        success = sum(1 for r in results if r.status == 200)
        rps = len(urls) / elapsed if elapsed > 0 else float('inf')
        print(f"MaxConcurrent={max_concurrent}: {elapsed:.2f}s | "
              f"Success={success}/{len(urls)} | "
              f"RPS={rps:.1f}")

async def main():
    """Benchmark AsyncCrawler at several concurrency limits, then run a small real crawl."""
    # asyncio vs threading: the same slow endpoint, with more requests in flight
    slow_urls = ['https://httpbin.org/delay/1'] * 50
    await benchmark_async(slow_urls, [10, 50, 100])

    # Real API calls
    print("\n=== Real API Test ===")
    uuid_urls = ['https://httpbin.org/uuid'] * 10
    outcomes = await AsyncCrawler(max_concurrent=20).crawl(uuid_urls)

    for outcome in outcomes:
        print(f"[{outcome.status}] {outcome.url}: {outcome.data}")

if __name__ == '__main__':
    # Script entry point: asyncio.run() creates an event loop and runs main() to completion.
    asyncio.run(main())

优缺点

  • 优点:单线程超高并发、内存占用低(无线程开销)
  • 缺点:需要aiohttp等异步库、代码比线程池复杂

武器3:ProcessPoolExecutor(进程池)

适用场景:CPU密集型任务(数据处理、图像处理、加密计算)

# concurrent_processing.py - MonkeyCode生成
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
import time
import hashlib
import json
from typing import List, Tuple
from dataclasses import dataclass

def cpu_intensive_task(n: int) -> dict:
    """CPU-bound workload: find every prime below ``n`` and hash the list.

    Uses plain trial division so the work is pure-Python CPU time.

    Returns:
        dict with
        - ``n``: the input,
        - ``prime_count``: number of primes in ``[2, n)``,
        - ``last_prime``: the largest such prime, or 0 if there is none,
        - ``hash``: first 16 hex chars of the SHA-256 (not MD5) digest of the
          primes' decimal digits concatenated without separators.
    """
    primes = []
    for candidate in range(2, n):
        # Trial division up to floor(sqrt(candidate)); all() stops at the
        # first divisor found, like the explicit break did.
        limit = int(candidate ** 0.5) + 1
        if all(candidate % divisor for divisor in range(2, limit)):
            primes.append(candidate)

    # SHA-256 over the concatenated decimal representation of the primes
    digest = hashlib.sha256(''.join(map(str, primes)).encode()).hexdigest()

    return {
        'n': n,
        'prime_count': len(primes),
        'last_prime': primes[-1] if primes else 0,
        'hash': digest[:16],
    }

def io_intensive_task(file_path: str) -> dict:
    """Read a text file and report simple size statistics.

    The file is decoded as UTF-8 with undecodable bytes dropped.

    Returns:
        ``{'file', 'lines', 'words', 'chars'}`` where ``lines`` counts '\\n'
        characters, ``words`` counts whitespace-separated tokens and
        ``chars`` is the decoded length; or ``{'file', 'error'}`` if anything
        raised an Exception (e.g. the file could not be opened).
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as handle:
            text = handle.read()

        stats = {
            'file': file_path,
            'lines': text.count('\n'),
            'words': len(text.split()),
            'chars': len(text),
        }
    except Exception as exc:
        stats = {'file': file_path, 'error': str(exc)}

    return stats

def benchmark_cpu(n_list: List[int], workers_list: List[int], tasks_per_run: int = 4):
    """Compare serial execution with ProcessPoolExecutor for ``cpu_intensive_task``.

    For each ``n`` in ``n_list`` (previously ignored in favour of a hard-coded
    ``[5000, 10000]``), runs ``tasks_per_run`` copies of
    ``cpu_intensive_task(n)`` serially, then once per worker count in
    ``workers_list`` on a fresh process pool. Prints the timings and the
    speedup over the serial run. Pool start-up time is included in each
    parallel timing.
    """
    print("\n=== CPU Task Benchmark (ProcessPool) ===")

    for n in n_list:
        print(f"\n--- n={n} ---")

        # Serial baseline (perf_counter: monotonic clock for durations)
        start = time.perf_counter()
        for _ in range(tasks_per_run):
            cpu_intensive_task(n)
        single_time = time.perf_counter() - start
        print(f"Single process: {single_time:.2f}s")

        # Same workload spread over different worker counts
        for workers in workers_list:
            start = time.perf_counter()
            with ProcessPoolExecutor(max_workers=workers) as executor:
                futures = [executor.submit(cpu_intensive_task, n) for _ in range(tasks_per_run)]
                for future in as_completed(futures):
                    future.result()  # re-raise any exception from a worker
            elapsed = time.perf_counter() - start

            speedup = single_time / elapsed if elapsed > 0 else float('inf')
            print(f"  Workers={workers}: {elapsed:.2f}s (Speedup: {speedup:.1f}x)")

def _mr_map(item: dict) -> Tuple[str, int]:
    """Map step: extract (province, amount), defaulting to 'Unknown' / 0."""
    return item.get('province', 'Unknown'), item.get('amount', 0)


def _mr_reduce(key: str, values: List[int]) -> dict:
    """Reduce step: total, count and mean amount for one province (``values`` is non-empty)."""
    total = sum(values)
    return {
        'province': key,
        'total_amount': total,
        'count': len(values),
        'avg_amount': total / len(values),
    }


def map_reduce_example(data: List[dict], max_workers=None) -> List[dict]:
    """MapReduce over ``data`` with a process pool: per-province totals.

    Args:
        data: records with optional ``'province'`` and ``'amount'`` keys.
        max_workers: pool size; defaults to ``mp.cpu_count()``.

    Returns:
        One dict per province (``province``, ``total_amount``, ``count``,
        ``avg_amount``) in first-seen order; ``[]`` for empty input.
    """
    workers = max_workers or mp.cpu_count()
    groups: dict = {}

    # Worker callables must be module-level functions: ProcessPoolExecutor
    # pickles them by qualified name, which fails for nested functions.
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # Map, then group by key in the parent process
        for key, value in executor.map(_mr_map, data):
            groups.setdefault(key, []).append(value)

        # Reduce. ProcessPoolExecutor has no starmap(); map() accepts
        # parallel iterables, which supplies (key, values) as two arguments.
        return list(executor.map(_mr_reduce, groups.keys(), groups.values()))

if __name__ == '__main__':
    # Benchmark the CPU-bound task on 2, 4 and all available cores
    benchmark_cpu([5000, 10000], [2, 4, mp.cpu_count()])

    # MapReduce demo on a handful of sales records
    print("\n=== MapReduce Example ===")
    sample_data = [
        {'province': '广东', 'amount': 100},
        {'province': '北京', 'amount': 200},
        {'province': '广东', 'amount': 150},
        {'province': '上海', 'amount': 300},
        {'province': '北京', 'amount': 250},
    ]
    # Largest total first; reverse=True keeps ties in their original order.
    ranked = sorted(
        map_reduce_example(sample_data),
        key=lambda row: row['total_amount'],
        reverse=True,
    )
    for row in ranked:
        print(f"{row['province']}: Total={row['total_amount']}, Avg={row['avg_amount']:.1f}")

优缺点

  • 优点:突破GIL限制、真正并行、多核利用
  • 缺点:进程创建开销大、数据通信需要序列化

武器4:生产者-消费者模式

适用场景:任务生成速度和消费速度不匹配(如爬虫 + 解析 + 存储)

# concurrent_pipeline.py - MonkeyCode生成
import threading
import queue
import time
from dataclasses import dataclass
from typing import Optional, Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Task:
    """Unit of work passed from Producer to Consumer through the task queue."""
    task_id: int  # Producer assigns ids from its running tasks_generated counter
    url: str
    priority: int = 0  # Producer sets 1; Consumer does not read it

@dataclass
class Result:
    """Output a Consumer puts on the result queue for one Task."""
    task_id: int  # id of the Task this result belongs to
    data: dict
    error: Optional[str] = None  # failure description; Consumer never sets it

class Producer(threading.Thread):
    """Producer thread: wraps each URL in a Task and puts it on the task queue.

    Emits one task every 0.1 s (a simulated generation delay), then enqueues
    three ``None`` stop sentinels; the count assumes three consumers.
    """

    def __init__(self, task_queue: queue.Queue, urls: list):
        super().__init__(daemon=True)
        self.task_queue = task_queue
        self.urls = urls
        self.tasks_generated = 0  # also the id of the next task

    def run(self):
        for next_id, url in enumerate(self.urls, start=self.tasks_generated):
            task = Task(task_id=next_id, url=url, priority=1)
            self.task_queue.put(task)
            self.tasks_generated = next_id + 1
            logger.info(f"[Producer] Generated task {task.task_id}: {task.url}")
            time.sleep(0.1)  # simulated gap between generated tasks

        # Stop signals: one None per consumer (three assumed)
        stop_signals = (None,) * 3
        for sentinel in stop_signals:
            self.task_queue.put(sentinel)

class Consumer(threading.Thread):
    """Consumer thread: takes Tasks off the task queue and emits Results.

    Stops when it receives the ``None`` sentinel (re-queuing it so sibling
    consumers stop too) or when no task arrives for 5 seconds. Any other
    Exception while handling an item is logged and the loop continues.
    """

    def __init__(self, consumer_id: int, task_queue: queue.Queue, result_queue: queue.Queue):
        super().__init__(daemon=True)
        self.consumer_id = consumer_id
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.tasks_processed = 0

    def run(self):
        while True:
            try:
                item = self.task_queue.get(timeout=5)

                if item is None:
                    # Pass the stop signal on to the next consumer, then exit.
                    self.task_queue.put(None)
                    break

                logger.info(f"[Consumer-{self.consumer_id}] Processing {item.url}")
                time.sleep(0.5)  # stand-in for real processing

                payload = {'url': item.url, 'processed_by': self.consumer_id}
                self.result_queue.put(Result(task_id=item.task_id, data=payload))
                self.tasks_processed += 1

            except queue.Empty:
                # Idle for 5 s: assume the producer is done.
                break
            except Exception as err:
                logger.error(f"[Consumer-{self.consumer_id}] Error: {err}")

class Pipeline:
    """Run one Producer thread and ``num_consumers`` Consumer threads.

    Tasks flow Producer -> task_queue -> Consumers -> result_queue. ``run``
    blocks until every thread has finished and returns the Results sorted by
    ``task_id``.
    """

    def __init__(self, num_consumers: int = 3, max_queue_size: int = 100):
        # Plain FIFO queue. queue.PriorityQueue must compare its items, but Task
        # is a dataclass without ordering and the None stop sentinel cannot be
        # compared with a Task either, so put() raised TypeError as soon as two
        # items were queued at once. Producer gives every Task the same
        # priority, so FIFO order loses nothing.
        self.task_queue = queue.Queue(maxsize=max_queue_size)
        self.result_queue = queue.Queue()
        self.num_consumers = num_consumers
        self.producer: Optional[Producer] = None
        self.consumers: list = []

    def run(self, urls: list):
        """Push ``urls`` through the pipeline and return the Results sorted by task_id."""
        logger.info(f"Starting pipeline with {self.num_consumers} consumers")

        # Start the producer
        self.producer = Producer(self.task_queue, urls)
        self.producer.start()

        # Start the consumers
        self.consumers = [
            Consumer(i, self.task_queue, self.result_queue)
            for i in range(self.num_consumers)
        ]
        for c in self.consumers:
            c.start()

        # Wait for every thread to finish
        self.producer.join()
        for c in self.consumers:
            c.join()

        # All threads have exited, so empty() is reliable here.
        results = []
        while not self.result_queue.empty():
            results.append(self.result_queue.get())

        return sorted(results, key=lambda x: x.task_id)

def main():
    """Run a 3-consumer pipeline over 20 dummy URLs and log a short summary."""
    item_urls = [f'http://example.com/item/{n}' for n in range(20)]
    pipeline = Pipeline(num_consumers=3)

    t0 = time.time()
    finished = pipeline.run(item_urls)
    duration = time.time() - t0

    logger.info(f"\nCompleted {len(finished)} tasks in {duration:.2f}s")
    for res in finished[:5]:
        logger.info(f"  Task {res.task_id}: {res.data}")

if __name__ == '__main__':
    # Script entry point: run the producer/consumer pipeline demo.
    main()

武器5:令牌桶 + 滑动窗口(限流)

适用场景:API限流控制(每秒N次请求)

# concurrent_rate_limit.py - MonkeyCode生成
import time
import threading
import requests
from collections import deque
from typing import Callable, Any

class TokenBucket:
    """Token-bucket rate limiter that can be shared between threads.

    The bucket starts full with ``capacity`` tokens and refills continuously
    at ``rate`` tokens per second, never holding more than ``capacity``.
    """

    def __init__(self, rate: float, capacity: int):
        """
        rate: tokens added per second
        capacity: maximum number of tokens the bucket can hold
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        # monotonic(): elapsed-time arithmetic must not jump when the wall
        # clock is adjusted.
        self.last_update = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        """Add the tokens accrued since the last refill. Caller must hold ``self.lock``."""
        now = time.monotonic()
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now

    def acquire(self, tokens: int = 1, timeout: float = None) -> bool:
        """Consume ``tokens`` tokens, waiting for refills if necessary.

        Args:
            tokens: number of tokens to take.
            timeout: maximum seconds to wait. ``None`` waits indefinitely;
                ``0`` makes a single non-blocking attempt.

        Returns:
            True once the tokens were taken. False if ``timeout`` expired
            first, or immediately if ``tokens`` exceeds ``capacity`` (such a
            request could never be satisfied).
        """
        if tokens > self.capacity:
            return False

        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            with self.lock:
                # Refill on every attempt: tokens only grow inside _refill().
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                shortfall = tokens - self.tokens

            # Sleep outside the lock so other threads can use the bucket, for
            # roughly the time the missing tokens take to accrue.
            wait = shortfall / self.rate if self.rate > 0 else 0.01
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                wait = min(wait, remaining)
            time.sleep(wait)

class SlidingWindow:
    """Sliding-window-log limiter: at most ``max_requests`` grants per ``window_seconds``.

    Every granted request's timestamp is kept in a deque; timestamps older
    than the window are discarded before each check. Safe to share between
    threads (all state is guarded by ``self.lock``).
    """

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()
        self.lock = threading.Lock()

    def is_allowed(self) -> bool:
        """Record and allow one request if the window has room; otherwise return False."""
        with self.lock:
            # monotonic(): the window must not shrink or grow when the wall
            # clock is adjusted.
            now = time.monotonic()
            cutoff = now - self.window_seconds

            # Drop timestamps that have slid out of the window
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()

            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            return False

    def wait_and_acquire(self):
        """Block, polling every 0.1 s, until a request is allowed."""
        while not self.is_allowed():
            time.sleep(0.1)

def rate_limited_request(url: str, limiter: TokenBucket, session: requests.Session,
                         token_timeout: float = 5, request_timeout: float = 10) -> dict:
    """GET ``url`` through ``session`` once ``limiter`` grants a token.

    Args:
        url: URL to request.
        limiter: token bucket consulted before every request.
        session: requests session used for the GET.
        token_timeout: seconds to wait for a token (5 s, as before).
        request_timeout: seconds passed to ``session.get(timeout=...)``.

    Returns:
        ``{'status': code, 'url': url}`` when a response arrives. Otherwise a
        dict with an ``'error'`` key, if no token was granted in time or the
        HTTP request failed.
    """
    if not limiter.acquire(timeout=token_timeout):
        return {'error': 'Rate limit exceeded, timeout waiting for token'}

    try:
        # Without a timeout, requests can wait on an unresponsive server forever.
        resp = session.get(url, timeout=request_timeout)
    except requests.RequestException as e:
        return {'error': str(e), 'url': url}
    return {'status': resp.status_code, 'url': url}

def main():
    """Send 30 requests through a 10 req/s, capacity-20 token bucket and report timing."""
    rate, capacity = 10, 20
    limiter = TokenBucket(rate=rate, capacity=capacity)

    urls = ['https://httpbin.org/uuid'] * 30
    total = len(urls)

    results = []
    start = time.perf_counter()
    with requests.Session() as session:  # closes pooled connections when done
        for url in urls:
            results.append(rate_limited_request(url, limiter, session))
    elapsed = time.perf_counter() - start

    failed = sum(1 for r in results if 'error' in r)
    # The bucket starts full, so the first `capacity` requests are not
    # throttled; only the remaining ones wait for tokens at `rate` per second.
    # (30 requests -> ~1.0 s of throttling, not 30/10 = 3.0 s.) Network
    # latency adds on top because requests are sent sequentially.
    expected = max(0.0, (total - capacity) / rate)

    print(f"Completed {total} requests in {elapsed:.2f}s ({failed} failed)")
    print(f"Expected throttling delay >= {expected:.1f}s "
          f"(burst of {capacity}, then {rate} req/s), Actual: {elapsed:.2f}s")
    print(f"Rate: {total / elapsed:.1f} req/s")

if __name__ == '__main__':
    # Script entry point: run the rate-limiting demo.
    main()

武器6:并发数据结构

适用场景:多线程安全的数据收集

# concurrent_datastructures.py - MonkeyCode生成
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import List, Dict
from collections import defaultdict
import time

@dataclass
class ConcurrentCounter:
    """Thread-safe mapping from string keys to integer counts.

    Every method takes ``_lock``; keys that were never incremented read as 0.
    """
    # repr=False / compare=False: a lock has no meaningful repr, and locks
    # compare by identity, which made two counters with identical counts
    # compare unequal under the generated __eq__.
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)
    _counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int))

    def increment(self, key: str, value: int = 1):
        """Add ``value`` (default 1) to ``key``'s count."""
        with self._lock:
            self._counts[key] += value

    def get(self, key: str) -> int:
        """Return ``key``'s count, or 0 without creating the key."""
        with self._lock:
            return self._counts.get(key, 0)

    def get_all(self) -> Dict[str, int]:
        """Return a plain-dict snapshot of all counts."""
        with self._lock:
            return dict(self._counts)

class ConcurrentList:
    """List wrapper whose mutations and reads are all serialized by one lock."""

    def __init__(self):
        self._items: List = []
        self._lock = threading.Lock()

    def append(self, item):
        """Append one item."""
        with self._lock:
            self._items.append(item)

    def extend(self, items):
        """Append every element of the iterable ``items``."""
        with self._lock:
            self._items += items

    def get_all(self) -> List:
        """Return a shallow copy, so callers can iterate without holding the lock."""
        with self._lock:
            return self._items[:]

    def __len__(self):
        with self._lock:
            return len(self._items)

class ThreadSafeCache:
    """Thread-safe LRU cache holding at most ``maxsize`` entries.

    ``get`` returns None on a miss, so a stored None value cannot be told
    apart from a miss. With ``maxsize <= 0`` nothing is cached.
    """

    def __init__(self, maxsize: int = 1000):
        self.maxsize = maxsize
        self._lock = threading.RLock()
        # dicts preserve insertion order (Python 3.7+). Each access re-inserts
        # its key, so the first key is always the least recently used. This
        # makes get/set O(1), instead of O(n) list.remove()/list.pop(0) on a
        # separate access-order list.
        self._cache: Dict[str, object] = {}

    def get(self, key: str):
        """Return the cached value and mark it most recently used, or None on a miss."""
        with self._lock:
            if key not in self._cache:
                return None
            value = self._cache.pop(key)
            self._cache[key] = value  # move to the most-recent end
            return value

    def set(self, key: str, value):
        """Insert or update ``key``, evicting the least recently used entry when full."""
        with self._lock:
            if key in self._cache:
                del self._cache[key]  # re-inserted below at the most-recent end
            elif self.maxsize <= 0:
                return  # caching disabled; popping from an empty cache used to raise
            elif len(self._cache) >= self.maxsize:
                del self._cache[next(iter(self._cache))]  # evict the LRU entry
            self._cache[key] = value

def benchmark_concurrent():
    """Stress the thread-safe containers from 8 threads, then check LRU eviction.

    Prints the counter totals and list length (8 workers x 1000 values each),
    followed by cache hit/miss counts after inserting 150 keys into a
    100-entry cache.
    """
    counter = ConcurrentCounter()
    items = ConcurrentList()

    def fill(lo, hi):
        for value in range(lo, hi):
            counter.increment('total')
            counter.increment(f'worker_{value % 4}')  # bucketed by value mod 4, not by thread
            items.append(value)

    span = 1000
    with ThreadPoolExecutor(max_workers=8) as pool:
        jobs = [pool.submit(fill, k * span, (k + 1) * span) for k in range(8)]
        for job in jobs:
            job.result()  # surface any exception raised in a worker

    print(f"Counter total: {counter.get('total')}")
    print(f"Counter worker_0: {counter.get('worker_0')}")
    print(f"List length: {len(items)}")

    # LRU cache: insert 150 keys into room for 100
    cache = ThreadSafeCache(maxsize=100)
    for k in range(150):
        cache.set(f'key_{k}', f'value_{k}')

    # key_50..key_149 are the most recent inserts and should still be cached
    hits = sum(bool(cache.get(f'key_{k}')) for k in range(50, 150))
    print(f"Cache hits (50-150): {hits}/100")

    # key_0..key_49 were the oldest entries and should have been evicted
    misses = sum(cache.get(f'key_{k}') is None for k in range(50))
    print(f"Cache misses (0-49): {misses}/50 (should be ~50 evictions)")

if __name__ == '__main__':
    # Script entry point: run the concurrent data-structure checks.
    benchmark_concurrent()

武器7:协程 + 进程池混合模式

适用场景:既需要高并发IO,又有CPU密集型处理

# concurrent_hybrid.py - MonkeyCode生成
import asyncio
import threading
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time
import aiohttp

def cpu_task(n: int) -> int:
    """CPU-bound demo work: return 0² + 1² + ... + (n-1)² (0 when n <= 0)."""
    total = 0
    for k in range(n):
        total += k * k
    return total

async def fetch_and_process(urls: list, cpu_workers: int = 4):
    """Hybrid demo: concurrent async I/O, then CPU work in a process pool.

    Args:
        urls: URLs to fetch; each response is decoded as JSON. When empty, the
            demo default of 10 requests to ``https://httpbin.org/json`` is
            used. The parameter used to be ignored, always fetching that default.
        cpu_workers: size of the process pool for the CPU phase.

    Returns:
        ``{'fetched': <number of responses>, 'processed': <number of CPU jobs>}``.
        The CPU phase runs 20 independent ``cpu_task(10000)`` jobs; it does
        not consume the fetched payloads.
    """
    targets = list(urls) or ['https://httpbin.org/json'] * 10

    async def fetch(session, url):
        async with session.get(url) as resp:
            return await resp.json()

    # Phase 1: fetch all URLs concurrently on the event loop
    async with aiohttp.ClientSession() as session:
        fetched = await asyncio.gather(*(fetch(session, url) for url in targets))

    # Phase 2: CPU work in a process pool. run_in_executor lets the event loop
    # keep running while we wait, instead of blocking on Future.result().
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=cpu_workers) as executor:
        jobs = [loop.run_in_executor(executor, cpu_task, 10000) for _ in range(20)]
        processed = await asyncio.gather(*jobs)

    return {'fetched': len(fetched), 'processed': len(processed)}

async def main():
    """Run the hybrid demo once and print its wall-clock time and summary."""
    began = time.time()
    summary = await fetch_and_process([])
    took = time.time() - began

    print(f"Hybrid approach: {took:.2f}s")
    print(f"Results: {summary}")

if __name__ == '__main__':
    # Script entry point: asyncio.run() creates an event loop and runs main() to completion.
    asyncio.run(main())

7种武器对比总结

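| 武器 | 适用场景 | 并发数 | 复杂度 | GIL |
|---|---|---|---|---|
| ThreadPoolExecutor | IO密集(爬虫/API) | 中等(10-100) | 低 | 受限 |
| asyncio+aiohttp | 超高并发IO | 高(1000+) | 中高 | 不受影响 |
| ProcessPoolExecutor | CPU密集(计算) | 低(核心数) | 中 | 不受影响 |
| 生产者-消费者 | 任务队列 | 可扩展 | 中 | 受限 |
| 令牌桶/滑动窗口 | API限流 | 任意 | 低 | 不受影响 |
| 并发数据结构 | 多线程数据共享 | 可扩展 | 低 | 受限 |
| 协程+进程池混合 | 混合负载 | 高(IO)+核心数(CPU) | 高 | 部分突破 |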

用MonkeyCode生成代码时,明确告诉它你的场景,它会选择合适的方案。

文章摘自:https://www.cnblogs.com/jaryn/p/20218912