Kontakt z nami

Artificial Intelligence

Asynchroniczne wywołania API LLM w Pythonie: kompleksowy przewodnik

mm
Asynchroniczne wywołania API LLM w Pythonie: kompleksowy przewodnik

Jako deweloperzy i naukowcy dta często musimy wchodzić w interakcje z tymi potężnymi modelami za pośrednictwem interfejsów API. Jednak w miarę jak nasze aplikacje stają się coraz bardziej złożone i skalowalne, potrzeba wydajnych i wydajnych interakcji z interfejsami API staje się kluczowa. To właśnie tutaj asynchroniczne programowanie błyszczy, pozwalając nam maksymalizować przepustowość i minimalizować opóźnienia podczas pracy z interfejsami API LLM.

W tym kompleksowym przewodniku zgłębimy świat asynchronicznych wywołań API LLM w Pythonie. Omówimy wszystko, od podstaw programowania asynchronicznego po zaawansowane techniki obsługi złożonych przepływów pracy. Po przeczytaniu tego artykułu będziesz mieć solidną wiedzę na temat tego, jak wykorzystać programowanie asynchroniczne do udoskonalenia swoich aplikacji opartych na LLM.

Zanim zagłębimy się w szczegóły asynchronicznych wywołań API LLM, przyjrzyjmy się solidnym podstawom koncepcji programowania asynchronicznego.

Programowanie asynchroniczne pozwala na jednoczesne wykonywanie wielu operacji bez blokowania głównego wątku wykonania. W Pythonie jest to osiągane przede wszystkim poprzez asyncio moduł, który zapewnia strukturę do pisania współbieżnego kodu przy użyciu współprogramów, pętli zdarzeń i kontraktów futures.

Kluczowe idee:

  • Współprogramy:Funkcje zdefiniowane za pomocą def asynchroniczny które można wstrzymywać i wznawiać.
  • Pętla zdarzeń:Centralny mechanizm wykonawczy, który zarządza zadaniami asynchronicznymi i je uruchamia.
  • Oczekiwane: Obiekty, których można używać ze słowem kluczowym await (koroutyny, zadania, futures).

Oto prosty przykład ilustrujący te koncepcje:

import asyncio

async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Hello, {name}!")

async def main():
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )

asyncio.run(main())

W tym przykładzie definiujemy funkcję asynchroniczną greet który symuluje operację wejścia/wyjścia z asyncio.sleep(), main używa funkcji asyncio.gather() aby uruchomić wiele powitań jednocześnie. Pomimo opóźnienia uśpienia, wszystkie trzy powitania zostaną wydrukowane po około 1 sekundzie, co pokazuje moc asynchronicznego wykonywania.

Potrzeba asynchroniczności w wywołaniach API LLM

Pracując z interfejsami API LLM, często napotykamy scenariusze, w których musimy wykonać wiele wywołań API, sekwencyjnie lub równolegle. Tradycyjny kod synchroniczny może prowadzić do znacznych wąskich gardeł wydajnościowych, szczególnie w przypadku operacji o dużym opóźnieniu, takich jak żądania sieciowe do usług LLM.

Rozważmy scenariusz, w którym musimy wygenerować podsumowania dla 100 różnych artykułów przy użyciu interfejsu API LLM. Przy podejściu synchronicznym każde wywołanie interfejsu API blokowałoby się do momentu otrzymania odpowiedzi, co potencjalnie zajęłoby kilka minut, aby ukończyć wszystkie żądania. Z drugiej strony podejście asynchroniczne pozwala nam inicjować wiele wywołań interfejsu API jednocześnie, co znacznie skraca całkowity czas wykonania.

Konfigurowanie środowiska

Aby rozpocząć korzystanie z asynchronicznych wywołań API LLM, musisz skonfigurować środowisko Pythona z niezbędnymi bibliotekami. Oto, czego będziesz potrzebować:

  • Python 3.7 lub nowszy (dla natywnego wsparcia asyncio)
  • aiohttp:Asynchroniczna biblioteka klienta HTTP
  • openai: Oficjalny Klient OpenAI Python (jeśli używasz modeli GPT OpenAI)
  • łańcuch językowy:Struktura do tworzenia aplikacji z LLM (opcjonalna, ale zalecana w przypadku złożonych przepływów pracy)

