Свяжитесь с нами:

Асинхронные вызовы LLM API в Python: подробное руководство

Искусственный интеллект

Асинхронные вызовы LLM API в Python: подробное руководство

mm
Асинхронные вызовы LLM API в Python: подробное руководство

Как разработчики и ученые dta, мы часто сталкиваемся с необходимостью взаимодействия с этими мощными моделями через API. Однако по мере того, как наши приложения становятся сложнее и масштабнее, потребность в эффективных и производительных взаимодействиях API становится решающей. Именно здесь асинхронное программирование блистает, позволяя нам максимизировать пропускную способность и минимизировать задержку при работе с LLM API.

В этом подробном руководстве мы рассмотрим мир асинхронных вызовов LLM API в Python. Мы рассмотрим всё: от основ асинхронного программирования до продвинутых методов управления сложными рабочими процессами. К концу статьи вы получите чёткое представление о том, как использовать асинхронное программирование для повышения эффективности ваших приложений на базе LLM.

Прежде чем углубляться в специфику асинхронных вызовов LLM API, давайте заложим прочную основу в концепциях асинхронного программирования.

Асинхронное программирование позволяет выполнять несколько операций одновременно, не блокируя основной поток выполнения. В Python это достигается в первую очередь за счет асинцио модуль, который предоставляет основу для написания параллельного кода с использованием сопрограмм, циклов событий и фьючерсов.

Ключевые идеи:

  • Сопрограммы: Функции, определенные с помощью асинхронное определение который можно приостановить и возобновить.
  • Цикл событий: Центральный механизм исполнения, который управляет и запускает асинхронные задачи.
  • Ожидаемые: Объекты, которые можно использовать с ключевым словом await (сопрограммы, задачи, фьючерсы).

Вот простой пример, иллюстрирующий эти концепции:

import asyncio

async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Hello, {name}!")

async def main():
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )

asyncio.run(main())

В этом примере мы определяем асинхронную функцию greet который имитирует операцию ввода-вывода с asyncio.sleep(), main функция использует asyncio.gather() для одновременного запуска нескольких приветствий. Несмотря на задержку сна, все три приветствия будут напечатаны примерно через 1 секунду, демонстрируя мощь асинхронного выполнения.

Необходимость асинхронности в вызовах API LLM

При работе с LLM API мы часто сталкиваемся со сценариями, в которых нам нужно выполнить несколько вызовов API, последовательно или параллельно. Традиционный синхронный код может привести к значительным узким местам производительности, особенно при работе с операциями с высокой задержкой, такими как сетевые запросы к службам LLM.

Рассмотрим сценарий, в котором нам нужно сгенерировать сводки для 100 различных статей с использованием LLM API. При синхронном подходе каждый вызов API будет блокироваться до тех пор, пока не получит ответ, что может занять несколько минут для выполнения всех запросов. Асинхронный подход, с другой стороны, позволяет нам инициировать несколько вызовов API одновременно, что значительно сокращает общее время выполнения.

Настройка вашей среды

Чтобы начать работу с асинхронными вызовами LLM API, вам потребуется настроить среду Python с необходимыми библиотеками. Вот что вам понадобится:

  • Python 3.7 или выше (для собственной поддержки asyncio)
  • айоhttp: Асинхронная клиентская HTTP-библиотека
  • openai: Официальный OpenAI Python-клиент (если вы используете модели GPT OpenAI)
  • длинная цепь: Фреймворк для создания приложений с LLM (необязательно, но рекомендуется для сложных рабочих процессов)

Вы можете установить эти зависимости с помощью pip:

pip install aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

Базовые асинхронные вызовы LLM API с помощью asyncio и aiohttp

Начнём с простого асинхронного вызова LLM API с помощью aiohttp. В качестве примера мы будем использовать API OpenAI GPT-3.5, но эти концепции применимы и к другим LLM API.

import asyncio
import aiohttp
from openai import AsyncOpenAI

async def generate_text(prompt, client):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "Explain quantum computing in simple terms.",
        "Write a haiku about artificial intelligence.",
        "Describe the process of photosynthesis."
    ]
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

В этом примере мы определяем асинхронную функцию generate_text который делает вызов API OpenAI с помощью клиента AsyncOpenAI. main функция создает несколько задач для разных запросов и применений asyncio.gather() для их одновременного запуска.

Такой подход позволяет нам отправлять несколько запросов к API LLM одновременно, что значительно сокращает общее время, необходимое для обработки всех запросов.

Расширенные методы: пакетная обработка и управление параллелизмом

Хотя предыдущий пример демонстрирует основы асинхронных вызовов LLM API, реальные приложения часто требуют более сложных подходов. Давайте рассмотрим два важных метода: пакетирование запросов и управление параллельными процессами.

Пакетирование запросов: при работе с большим количеством запросов часто эффективнее объединять их в группы, чем отправлять отдельные запросы для каждого запроса. Это снижает накладные расходы на множественные вызовы API и может повысить производительность.

import asyncio
from openai import AsyncOpenAI

