Connect with us

Асинхронные вызовы LLM API в Python: Всесторонний гид

Искусственный интеллект

Асинхронные вызовы LLM API в Python: Всесторонний гид

mm
Asynchronous LLM API Calls in Python: A Comprehensive Guide

Как разработчики и ученые-данные, мы часто обнаруживаем себя в ситуации, когда нам нужно взаимодействовать с этими мощными моделями через API. Однако, когда наши приложения растут в сложности и масштабе, необходимость в эффективных и производительных взаимодействиях с API становится крайне важной. Именно здесь асинхронное программирование блестит, позволяя нам максимизировать пропускную способность и минимизировать задержку при работе с LLM API.

В этом всестороннем гиде мы исследуем мир асинхронных вызовов LLM API в Python. Мы рассмотрим все, от основ асинхронного программирования до продвинутых техник для обработки сложных рабочих процессов. К концу этой статьи у вас будет прочное понимание того, как использовать асинхронное программирование дляturbo-зарядки ваших приложений, работающих на LLM.

Прежде чем мы погрузимся в конкретные детали асинхронных вызовов LLM API, давайте установим прочный фундамент в концепциях асинхронного программирования.

Асинхронное программирование позволяет выполнять несколько операций одновременно без блокировки основной нити выполнения. В Python это в основном достигается с помощью модуля asyncio, который предоставляет框워크 для написания конкьюрентного кода с помощью корутин, циклов событий и фьючерсов.

Ключевые концепции:

  • Корутины: Функции, определенные с async def, которые можно приостановить и возобновить.
  • Цикл событий: Центральный механизм выполнения, который управляет и запускает асинхронные задачи.
  • Объекты, ожидающие: Объекты, которые можно использовать с ключевым словом await (корутины, задачи, фьючерсы).

Вот простой пример, иллюстрирующий эти концепции:

import asyncio

async def greet(name):
await asyncio.sleep(1) # Симулируем операцию ввода/вывода
print(f"Привет, {name}!")

async def main():
await asyncio.gather(
greet("Алиса"),
greet("Боб"),
greet("Чарли")
)

asyncio.run(main())

В этом примере мы определяем асинхронную функцию greet, которая симулирует операцию ввода/вывода с помощью asyncio.sleep(). Функция main использует asyncio.gather() для запуска нескольких приветствий одновременно. Несмотря на задержку, все три приветствия будут напечатаны после примерно 1 секунды, демонстрируя силу асинхронного выполнения.

Необходимость асинхронности в вызовах LLM API

Когда мы работаем с LLM API, мы часто сталкиваемся со сценариями, когда нам нужно выполнить несколько вызовов API, либо последовательно, либо параллельно. Традиционный синхронный код может привести к значительным проблемам с производительностью, особенно при работе с операциями высокого уровня задержки, такими как сетевые запросы к службам LLM.

Рассмотрим сценарий, когда нам нужно сгенерировать резюме для 100 разных статей с помощью LLM API. С синхронным подходом каждый вызов API будет блокироваться до тех пор, пока не получит ответ, потенциально занимая несколько минут для завершения всех запросов. Асинхронный подход, с другой стороны, позволяет нам инициировать несколько вызовов API одновременно, что значительно сокращает общее время выполнения.

Настройка среды

Чтобы начать работать с асинхронными вызовами LLM API, вам нужно настроить среду Python с необходимыми библиотеками. Вот что вам нужно:

  • Python 3.7 или выше (для родной поддержки asyncio)
  • aiohttp: Асинхронная библиотека HTTP-клиента
  • openai: Официальный клиент OpenAI для Python (если вы используете модели GPT от OpenAI)
  • langchain: Фреймворк для построения приложений с LLM (необязательно, но рекомендуется для сложных рабочих процессов)

Вы можете установить эти зависимости с помощью pip:


pip install aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

Базовые асинхронные вызовы LLM API с asyncio и aiohttp

Давайте начнем с простого асинхронного вызова LLM API с помощью aiohttp. Мы будем использовать API GPT-3.5 от OpenAI в качестве примера, но концепции применяются и к другим LLM API.

import asyncio
import aiohttp
from openai import AsyncOpenAI

async def generate_text(prompt, client):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content

async def main():
prompts = [
"Объясните квантовые вычисления в простых терминах.",
"Напишите хокку о искусственном интеллекте.",
"Опишите процесс фотосинтеза."
]

