人工智能
Python 中的异步 LLM API 调用:综合指南

作为开发人员和数据科学家,我们经常需要通过 API 与这些强大的模型交互。然而,当我们的应用程序变得更加复杂和庞大时,高效和高性能的 API 交互变得至关重要。这就是异步编程的用处,它允许我们在使用 LLM API 时最大化吞吐量和最小化延迟。
在这份综合指南中,我们将探索 Python 中异步 LLM API 调用的世界。我们将涵盖从异步编程基础到处理复杂工作流的高级技术。到本文结束时,您将对如何利用异步编程来增强您的 LLM 应用程序有一个扎实的理解。
在我们深入异步 LLM API 调用的具体细节之前,让我们先建立异步编程概念的坚实基础。
异步编程允许多个操作并发执行而不会阻塞主执行线程。在 Python 中,这主要是通过 asyncio 模块实现的,该模块提供了一个框架,用于使用协程、事件循环和未来来编写并发代码。
关键概念:
- 协程:使用 async def 定义的函数,可以暂停和恢复。
- 事件循环:管理和运行异步任务的中心执行机制。
- 可等待对象:可以使用 await 关键字的对象(协程、任务、未来)。
以下是一个简单的示例来说明这些概念:
import asyncio
async def greet(name):
await asyncio.sleep(1) # 模拟 I/O 操作
print(f"Hello, {name}!")
async def main():
await asyncio.gather(
greet("Alice"),
greet("Bob"),
greet("Charlie")
)
asyncio.run(main())
在这个示例中,我们定义一个异步函数 greet,它使用 asyncio.sleep() 模拟一个 I/O 操作。 main 函数使用 asyncio.gather() 并发运行多个问候。尽管有睡眠延迟,但所有三个问候都会在大约 1 秒后打印出来,展示了异步执行的力量。
异步 LLM API 调用的必要性
当我们处理 LLM API 时,我们经常遇到需要进行多个 API 调用的场景,无论是顺序还是并行。传统的同步代码可能会导致显著的性能瓶颈,尤其是在处理高延迟操作(如网络请求到 LLM 服务)时。
考虑一个场景,我们需要使用 LLM API 为 100 个不同的文章生成摘要。使用同步方法,每个 API 调用都会阻塞,直到它收到响应,可能需要几分钟才能完成所有请求。另一方面,异步方法允许我们同时启动多个 API 调用,大大减少了总执行时间。
设置您的环境
要开始使用异步 LLM API 调用,您需要使用必要的库设置您的 Python 环境。您需要以下内容:
- Python 3.7 或更高版本(用于本地 asyncio 支持)
- aiohttp:一个异步 HTTP 客户端库
- openai:官方 OpenAI Python 客户端(如果您使用 OpenAI 的 GPT 模型)
- langchain:一个用于构建 LLM 应用程序的框架(可选,但对于复杂工作流推荐)
您可以使用 pip 安装这些依赖项:
pip install aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
使用 asyncio 和 aiohttp 的基本异步 LLM API 调用
让我们首先使用 aiohttp 对 LLM API 进行一个简单的异步调用。我们将使用 OpenAI 的 GPT-3.5 API 作为示例,但这些概念也适用于其他 LLM API。
import asyncio
import aiohttp
from openai import AsyncOpenAI
async def generate_text(prompt, client):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def main():
prompts = [
"用简单的术语解释量子计算。",
"写一首关于人工智能的俳句。",
"描述光合作用的过程。"
]
async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
在这个示例中,我们定义一个异步函数 generate_text,它使用 AsyncOpenAI 客户端调用 OpenAI API。 main 函数为不同的提示创建多个任务,并使用 asyncio.gather() 并发运行它们。
这种方法使我们能够同时向 LLM API 发送多个请求,大大减少了处理所有提示所需的总时间。
高级技术:批处理和并发控制
虽然前面的示例演示了异步 LLM API 调用的基础知识,但实际应用程序通常需要更复杂的方法。让我们探索两个重要的技术:批处理请求和控制并发性。
批处理请求:处理大量提示时,将它们分成组而不是为每个提示发送单独的请求通常更高效。这减少了多个 API 调用的开销,并且可以带来更好的性能。
import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, client):
responses = await asyncio.gather(*[
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
) for prompt in batch
])
return [response.choices[0].message.content for response in responses]
async def main():
prompts = [f"告诉我关于数字 {i} 的一个事实" for i in range(100)]
batch_size = 10
async with AsyncOpenAI() as client:
results = []
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i+batch_size]
batch_results = await process_batch(batch, client)
results.extend(batch_results)
for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
并发控制:虽然异步编程允许并发执行,但控制并发级别以避免让 API 服务器不堪重负或超过速率限制至关重要。我们可以使用 asyncio.Semaphore 来实现这一点。
import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client, semaphore):