async def process_batch(batch, client):
    responses = await asyncio.gather(*[
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ) for prompt in batch
    ])
    return [response.choices[0].message.content for response in responses]

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    batch_size = 10
    
    async with AsyncOpenAI() as client:
        results = []
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i+batch_size]
            batch_results = await process_batch(batch, client)
            results.extend(batch_results)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Управление параллелизмом: хотя асинхронное программирование допускает параллельное выполнение, важно контролировать уровень параллелизма, чтобы избежать перегрузки API-сервера или превышения ограничений по скорости. Для этого можно использовать asyncio.Semaphore.

import asyncio
from openai import AsyncOpenAI

async def generate_text(prompt, client, semaphore):
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

В этом примере мы используем семафор, чтобы ограничить количество одновременных запросов до 5, гарантируя, что мы не перегрузим сервер API.

Обработка ошибок и повторные попытки в асинхронных вызовах LLM

При работе с внешними API крайне важно реализовать надежную обработку ошибок и механизмы повторных попыток. Давайте улучшим наш код для обработки распространённых ошибок и реализуем экспоненциальную задержку для повторных попыток.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

class APIError(Exception):
    pass

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
    try:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error occurred: {e}")
        raise APIError("Failed to generate text")

async def process_prompt(prompt, client, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(prompt, client)
            return prompt, result
        except APIError:
            return prompt, "Failed to generate response after multiple attempts."

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(20)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in results:
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Эта улучшенная версия включает в себя:

  • Обычай APIError исключение для ошибок, связанных с API.
  • A generate_text_with_retry функция украшена @retry из библиотеки Tenacity, реализующей экспоненциальную задержку.
  • Обработка ошибок в process_prompt функция обнаружения и сообщения о сбоях.

Оптимизация производительности: потоковые ответы

Для генерации длинного контента потоковые ответы могут значительно улучшить воспринимаемую производительность вашего приложения. Вместо того, чтобы ждать весь ответ, вы можете обрабатывать и отображать фрагменты текста по мере их поступления.

import asyncio
from openai import AsyncOpenAI

async def stream_text(prompt, client):
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end='', flush=True)
    
    print("\n")
    return full_response

async def main():
    prompt = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as client:
        result = await stream_text(prompt, client)
    
    print(f"Full response:\n{result}")

asyncio.run(main())

Этот пример демонстрирует, как передавать поток ответа от API, печатая каждый фрагмент по мере его поступления. Этот подход особенно полезен для приложений чата или любого сценария, где вы хотите предоставить пользователю обратную связь в реальном времени.

Создание асинхронных рабочих процессов с помощью LangChain

Для более сложных приложений на базе LLM Фреймворк Лангчейн Предоставляет высокоуровневую абстракцию, которая упрощает процесс объединения нескольких вызовов LLM и интеграции других инструментов. Рассмотрим пример использования LangChain с асинхронными возможностями:

В этом примере показано, как LangChain можно использовать для создания более сложных рабочих процессов с потоковой передачей и асинхронным выполнением. AsyncCallbackManager и StreamingStdOutCallbackHandler обеспечить потоковую передачу сгенерированного контента в режиме реального времени.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    return await chain.arun(topic=topic)

async def main():
    topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
    tasks = [generate_story(topic) for topic in topics]
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")

asyncio.run(main())

Обслуживание асинхронных приложений LLM с помощью FastAPI

Чтобы сделать ваше асинхронное LLM-приложение доступным как веб-сервис, FastAPI — отличный выбор благодаря встроенной поддержке асинхронных операций. Вот пример создания простой конечной точки API для генерации текста:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

class GenerationRequest(BaseModel):
    prompt: str

class GenerationResponse(BaseModel):
    generated_text: str

@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": request.prompt}]
    )
    generated_text = response.choices[0].message.content
    
    # Simulate some post-processing in the background
    background_tasks.add_task(log_generation, request.prompt, generated_text)
    
    return GenerationResponse(generated_text=generated_text)

async def log_generation(prompt: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Это приложение FastAPI создает конечную точку /generate который принимает приглашение и возвращает сгенерированный текст. Он также демонстрирует, как использовать фоновые задачи для дополнительной обработки, не блокируя ответ.

Лучшие практики и распространенные ошибки

При работе с асинхронными API LLM помните о следующих рекомендациях:

  1. Использовать пул соединений: При выполнении нескольких запросов повторно используйте соединения, чтобы сократить накладные расходы.
  2. Внедрить правильную обработку ошибок: Всегда учитывайте проблемы с сетью, ошибки API и неожиданные ответы.
  3. Соблюдайте ограничения по скорости: Используйте семафоры или другие механизмы управления параллелизмом, чтобы избежать перегрузки API.
  4. Мониторинг и регистрация: Внедрите комплексное ведение журнала для отслеживания производительности и выявления проблем.
  5. Используйте потоковую передачу для длинного контента: Это улучшает пользовательский опыт и позволяет выполнять раннюю обработку частичных результатов.

Последние пять лет я погружался в увлекательный мир машинного обучения и глубокого обучения. Моя страсть и опыт позволили мне принять участие в более чем 50 различных проектах по разработке программного обеспечения, уделяя особое внимание AI/ML. Мое постоянное любопытство также привлекло меня к обработке естественного языка, области, которую я очень хочу исследовать дальше.