async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client) for prompt in prompts]
results = await asyncio.gather(*tasks)

for prompt, result in zip(prompts, results):
print(f"Промпт: {prompt}\nОтвет: {result}\n")

asyncio.run(main())

В этом примере мы определяем асинхронную функцию generate_text, которая выполняет вызов API OpenAI с помощью клиента AsyncOpenAI. Функция main создает несколько задач для разных промптов и использует asyncio.gather() для запуска их одновременно.

Этот подход позволяет нам отправлять несколько запросов в LLM API одновременно, что значительно сокращает общее время, необходимое для обработки всех промптов.

Продвинутые техники: пакетная обработка и контроль конкьюрентности

Хотя предыдущий пример демонстрирует основы асинхронных вызовов LLM API, реальные приложения часто требуют более сложных подходов. Давайте исследуем два важных метода: пакетную обработку запросов и контроль конкьюрентности.

Пакетная обработка запросов: Когда мы имеем дело с большим количеством промптов, часто более эффективно группировать их в пакеты, а не отправлять отдельные запросы для каждого промпта. Это снижает накладные расходы на несколько вызовов API и может привести к лучшей производительности.

import asyncio
from openai import AsyncOpenAI

async def process_batch(batch, client):
responses = await asyncio.gather(*[
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
) for prompt in batch
])
return [response.choices[0].message.content for response in responses]

async def main():
prompts = [f"Расскажите факты о числе {i}" for i in range(100)]
batch_size = 10

async with AsyncOpenAI() as client:
results = []
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i+batch_size]
batch_results = await process_batch(batch, client)
results.extend(batch_results)

for prompt, result in zip(prompts, results):
print(f"Промпт: {prompt}\nОтвет: {result}\n")

asyncio.run(main())

Контроль конкьюрентности: Хотя асинхронное программирование позволяет выполнять задачи одновременно, важно контролировать уровень конкьюрентности, чтобы не перегружать сервер API или превышать лимиты скорости. Мы можем использовать asyncio.Semaphore для этой цели.

import asyncio
from openai import AsyncOpenAI

async def generate_text(prompt, client, semaphore):
async with semaphore:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content

async def main():
prompts = [f"Расскажите факты о числе {i}" for i in range(100)]
max_concurrent_requests = 5
semaphore = asyncio.Semaphore(max_concurrent_requests)

async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
results = await asyncio.gather(*tasks)

for prompt, result in zip(prompts, results):
print(f"Промпт: {prompt}\nОтвет: {result}\n")

asyncio.run(main())

В этом примере мы используем семафор, чтобы ограничить количество одновременных запросов до 5, гарантируя, что мы не перегружаем сервер API.

Обработка ошибок и повторные попытки в асинхронных вызовах LLM

Когда мы работаем с внешними API, крайне важно реализовать надежную обработку ошибок и механизмы повторных попыток. Давайте улучшим наш код, чтобы обработать общие ошибки и реализовать экспоненциальную задержку для повторных попыток.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

class APIError(Exception):
pass

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
try:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
except Exception as e:
print(f"Ошибка: {e}")
raise APIError("Не удалось сгенерировать текст")

async def process_prompt(prompt, client, semaphore):
async with semaphore:
try:
result = await generate_text_with_retry(prompt, client)
return prompt, result
except APIError:
return prompt, "Не удалось сгенерировать ответ после нескольких попыток."

async def main():
prompts = [f"Расскажите факты о числе {i}" for i in range(20)]
max_concurrent_requests = 5
semaphore = asyncio.Semaphore(max_concurrent_requests)

async with AsyncOpenAI() as client:
tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
results = await asyncio.gather(*tasks)

for prompt, result in results:
print(f"Промпт: {prompt}\nОтвет: {result}\n")

asyncio.run(main())

Эта улучшенная версия включает:

  • Пользовательское исключение APIError для ошибок, связанных с API.
  • Функцию generate_text_with_retry, украшенную @retry из библиотеки tenacity, реализующей экспоненциальную задержку.
  • Обработку ошибок в функции process_prompt для перехвата и сообщения о неудачах.

Оптимизация производительности: потоковая передача ответов

