Искусственный интеллект
Асинхронные вызовы LLM API в Python: подробное руководство

Как разработчики и ученые dta, мы часто сталкиваемся с необходимостью взаимодействия с этими мощными моделями через API. Однако по мере того, как наши приложения становятся сложнее и масштабнее, потребность в эффективных и производительных взаимодействиях API становится решающей. Именно здесь асинхронное программирование блистает, позволяя нам максимизировать пропускную способность и минимизировать задержку при работе с LLM API.
В этом подробном руководстве мы рассмотрим мир асинхронных вызовов LLM API в Python. Мы рассмотрим всё: от основ асинхронного программирования до продвинутых методов управления сложными рабочими процессами. К концу статьи вы получите чёткое представление о том, как использовать асинхронное программирование для повышения эффективности ваших приложений на базе LLM.
Прежде чем углубляться в специфику асинхронных вызовов LLM API, давайте заложим прочную основу в концепциях асинхронного программирования.
Асинхронное программирование позволяет выполнять несколько операций одновременно, не блокируя основной поток выполнения. В Python это достигается в первую очередь за счет асинцио модуль, который предоставляет основу для написания параллельного кода с использованием сопрограмм, циклов событий и фьючерсов.
Ключевые идеи:
- Сопрограммы: Функции, определенные с помощью асинхронное определение который можно приостановить и возобновить.
- Цикл событий: Центральный механизм исполнения, который управляет и запускает асинхронные задачи.
- Ожидаемые: Объекты, которые можно использовать с ключевым словом await (сопрограммы, задачи, фьючерсы).
Вот простой пример, иллюстрирующий эти концепции:
import asyncio
async def greet(name):
await asyncio.sleep(1) # Simulate an I/O operation
print(f"Hello, {name}!")
async def main():
await asyncio.gather(
greet("Alice"),
greet("Bob"),
greet("Charlie")
)
asyncio.run(main())
В этом примере мы определяем асинхронную функцию greet который имитирует операцию ввода-вывода с asyncio.sleep(), main функция использует asyncio.gather() для одновременного запуска нескольких приветствий. Несмотря на задержку сна, все три приветствия будут напечатаны примерно через 1 секунду, демонстрируя мощь асинхронного выполнения.
Необходимость асинхронности в вызовах API LLM
При работе с LLM API мы часто сталкиваемся со сценариями, в которых нам нужно выполнить несколько вызовов API, последовательно или параллельно. Традиционный синхронный код может привести к значительным узким местам производительности, особенно при работе с операциями с высокой задержкой, такими как сетевые запросы к службам LLM.
Рассмотрим сценарий, в котором нам нужно сгенерировать сводки для 100 различных статей с использованием LLM API. При синхронном подходе каждый вызов API будет блокироваться до тех пор, пока не получит ответ, что может занять несколько минут для выполнения всех запросов. Асинхронный подход, с другой стороны, позволяет нам инициировать несколько вызовов API одновременно, что значительно сокращает общее время выполнения.
Настройка вашей среды
Чтобы начать работу с асинхронными вызовами LLM API, вам потребуется настроить среду Python с необходимыми библиотеками. Вот что вам понадобится:
- Python 3.7 или выше (для собственной поддержки asyncio)
- айоhttp: Асинхронная клиентская HTTP-библиотека
- openai: Официальный OpenAI Python-клиент (если вы используете модели GPT OpenAI)
- длинная цепь: Фреймворк для создания приложений с LLM (необязательно, но рекомендуется для сложных рабочих процессов)
Вы можете установить эти зависимости с помощью pip:
pip install aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
Базовые асинхронные вызовы LLM API с помощью asyncio и aiohttp
Начнём с простого асинхронного вызова LLM API с помощью aiohttp. В качестве примера мы будем использовать API OpenAI GPT-3.5, но эти концепции применимы и к другим LLM API.
import asyncio
import aiohttp
from openai import AsyncOpenAI
async def generate_text(prompt, client):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def main():
prompts = [
"Explain quantum computing in simple terms.",
"Write a haiku about artificial intelligence.",
"Describe the process of photosynthesis."
]
async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
В этом примере мы определяем асинхронную функцию generate_text который делает вызов API OpenAI с помощью клиента AsyncOpenAI. main функция создает несколько задач для разных запросов и применений asyncio.gather() для их одновременного запуска.
Такой подход позволяет нам отправлять несколько запросов к API LLM одновременно, что значительно сокращает общее время, необходимое для обработки всех запросов.
Расширенные методы: пакетная обработка и управление параллелизмом
Хотя предыдущий пример демонстрирует основы асинхронных вызовов LLM API, реальные приложения часто требуют более сложных подходов. Давайте рассмотрим два важных метода: пакетирование запросов и управление параллельными процессами.
Пакетирование запросов: при работе с большим количеством запросов часто эффективнее объединять их в группы, чем отправлять отдельные запросы для каждого запроса. Это снижает накладные расходы на множественные вызовы API и может повысить производительность.
import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, client):
responses = await asyncio.gather(*[
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
) for prompt in batch
])
return [response.choices[0].message.content for response in responses]
async def main():
prompts = [f"Tell me a fact about number {i}" for i in range(100)]
batch_size = 10
async with AsyncOpenAI() as client:
results = []
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i+batch_size]
batch_results = await process_batch(batch, client)
results.extend(batch_results)
for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
Управление параллелизмом: хотя асинхронное программирование допускает параллельное выполнение, важно контролировать уровень параллелизма, чтобы избежать перегрузки API-сервера или превышения ограничений по скорости. Для этого можно использовать asyncio.Semaphore.
import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client, semaphore):
async with semaphore:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def main():
prompts = [f"Tell me a fact about number {i}" for i in range(100)]
max_concurrent_requests = 5
semaphore = asyncio.Semaphore(max_concurrent_requests)
async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
В этом примере мы используем семафор, чтобы ограничить количество одновременных запросов до 5, гарантируя, что мы не перегрузим сервер API.
Обработка ошибок и повторные попытки в асинхронных вызовах LLM
При работе с внешними API крайне важно реализовать надежную обработку ошибок и механизмы повторных попыток. Давайте улучшим наш код для обработки распространённых ошибок и реализуем экспоненциальную задержку для повторных попыток.
import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
class APIError(Exception):
pass
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
try:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
except Exception as e:
print(f"Error occurred: {e}")
raise APIError("Failed to generate text")
async def process_prompt(prompt, client, semaphore):
async with semaphore:
try:
result = await generate_text_with_retry(prompt, client)
return prompt, result
except APIError:
return prompt, "Failed to generate response after multiple attempts."
async def main():
prompts = [f"Tell me a fact about number {i}" for i in range(20)]
max_concurrent_requests = 5
semaphore = asyncio.Semaphore(max_concurrent_requests)
async with AsyncOpenAI() as client:
tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in results:
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
Эта улучшенная версия включает в себя:
- Обычай
APIErrorисключение для ошибок, связанных с API. - A
generate_text_with_retryфункция украшена@retryиз библиотеки Tenacity, реализующей экспоненциальную задержку. - Обработка ошибок в
process_promptфункция обнаружения и сообщения о сбоях.
Оптимизация производительности: потоковые ответы
Для генерации длинного контента потоковые ответы могут значительно улучшить воспринимаемую производительность вашего приложения. Вместо того, чтобы ждать весь ответ, вы можете обрабатывать и отображать фрагменты текста по мере их поступления.
import asyncio
from openai import AsyncOpenAI
async def stream_text(prompt, client):
stream = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
stream=True
)
full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
full_response += content
print(content, end='', flush=True)
print("\n")
return full_response
async def main():
prompt = "Write a short story about a time-traveling scientist."
async with AsyncOpenAI() as client:
result = await stream_text(prompt, client)
print(f"Full response:\n{result}")
asyncio.run(main())
Этот пример демонстрирует, как передавать поток ответа от API, печатая каждый фрагмент по мере его поступления. Этот подход особенно полезен для приложений чата или любого сценария, где вы хотите предоставить пользователю обратную связь в реальном времени.
Создание асинхронных рабочих процессов с помощью LangChain
Для более сложных приложений на базе LLM Фреймворк Лангчейн Предоставляет высокоуровневую абстракцию, которая упрощает процесс объединения нескольких вызовов LLM и интеграции других инструментов. Рассмотрим пример использования LangChain с асинхронными возможностями:
В этом примере показано, как LangChain можно использовать для создания более сложных рабочих процессов с потоковой передачей и асинхронным выполнением. AsyncCallbackManager и StreamingStdOutCallbackHandler обеспечить потоковую передачу сгенерированного контента в режиме реального времени.
import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def generate_story(topic):
llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
prompt = PromptTemplate(
input_variables=["topic"],
template="Write a short story about {topic}."
)
chain = LLMChain(llm=llm, prompt=prompt)
return await chain.arun(topic=topic)
async def main():
topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
tasks = [generate_story(topic) for topic in topics]
stories = await asyncio.gather(*tasks)
for topic, story in zip(topics, stories):
print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")
asyncio.run(main())
Обслуживание асинхронных приложений LLM с помощью FastAPI
Чтобы сделать ваше асинхронное LLM-приложение доступным как веб-сервис, FastAPI — отличный выбор благодаря встроенной поддержке асинхронных операций. Вот пример создания простой конечной точки API для генерации текста:
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
class GenerationRequest(BaseModel):
prompt: str
class GenerationResponse(BaseModel):
generated_text: str
@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": request.prompt}]
)
generated_text = response.choices[0].message.content
# Simulate some post-processing in the background
background_tasks.add_task(log_generation, request.prompt, generated_text)
return GenerationResponse(generated_text=generated_text)
async def log_generation(prompt: str, generated_text: str):
# Simulate logging or additional processing
await asyncio.sleep(2)
print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Это приложение FastAPI создает конечную точку /generate который принимает приглашение и возвращает сгенерированный текст. Он также демонстрирует, как использовать фоновые задачи для дополнительной обработки, не блокируя ответ.
Лучшие практики и распространенные ошибки
При работе с асинхронными API LLM помните о следующих рекомендациях:
- Использовать пул соединений: При выполнении нескольких запросов повторно используйте соединения, чтобы сократить накладные расходы.
- Внедрить правильную обработку ошибок: Всегда учитывайте проблемы с сетью, ошибки API и неожиданные ответы.
- Соблюдайте ограничения по скорости: Используйте семафоры или другие механизмы управления параллелизмом, чтобы избежать перегрузки API.
- Мониторинг и регистрация: Внедрите комплексное ведение журнала для отслеживания производительности и выявления проблем.
- Используйте потоковую передачу для длинного контента: Это улучшает пользовательский опыт и позволяет выполнять раннюю обработку частичных результатов.












