当前位置:首页 > Python > 正文

Python协程并发控制(从零开始掌握asyncio并发与限流技巧)

在现代Python开发中,Python协程已成为处理高并发I/O密集型任务的首选方案。相比传统的多线程或多进程模型,协程以更轻量、更高效的方式实现并发,尤其适合网络请求、文件读写等场景。然而,如果不加以控制,并发数量过多可能导致系统资源耗尽或被目标服务器限流。本文将带你从零开始理解asyncio并发控制,并学会如何安全地限制并发数量。

什么是协程?

协程(Coroutine)是一种可以暂停和恢复执行的函数。在Python中,通过 async def 定义协程函数,使用 await 暂停执行,等待异步操作完成。

为什么需要并发控制?

假设你要同时向1000个网站发起HTTP请求。如果全部并发执行,可能瞬间耗尽内存、打开太多连接,甚至触发目标服务器的反爬机制。因此,我们需要一种机制来限制同时运行的协程数量——这就是协程限流的核心。

Python协程并发控制(从零开始掌握asyncio并发与限流技巧) Python协程  asyncio并发控制 异步编程入门 协程限流 第1张

使用 asyncio.Semaphore 实现并发控制

asyncio.Semaphore 是Python标准库提供的信号量工具,用于限制同时访问某段代码的协程数量。下面是一个完整示例:

import asyncioimport aiohttp# 并发限制:最多同时运行5个协程semaphore = asyncio.Semaphore(5)async def fetch_url(session, url):    async with semaphore:  # 获取信号量        print(f"正在请求: {url}")        try:            async with session.get(url) as response:                await response.text()                print(f"完成请求: {url}")        except Exception as e:            print(f"请求失败 {url}: {e}")async def main():    urls = [        "https://httpbin.org/delay/1",        "https://httpbin.org/delay/1",        "https://httpbin.org/delay/1",        "https://httpbin.org/delay/1",        "https://httpbin.org/delay/1",        "https://httpbin.org/delay/1",        "https://httpbin.org/delay/1",    ]    async with aiohttp.ClientSession() as session:        tasks = [fetch_url(session, url) for url in urls]        await asyncio.gather(*tasks)# 运行主协程asyncio.run(main())  

在这个例子中,我们创建了一个容量为5的信号量。每次调用 fetch_url 时,必须先获取信号量(async with semaphore),当并发数达到5时,后续协程会自动等待,直到有协程释放信号量。

使用 asyncio.as_completed 控制任务流

另一种方式是使用 asyncio.as_completed,它允许你逐个处理已完成的任务,从而自然限制并发:

async def controlled_fetch(urls, max_concurrent=5):    async with aiohttp.ClientSession() as session:        pending = set()        results = []        for url in urls:            if len(pending) >= max_concurrent:                # 等待任意一个任务完成                done, pending = await asyncio.wait(                    pending, return_when=asyncio.FIRST_COMPLETED                )                results.extend(done)            task = asyncio.create_task(fetch_url(session, url))            pending.add(task)        # 等待剩余任务        if pending:            done, _ = await asyncio.wait(pending)            results.extend(done)        return results  

总结:掌握异步编程入门的关键

通过本文,你已经学会了如何使用 asyncio.Semaphore 和任务调度来实现Python协程的并发控制。记住以下几点:

  • 不要无限制地并发大量协程;
  • 使用信号量是最简单有效的限流方式;
  • 合理设置并发上限(如5~20),根据实际网络和系统资源调整;
  • 结合 aiohttpaiomysql 等异步库发挥最大效能。

现在,你已经具备了进行安全、高效asyncio并发控制的能力。快去优化你的爬虫、API客户端或数据处理脚本吧!