Для генерации длинного контента потоковая передача ответов может значительно улучшить воспринимаемую производительность вашего приложения. Вместо ожидания всего ответа вы можете обрабатывать и отображать фрагменты текста по мере их поступления.

import asyncio
from openai import AsyncOpenAI

async def stream_text(prompt, client):
stream = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
stream=True
)

full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
full_response += content
print(content, end='', flush=True)

print("\n")
return full_response

async def main():
prompt = "Напишите короткий рассказ о ученом, путешествующем во времени."

async with AsyncOpenAI() as client:
result = await stream_text(prompt, client)

print(f"Полный ответ:\n{result}")

asyncio.run(main())

Этот пример демонстрирует, как потоково передавать ответ от API, печатая каждый фрагмент по мере его поступления. Этот подход особенно полезен для чат-приложений или любых сценариев, когда вы хотите предоставить обратную связь в режиме реального времени пользователю.

Создание асинхронных рабочих процессов с LangChain

Для более сложных приложений, работающих на LLM, фреймворк LangChain предоставляет высокоуровневую абстракцию, которая упрощает процесс цепочки нескольких вызовов LLM и интеграции других инструментов. Давайте посмотрим на пример использования LangChain с асинхронными возможностями:

Этот пример показывает, как LangChain можно использовать для создания более сложных рабочих процессов с потоковой передачей и асинхронным выполнением. AsyncCallbackManager и StreamingStdOutCallbackHandler позволяют потоково передавать сгенерированный контент в режиме реального времени.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

async def generate_story(topic):
llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
prompt = PromptTemplate(
input_variables=["topic"],
template="Напишите короткий рассказ о {topic}."
)
chain = LLMChain(llm=llm, prompt=prompt)
return await chain.arun(topic=topic)

async def main():
topics = ["магический лес", "будущий город", "подводная цивилизация"]
tasks = [generate_story(topic) for topic in topics]
stories = await asyncio.gather(*tasks)

for topic, story in zip(topics, stories):
print(f"\nТема: {topic}\nИстория: {story}\n{'='*50}\n")

asyncio.run(main())

Развертывание асинхронных приложений LLM с FastAPI

Чтобы сделать ваше асинхронное приложение LLM доступным в качестве веб-сервиса, FastAPI является отличным выбором благодаря своей родной поддержке асинхронных операций. Вот пример того, как создать простой API-конечную точку для генерации текста:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

class GenerationRequest(BaseModel):
prompt: str

class GenerationResponse(BaseModel):
generated_text: str

@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": request.prompt}]
)
generated_text = response.choices[0].message.content

# Симулируем некоторые пост-обработки в фоне
background_tasks.add_task(log_generation, request.prompt, generated_text)

return GenerationResponse(generated_text=generated_text)

async def log_generation(prompt: str, generated_text: str):
# Симулируем логирование или дополнительную обработку
await asyncio.sleep(2)
print(f"Logged: Промпт '{prompt}' сгенерировал текст длиной {len(generated_text)}")

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

Этот приложение FastAPI создает конечную точку /generate, которая принимает промпт и возвращает сгенерированный текст. Он также демонстрирует, как использовать задачи в фоне для дополнительной обработки без блокировки ответа.

Лучшие практики и распространенные ошибки

Когда вы работаете с асинхронными вызовами LLM API, помните о следующих лучших практиках:

  1. Используйте пул соединений: Когда вы выполняете несколько запросов, повторно используйте соединения, чтобы снизить накладные расходы.
  2. Реализуйте правильную обработку ошибок: Всегда учитывайте проблемы с сетью, ошибки API и неожиданные ответы.
  3. Уважайте лимиты скорости: Используйте семафоры или другие механизмы контроля конкьюрентности, чтобы не перегружать сервер API.
  4. Мониторьте и логгируйте: Реализуйте комплексное логирование, чтобы отслеживать производительность и выявлять проблемы.
  5. Используйте потоковую передачу для длинного контента: Это улучшает пользовательский опыт и позволяет обрабатывать частичные результаты раньше.

Я провел последние пять лет, погружаясь в увлекательный мир Machine Learning и Deep Learning. Моя страсть и экспертиза привели меня к участию в более чем 50 различных проектах по разработке программного обеспечения, с особым акцентом на AI/ML. Мое непрекращающееся любопытство также привело меня к Natural Language Processing, области, которую я с нетерпением жду возможности изучить более подробно.