Możesz zainstalować te zależności używając pip:

pip install aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

Podstawowe asynchroniczne wywołania API LLM z asyncio i aiohttp

Zacznijmy od wykonania prostego, asynchronicznego wywołania API LLM za pomocą aiohttp. Jako przykład wykorzystamy API GPT-3.5 OpenAI, ale przedstawione koncepcje mają zastosowanie również do innych API LLM.

import asyncio
import aiohttp
from openai import AsyncOpenAI

async def generate_text(prompt, client):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "Explain quantum computing in simple terms.",
        "Write a haiku about artificial intelligence.",
        "Describe the process of photosynthesis."
    ]
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

W tym przykładzie definiujemy funkcję asynchroniczną generate_text który wywołuje API OpenAI przy użyciu klienta AsyncOpenAI. main funkcja tworzy wiele zadań dla różnych monitów i zastosowań asyncio.gather() aby uruchomić je jednocześnie.

Takie podejście pozwala nam wysyłać wiele żądań do interfejsu API LLM jednocześnie, co znacznie skraca całkowity czas potrzebny na przetworzenie wszystkich monitów.

Zaawansowane techniki: przetwarzanie wsadowe i kontrola współbieżności

Chociaż poprzedni przykład demonstruje podstawy asynchronicznych wywołań API LLM, rzeczywiste aplikacje często wymagają bardziej zaawansowanych podejść. Przyjrzyjmy się dwóm ważnym technikom: przetwarzaniu wsadowemu żądań i kontrolowaniu współbieżności.

Grupowanie żądań: W przypadku dużej liczby zapytań często bardziej efektywne jest grupowanie ich w grupy niż wysyłanie pojedynczych żądań dla każdego zapytania. Zmniejsza to obciążenie związane z wieloma wywołaniami API i może prowadzić do lepszej wydajności.

import asyncio
from openai import AsyncOpenAI

async def process_batch(batch, client):
    responses = await asyncio.gather(*[
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ) for prompt in batch
    ])
    return [response.choices[0].message.content for response in responses]

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    batch_size = 10
    
    async with AsyncOpenAI() as client:
        results = []
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i+batch_size]
            batch_results = await process_batch(batch, client)
            results.extend(batch_results)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Kontrola współbieżności: Chociaż programowanie asynchroniczne umożliwia współbieżne wykonywanie, ważne jest kontrolowanie poziomu współbieżności, aby uniknąć przeciążenia serwera API lub przekroczenia limitów przepustowości. W tym celu możemy użyć asyncio.Semaphore.

import asyncio
from openai import AsyncOpenAI

async def generate_text(prompt, client, semaphore):
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

W tym przykładzie użyliśmy semafora, aby ograniczyć liczbę równoczesnych żądań do 5, dzięki czemu mamy pewność, że nie przeciążymy serwera API.

Obsługa błędów i ponowne próby w asynchronicznych wywołaniach LLM

Podczas pracy z zewnętrznymi interfejsami API kluczowe jest wdrożenie solidnych mechanizmów obsługi błędów i ponawiania prób. Ulepszmy nasz kod, aby obsługiwał typowe błędy i zaimplementował wykładniczy odczekiwanie dla ponownych prób.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

class APIError(Exception):
    pass

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
    try:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error occurred: {e}")
        raise APIError("Failed to generate text")

async def process_prompt(prompt, client, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(prompt, client)
            return prompt, result
        except APIError:
            return prompt, "Failed to generate response after multiple attempts."

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(20)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in results:
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Wersja rozszerzona zawiera:

  • Zwyczaj APIError wyjątek dotyczący błędów związanych z API.
  • A generate_text_with_retry funkcja ozdobiona @retry z biblioteki tenacity, implementującej wykładniczy backoff.
  • Obsługa błędów w process_prompt Funkcja wykrywania i raportowania awarii.

Optymalizacja wydajności: odpowiedzi strumieniowe

W przypadku generowania treści w długiej formie, strumieniowe odpowiedzi mogą znacznie poprawić postrzeganą wydajność Twojej aplikacji. Zamiast czekać na całą odpowiedź, możesz przetwarzać i wyświetlać fragmenty tekstu, gdy tylko staną się dostępne.

import asyncio
from openai import AsyncOpenAI

async def stream_text(prompt, client):
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end='', flush=True)
    
    print("\n")
    return full_response

