Artificial Intelligence
Asynchroniczne wywołania API LLM w Pythonie: kompleksowy przewodnik

Jako deweloperzy i naukowcy dta często musimy wchodzić w interakcje z tymi potężnymi modelami za pośrednictwem interfejsów API. Jednak w miarę jak nasze aplikacje stają się coraz bardziej złożone i skalowalne, potrzeba wydajnych i wydajnych interakcji z interfejsami API staje się kluczowa. To właśnie tutaj asynchroniczne programowanie błyszczy, pozwalając nam maksymalizować przepustowość i minimalizować opóźnienia podczas pracy z interfejsami API LLM.
W tym kompleksowym przewodniku zgłębimy świat asynchronicznych wywołań API LLM w Pythonie. Omówimy wszystko, od podstaw programowania asynchronicznego po zaawansowane techniki obsługi złożonych przepływów pracy. Po przeczytaniu tego artykułu będziesz mieć solidną wiedzę na temat tego, jak wykorzystać programowanie asynchroniczne do udoskonalenia swoich aplikacji opartych na LLM.
Zanim zagłębimy się w szczegóły asynchronicznych wywołań API LLM, przyjrzyjmy się solidnym podstawom koncepcji programowania asynchronicznego.
Programowanie asynchroniczne pozwala na jednoczesne wykonywanie wielu operacji bez blokowania głównego wątku wykonania. W Pythonie jest to osiągane przede wszystkim poprzez asyncio moduł, który zapewnia strukturę do pisania współbieżnego kodu przy użyciu współprogramów, pętli zdarzeń i kontraktów futures.
Kluczowe idee:
- Współprogramy:Funkcje zdefiniowane za pomocą def asynchroniczny które można wstrzymywać i wznawiać.
- Pętla zdarzeń:Centralny mechanizm wykonawczy, który zarządza zadaniami asynchronicznymi i je uruchamia.
- Oczekiwane: Obiekty, których można używać ze słowem kluczowym await (koroutyny, zadania, futures).
Oto prosty przykład ilustrujący te koncepcje:
import asyncio async def greet(name): await asyncio.sleep(1) # Simulate an I/O operation print(f"Hello, {name}!") async def main(): await asyncio.gather( greet("Alice"), greet("Bob"), greet("Charlie") ) asyncio.run(main())
W tym przykładzie definiujemy funkcję asynchroniczną greet
który symuluje operację wejścia/wyjścia z asyncio.sleep()
, main
używa funkcji asyncio.gather()
aby uruchomić wiele powitań jednocześnie. Pomimo opóźnienia uśpienia, wszystkie trzy powitania zostaną wydrukowane po około 1 sekundzie, co pokazuje moc asynchronicznego wykonywania.
Potrzeba asynchroniczności w wywołaniach API LLM
Pracując z interfejsami API LLM, często napotykamy scenariusze, w których musimy wykonać wiele wywołań API, sekwencyjnie lub równolegle. Tradycyjny kod synchroniczny może prowadzić do znacznych wąskich gardeł wydajnościowych, szczególnie w przypadku operacji o dużym opóźnieniu, takich jak żądania sieciowe do usług LLM.
Rozważmy scenariusz, w którym musimy wygenerować podsumowania dla 100 różnych artykułów przy użyciu interfejsu API LLM. Przy podejściu synchronicznym każde wywołanie interfejsu API blokowałoby się do momentu otrzymania odpowiedzi, co potencjalnie zajęłoby kilka minut, aby ukończyć wszystkie żądania. Z drugiej strony podejście asynchroniczne pozwala nam inicjować wiele wywołań interfejsu API jednocześnie, co znacznie skraca całkowity czas wykonania.
Konfigurowanie środowiska
Aby rozpocząć korzystanie z asynchronicznych wywołań API LLM, musisz skonfigurować środowisko Pythona z niezbędnymi bibliotekami. Oto, czego będziesz potrzebować:
- Python 3.7 lub nowszy (dla natywnego wsparcia asyncio)
- aiohttp:Asynchroniczna biblioteka klienta HTTP
- openai: Oficjalny Klient OpenAI Python (jeśli używasz modeli GPT OpenAI)
- łańcuch językowy:Struktura do tworzenia aplikacji z LLM (opcjonalna, ale zalecana w przypadku złożonych przepływów pracy)
Możesz zainstalować te zależności używając pip:
pip install aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
Podstawowe asynchroniczne wywołania API LLM z asyncio i aiohttp
Zacznijmy od wykonania prostego, asynchronicznego wywołania API LLM za pomocą aiohttp. Jako przykład wykorzystamy API GPT-3.5 OpenAI, ale przedstawione koncepcje mają zastosowanie również do innych API LLM.
import asyncio import aiohttp from openai import AsyncOpenAI async def generate_text(prompt, client): response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content async def main(): prompts = [ "Explain quantum computing in simple terms.", "Write a haiku about artificial intelligence.", "Describe the process of photosynthesis." ] async with AsyncOpenAI() as client: tasks = [generate_text(prompt, client) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
W tym przykładzie definiujemy funkcję asynchroniczną generate_text
który wywołuje API OpenAI przy użyciu klienta AsyncOpenAI. main
funkcja tworzy wiele zadań dla różnych monitów i zastosowań asyncio.gather()
aby uruchomić je jednocześnie.
Takie podejście pozwala nam wysyłać wiele żądań do interfejsu API LLM jednocześnie, co znacznie skraca całkowity czas potrzebny na przetworzenie wszystkich monitów.
Zaawansowane techniki: przetwarzanie wsadowe i kontrola współbieżności
Chociaż poprzedni przykład demonstruje podstawy asynchronicznych wywołań API LLM, rzeczywiste aplikacje często wymagają bardziej zaawansowanych podejść. Przyjrzyjmy się dwóm ważnym technikom: przetwarzaniu wsadowemu żądań i kontrolowaniu współbieżności.
Grupowanie żądań: W przypadku dużej liczby zapytań często bardziej efektywne jest grupowanie ich w grupy niż wysyłanie pojedynczych żądań dla każdego zapytania. Zmniejsza to obciążenie związane z wieloma wywołaniami API i może prowadzić do lepszej wydajności.
import asyncio from openai import AsyncOpenAI async def process_batch(batch, client): responses = await asyncio.gather(*[ client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) for prompt in batch ]) return [response.choices[0].message.content for response in responses] async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(100)] batch_size = 10 async with AsyncOpenAI() as client: results = [] for i in range(0, len(prompts), batch_size): batch = prompts[i:i+batch_size] batch_results = await process_batch(batch, client) results.extend(batch_results) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Kontrola współbieżności: Chociaż programowanie asynchroniczne umożliwia współbieżne wykonywanie, ważne jest kontrolowanie poziomu współbieżności, aby uniknąć przeciążenia serwera API lub przekroczenia limitów przepustowości. W tym celu możemy użyć asyncio.Semaphore.
import asyncio from openai import AsyncOpenAI async def generate_text(prompt, client, semaphore): async with semaphore: response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(100)] max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as client: tasks = [generate_text(prompt, client, semaphore) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
W tym przykładzie użyliśmy semafora, aby ograniczyć liczbę równoczesnych żądań do 5, dzięki czemu mamy pewność, że nie przeciążymy serwera API.
Obsługa błędów i ponowne próby w asynchronicznych wywołaniach LLM
Podczas pracy z zewnętrznymi interfejsami API kluczowe jest wdrożenie solidnych mechanizmów obsługi błędów i ponawiania prób. Ulepszmy nasz kod, aby obsługiwał typowe błędy i zaimplementował wykładniczy odczekiwanie dla ponownych prób.
import asyncio import random from openai import AsyncOpenAI from tenacity import retry, stop_after_attempt, wait_exponential class APIError(Exception): pass @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def generate_text_with_retry(prompt, client): try: response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content except Exception as e: print(f"Error occurred: {e}") raise APIError("Failed to generate text") async def process_prompt(prompt, client, semaphore): async with semaphore: try: result = await generate_text_with_retry(prompt, client) return prompt, result except APIError: return prompt, "Failed to generate response after multiple attempts." async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(20)] max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as client: tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in results: print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Wersja rozszerzona zawiera:
- Zwyczaj
APIError
wyjątek dotyczący błędów związanych z API. - A
generate_text_with_retry
funkcja ozdobiona@retry
z biblioteki tenacity, implementującej wykładniczy backoff. - Obsługa błędów w
process_prompt
Funkcja wykrywania i raportowania awarii.
Optymalizacja wydajności: odpowiedzi strumieniowe
W przypadku generowania treści w długiej formie, strumieniowe odpowiedzi mogą znacznie poprawić postrzeganą wydajność Twojej aplikacji. Zamiast czekać na całą odpowiedź, możesz przetwarzać i wyświetlać fragmenty tekstu, gdy tylko staną się dostępne.
import asyncio from openai import AsyncOpenAI async def stream_text(prompt, client): stream = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}], stream=True ) full_response = "" async for chunk in stream: if chunk.choices[0].delta.content is not None: content = chunk.choices[0].delta.content full_response += content print(content, end='', flush=True) print("\n") return full_response async def main(): prompt = "Write a short story about a time-traveling scientist." async with AsyncOpenAI() as client: result = await stream_text(prompt, client) print(f"Full response:\n{result}") asyncio.run(main())
Ten przykład pokazuje, jak przesyłać strumieniowo odpowiedź z API, drukując każdy fragment w miarę jego nadejścia. To podejście jest szczególnie przydatne w przypadku aplikacji czatu lub w każdym scenariuszu, w którym chcesz zapewnić użytkownikowi informacje zwrotne w czasie rzeczywistym.
Tworzenie asynchronicznych przepływów pracy z LangChain
W przypadku bardziej złożonych aplikacji opartych na LLM Framework LangChaina Zapewnia abstrakcję wysokiego poziomu, która upraszcza proces łączenia wielu wywołań LLM i integracji innych narzędzi. Przyjrzyjmy się przykładowi użycia LangChain z funkcjami asynchronicznymi:
Ten przykład pokazuje, jak można użyć LangChain do tworzenia bardziej złożonych przepływów pracy ze strumieniowaniem i asynchronicznym wykonywaniem. AsyncCallbackManager
oraz StreamingStdOutCallbackHandler
włączyć strumieniowanie wygenerowanej treści w czasie rzeczywistym.
import asyncio from langchain.llms import OpenAI from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain.callbacks.manager import AsyncCallbackManager from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler async def generate_story(topic): llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()])) prompt = PromptTemplate( input_variables=["topic"], template="Write a short story about {topic}." ) chain = LLMChain(llm=llm, prompt=prompt) return await chain.arun(topic=topic) async def main(): topics = ["a magical forest", "a futuristic city", "an underwater civilization"] tasks = [generate_story(topic) for topic in topics] stories = await asyncio.gather(*tasks) for topic, story in zip(topics, stories): print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n") asyncio.run(main())
Obsługa asynchronicznych aplikacji LLM za pomocą FastAPI
Aby udostępnić asynchroniczną aplikację LLM jako usługę internetową, FastAPI to doskonały wybór ze względu na natywną obsługę operacji asynchronicznych. Oto przykład, jak utworzyć prosty punkt końcowy API do generowania tekstu:
from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel from openai import AsyncOpenAI app = FastAPI() client = AsyncOpenAI() class GenerationRequest(BaseModel): prompt: str class GenerationResponse(BaseModel): generated_text: str @app.post("/generate", response_model=GenerationResponse) async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks): response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": request.prompt}] ) generated_text = response.choices[0].message.content # Simulate some post-processing in the background background_tasks.add_task(log_generation, request.prompt, generated_text) return GenerationResponse(generated_text=generated_text) async def log_generation(prompt: str, generated_text: str): # Simulate logging or additional processing await asyncio.sleep(2) print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
Ta aplikacja FastAPI tworzy punkt końcowy /generate
który akceptuje monit i zwraca wygenerowany tekst. Pokazuje również, jak używać zadań w tle do dodatkowego przetwarzania bez blokowania odpowiedzi.
Najlepsze praktyki i typowe pułapki
Podczas pracy z asynchronicznymi interfejsami API LLM należy pamiętać o następujących najlepszych praktykach:
- Użyj puli połączeń:W przypadku wysyłania wielu żądań należy ponownie wykorzystywać połączenia, aby zmniejszyć obciążenie.
- Wdrożenie prawidłowej obsługi błędów: Zawsze należy brać pod uwagę problemy sieciowe, błędy API i nieoczekiwane odpowiedzi.
- Przestrzegaj limitów stawek:Używaj semaforów i innych mechanizmów kontroli współbieżności, aby uniknąć przeciążenia interfejsu API.
- Monitoruj i rejestruj:Wprowadź kompleksowe rejestrowanie w celu śledzenia wydajności i identyfikacji problemów.
- Użyj przesyłania strumieniowego w przypadku treści o długiej formie:Poprawia komfort użytkowania i pozwala na wcześniejsze przetwarzanie częściowych wyników.