Connect with us

人工智能

Python 中的异步 LLM API 调用:综合指南

mm
Asynchronous LLM API Calls in Python: A Comprehensive Guide

作为开发人员和数据科学家,我们经常需要通过 API 与这些强大的模型交互。然而,当我们的应用程序变得更加复杂和庞大时,高效和高性能的 API 交互变得至关重要。这就是异步编程的用处,它允许我们在使用 LLM API 时最大化吞吐量和最小化延迟。

在这份综合指南中,我们将探索 Python 中异步 LLM API 调用的世界。我们将涵盖从异步编程基础到处理复杂工作流的高级技术。到本文结束时,您将对如何利用异步编程来增强您的 LLM 应用程序有一个扎实的理解。

在我们深入异步 LLM API 调用的具体细节之前,让我们先建立异步编程概念的坚实基础。

异步编程允许多个操作并发执行而不会阻塞主执行线程。在 Python 中,这主要是通过 asyncio 模块实现的,该模块提供了一个框架,用于使用协程、事件循环和未来来编写并发代码。

关键概念:

  • 协程:使用 async def 定义的函数,可以暂停和恢复。
  • 事件循环:管理和运行异步任务的中心执行机制。
  • 可等待对象:可以使用 await 关键字的对象(协程、任务、未来)。

以下是一个简单的示例来说明这些概念:

import asyncio

async def greet(name):
await asyncio.sleep(1) # 模拟 I/O 操作
print(f"Hello, {name}!")

async def main():
await asyncio.gather(
greet("Alice"),
greet("Bob"),
greet("Charlie")
)

asyncio.run(main())

在这个示例中,我们定义一个异步函数 greet,它使用 asyncio.sleep() 模拟一个 I/O 操作。 main 函数使用 asyncio.gather() 并发运行多个问候。尽管有睡眠延迟,但所有三个问候都会在大约 1 秒后打印出来,展示了异步执行的力量。

异步 LLM API 调用的必要性

当我们处理 LLM API 时,我们经常遇到需要进行多个 API 调用的场景,无论是顺序还是并行。传统的同步代码可能会导致显著的性能瓶颈,尤其是在处理高延迟操作(如网络请求到 LLM 服务)时。

考虑一个场景,我们需要使用 LLM API 为 100 个不同的文章生成摘要。使用同步方法,每个 API 调用都会阻塞,直到它收到响应,可能需要几分钟才能完成所有请求。另一方面,异步方法允许我们同时启动多个 API 调用,大大减少了总执行时间。

设置您的环境

要开始使用异步 LLM API 调用,您需要使用必要的库设置您的 Python 环境。您需要以下内容:

  • Python 3.7 或更高版本(用于本地 asyncio 支持)
  • aiohttp:一个异步 HTTP 客户端库
  • openai:官方 OpenAI Python 客户端(如果您使用 OpenAI 的 GPT 模型)
  • langchain:一个用于构建 LLM 应用程序的框架(可选,但对于复杂工作流推荐)

您可以使用 pip 安装这些依赖项:


pip install aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

使用 asyncio 和 aiohttp 的基本异步 LLM API 调用

让我们首先使用 aiohttp 对 LLM API 进行一个简单的异步调用。我们将使用 OpenAI 的 GPT-3.5 API 作为示例,但这些概念也适用于其他 LLM API。

import asyncio
import aiohttp
from openai import AsyncOpenAI

async def generate_text(prompt, client):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content

async def main():
prompts = [
"用简单的术语解释量子计算。",
"写一首关于人工智能的俳句。",
"描述光合作用的过程。"
]

async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client) for prompt in prompts]
results = await asyncio.gather(*tasks)

for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

在这个示例中,我们定义一个异步函数 generate_text,它使用 AsyncOpenAI 客户端调用 OpenAI API。 main 函数为不同的提示创建多个任务,并使用 asyncio.gather() 并发运行它们。

这种方法使我们能够同时向 LLM API 发送多个请求,大大减少了处理所有提示所需的总时间。

高级技术:批处理和并发控制

虽然前面的示例演示了异步 LLM API 调用的基础知识,但实际应用程序通常需要更复杂的方法。让我们探索两个重要的技术:批处理请求和控制并发性。

批处理请求:处理大量提示时,将它们分成组而不是为每个提示发送单独的请求通常更高效。这减少了多个 API 调用的开销,并且可以带来更好的性能。

import asyncio
from openai import AsyncOpenAI

async def process_batch(batch, client):
responses = await asyncio.gather(*[
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
) for prompt in batch
])
return [response.choices[0].message.content for response in responses]

async def main():
prompts = [f"告诉我关于数字 {i} 的一个事实" for i in range(100)]
batch_size = 10

async with AsyncOpenAI() as client:
results = []
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i+batch_size]
batch_results = await process_batch(batch, client)
results.extend(batch_results)

for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

并发控制:虽然异步编程允许并发执行,但控制并发级别以避免让 API 服务器不堪重负或超过速率限制至关重要。我们可以使用 asyncio.Semaphore 来实现这一点。

import asyncio
from openai import AsyncOpenAI

async def generate_text(prompt, client, semaphore):

I have spent the past five years immersing myself in the fascinating world of Machine Learning and Deep Learning. My passion and expertise have led me to contribute to over 50 diverse software engineering projects, with a particular focus on AI/ML. My ongoing curiosity has also drawn me toward Natural Language Processing, a field I am eager to explore further.