async def main():
    prompt = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as client:
        result = await stream_text(prompt, client)
    
    print(f"Full response:\n{result}")

asyncio.run(main())

Ten przykład pokazuje, jak przesyłać strumieniowo odpowiedź z API, drukując każdy fragment w miarę jego nadejścia. To podejście jest szczególnie przydatne w przypadku aplikacji czatu lub w każdym scenariuszu, w którym chcesz zapewnić użytkownikowi informacje zwrotne w czasie rzeczywistym.

Tworzenie asynchronicznych przepływów pracy z LangChain

W przypadku bardziej złożonych aplikacji opartych na LLM Framework LangChaina Zapewnia abstrakcję wysokiego poziomu, która upraszcza proces łączenia wielu wywołań LLM i integracji innych narzędzi. Przyjrzyjmy się przykładowi użycia LangChain z funkcjami asynchronicznymi:

Ten przykład pokazuje, jak można użyć LangChain do tworzenia bardziej złożonych przepływów pracy ze strumieniowaniem i asynchronicznym wykonywaniem. AsyncCallbackManager oraz StreamingStdOutCallbackHandler włączyć strumieniowanie wygenerowanej treści w czasie rzeczywistym.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    return await chain.arun(topic=topic)

async def main():
    topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
    tasks = [generate_story(topic) for topic in topics]
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")

asyncio.run(main())

Obsługa asynchronicznych aplikacji LLM za pomocą FastAPI

Aby udostępnić asynchroniczną aplikację LLM jako usługę internetową, FastAPI to doskonały wybór ze względu na natywną obsługę operacji asynchronicznych. Oto przykład, jak utworzyć prosty punkt końcowy API do generowania tekstu:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

class GenerationRequest(BaseModel):
    prompt: str

class GenerationResponse(BaseModel):
    generated_text: str

@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": request.prompt}]
    )
    generated_text = response.choices[0].message.content
    
    # Simulate some post-processing in the background
    background_tasks.add_task(log_generation, request.prompt, generated_text)
    
    return GenerationResponse(generated_text=generated_text)

async def log_generation(prompt: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Ta aplikacja FastAPI tworzy punkt końcowy /generate który akceptuje monit i zwraca wygenerowany tekst. Pokazuje również, jak używać zadań w tle do dodatkowego przetwarzania bez blokowania odpowiedzi.

Najlepsze praktyki i typowe pułapki

Podczas pracy z asynchronicznymi interfejsami API LLM należy pamiętać o następujących najlepszych praktykach:

  1. Użyj puli połączeń:W przypadku wysyłania wielu żądań należy ponownie wykorzystywać połączenia, aby zmniejszyć obciążenie.
  2. Wdrożenie prawidłowej obsługi błędów: Zawsze należy brać pod uwagę problemy sieciowe, błędy API i nieoczekiwane odpowiedzi.
  3. Przestrzegaj limitów stawek:Używaj semaforów i innych mechanizmów kontroli współbieżności, aby uniknąć przeciążenia interfejsu API.
  4. Monitoruj i rejestruj:Wprowadź kompleksowe rejestrowanie w celu śledzenia wydajności i identyfikacji problemów.
  5. Użyj przesyłania strumieniowego w przypadku treści o długiej formie:Poprawia komfort użytkowania i pozwala na wcześniejsze przetwarzanie częściowych wyników.

Ostatnie pięć lat spędziłem zanurzając się w fascynującym świecie uczenia maszynowego i głębokiego uczenia się. Moja pasja i wiedza sprawiły, że uczestniczyłem w ponad 50 różnorodnych projektach z zakresu inżynierii oprogramowania, ze szczególnym uwzględnieniem AI/ML. Moja ciągła ciekawość przyciągnęła mnie również w stronę przetwarzania języka naturalnego – dziedziny, którą chcę dalej zgłębiać.