Seguici sui social

Intelligenza Artificiale

Chiamate API LLM asincrone in Python: una guida completa

mm
Chiamate API LLM asincrone in Python: una guida completa

Come sviluppatori e scienziati dta, spesso ci troviamo a dover interagire con questi potenti modelli tramite API. Tuttavia, man mano che le nostre applicazioni crescono in complessità e scala, la necessità di interazioni API efficienti e performanti diventa cruciale. È qui che la programmazione asincrona risplende, consentendoci di massimizzare la produttività e ridurre al minimo la latenza quando lavoriamo con API LLM.

In questa guida completa esploreremo il mondo delle chiamate API LLM asincrone in Python. Tratteremo tutto, dalle basi della programmazione asincrona alle tecniche avanzate per la gestione di flussi di lavoro complessi. Alla fine di questo articolo, avrai una solida comprensione di come sfruttare la programmazione asincrona per potenziare le tue applicazioni basate su LLM.

Prima di addentrarci nei dettagli delle chiamate API LLM asincrone, gettiamo solide basi sui concetti di programmazione asincrona.

La programmazione asincrona consente l'esecuzione simultanea di più operazioni senza bloccare il thread principale di esecuzione. In Python, questo è ottenuto principalmente tramite asincio modulo, che fornisce un framework per la scrittura di codice concorrente utilizzando coroutine, cicli di eventi e future.

Concetti chiave:

  • Coroutine: Funzioni definite con definizione asincrona che può essere messo in pausa e ripreso.
  • Ciclo di eventi: Il meccanismo di esecuzione centrale che gestisce ed esegue attività asincrone.
  • Attesa: Oggetti che possono essere utilizzati con la parola chiave await (coroutine, task, futures).

Ecco un semplice esempio per illustrare questi concetti:

import asyncio

async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Hello, {name}!")

async def main():
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )

asyncio.run(main())

In questo esempio definiamo una funzione asincrona greet che simula un'operazione di I/O con asyncio.sleep(). main la funzione usa asyncio.gather() per eseguire più saluti contemporaneamente. Nonostante il ritardo di sospensione, tutti e tre i saluti verranno stampati dopo circa 1 secondo, dimostrando la potenza dell'esecuzione asincrona.

La necessità di Async nelle chiamate API LLM

Quando lavoriamo con le API LLM, spesso ci imbattiamo in scenari in cui dobbiamo effettuare più chiamate API, in sequenza o in parallelo. Il codice sincrono tradizionale può causare significativi colli di bottiglia nelle prestazioni, specialmente quando si hanno a che fare con operazioni ad alta latenza come le richieste di rete ai servizi LLM.

Consideriamo uno scenario in cui dobbiamo generare riepiloghi per 100 articoli diversi utilizzando un'API LLM. Con un approccio sincrono, ogni chiamata API si bloccherebbe finché non riceve una risposta, impiegando potenzialmente diversi minuti per completare tutte le richieste. Un approccio asincrono, d'altro canto, ci consente di avviare più chiamate API contemporaneamente, riducendo drasticamente il tempo di esecuzione complessivo.

Configurare il tuo ambiente

Per iniziare a utilizzare le chiamate API LLM asincrone, è necessario configurare l'ambiente Python con le librerie necessarie. Ecco cosa ti servirà:

  • Python 3.7 o superiore (per il supporto asyncio nativo)
  • aio http: Una libreria client HTTP asincrona
  • openai: L'ufficiale Client Python di OpenAI (se si utilizzano i modelli GPT di OpenAI)
  • langchain: Un framework per la creazione di applicazioni con LLM (facoltativo, ma consigliato per flussi di lavoro complessi)

È possibile installare queste dipendenze utilizzando pip:

pip install aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

Chiamate API LLM asincrone di base con asyncio e aiohttp

Iniziamo effettuando una semplice chiamata asincrona a un'API LLM utilizzando aiohttp. Useremo l'API GPT-3.5 di OpenAI come esempio, ma i concetti si applicano anche ad altre API LLM.

import asyncio
import aiohttp
from openai import AsyncOpenAI

async def generate_text(prompt, client):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "Explain quantum computing in simple terms.",
        "Write a haiku about artificial intelligence.",
        "Describe the process of photosynthesis."
    ]
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

In questo esempio definiamo una funzione asincrona generate_text che effettua una chiamata all'API OpenAI utilizzando il client AsyncOpenAI. main la funzione crea più attività per diversi prompt e usi asyncio.gather() per eseguirli contemporaneamente.

Questo approccio ci consente di inviare più richieste contemporaneamente all'API LLM, riducendo significativamente il tempo totale necessario per elaborare tutti i prompt.

Tecniche avanzate: Batching e controllo della concorrenza

Mentre l'esempio precedente illustra le basi delle chiamate API LLM asincrone, le applicazioni reali richiedono spesso approcci più sofisticati. Esploriamo due tecniche importanti: l'elaborazione in batch delle richieste e il controllo della concorrenza.

Richieste in batch: quando si gestisce un gran numero di prompt, spesso è più efficiente raggrupparli anziché inviare richieste individuali per ciascun prompt. Questo riduce il sovraccarico di più chiamate API e può portare a prestazioni migliori.

import asyncio
from openai import AsyncOpenAI

async def process_batch(batch, client):
    responses = await asyncio.gather(*[
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ) for prompt in batch
    ])
    return [response.choices[0].message.content for response in responses]

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    batch_size = 10
    
    async with AsyncOpenAI() as client:
        results = []
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i+batch_size]
            batch_results = await process_batch(batch, client)
            results.extend(batch_results)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Controllo della concorrenza: sebbene la programmazione asincrona consenta l'esecuzione simultanea, è importante controllare il livello di concorrenza per evitare di sovraccaricare il server API o superare i limiti di velocità. A questo scopo, possiamo utilizzare asyncio.Semaphore.

import asyncio
from openai import AsyncOpenAI

async def generate_text(prompt, client, semaphore):
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

In questo esempio utilizziamo un semaforo per limitare il numero di richieste simultanee a 5, assicurandoci di non sovraccaricare il server API.

Gestione degli errori e nuovi tentativi nelle chiamate LLM asincrone

Quando si lavora con API esterne, è fondamentale implementare solidi meccanismi di gestione degli errori e di ripetizione dei tentativi. Miglioriamo il nostro codice per gestire gli errori più comuni e implementare il backoff esponenziale per i tentativi.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

class APIError(Exception):
    pass

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
    try:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error occurred: {e}")
        raise APIError("Failed to generate text")

async def process_prompt(prompt, client, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(prompt, client)
            return prompt, result
        except APIError:
            return prompt, "Failed to generate response after multiple attempts."

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(20)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in results:
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Questa versione migliorata include:

  • Un'usanza APIError eccezione per errori relativi all'API.
  • A generate_text_with_retry funzione decorata con @retry dalla libreria tenacity, implementando il backoff esponenziale.
  • Gestione degli errori in process_prompt funzione per rilevare e segnalare guasti.

Ottimizzazione delle prestazioni: streaming delle risposte

Per la generazione di contenuti di lunga durata, le risposte in streaming possono migliorare significativamente le prestazioni percepite della tua applicazione. Invece di attendere l'intera risposta, puoi elaborare e visualizzare blocchi di testo non appena diventano disponibili.

import asyncio
from openai import AsyncOpenAI

async def stream_text(prompt, client):
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end='', flush=True)
    
    print("\n")
    return full_response

async def main():
    prompt = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as client:
        result = await stream_text(prompt, client)
    
    print(f"Full response:\n{result}")

asyncio.run(main())

Questo esempio dimostra come trasmettere in streaming la risposta dall'API, stampando ogni blocco man mano che arriva. Questo approccio è particolarmente utile per le applicazioni di chat o qualsiasi scenario in cui si desidera fornire feedback in tempo reale all'utente.

Creazione di flussi di lavoro asincroni con LangChain

Per applicazioni più complesse basate su LLM, Quadro LangChain Fornisce un'astrazione di alto livello che semplifica il processo di concatenamento di più chiamate LLM e di integrazione di altri strumenti. Diamo un'occhiata a un esempio di utilizzo di LangChain con funzionalità asincrone:

Questo esempio mostra come LangChain può essere utilizzato per creare flussi di lavoro più complessi con streaming ed esecuzione asincrona. AsyncCallbackManager e StreamingStdOutCallbackHandler abilitare lo streaming in tempo reale dei contenuti generati.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    return await chain.arun(topic=topic)

async def main():
    topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
    tasks = [generate_story(topic) for topic in topics]
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")

asyncio.run(main())

Fornitura di applicazioni LLM asincrone con FastAPI

Per rendere la tua applicazione LLM asincrona disponibile come servizio web, FastAPI è un'ottima scelta grazie al suo supporto nativo per le operazioni asincrone. Ecco un esempio di come creare un semplice endpoint API per la generazione di testo:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

class GenerationRequest(BaseModel):
    prompt: str

class GenerationResponse(BaseModel):
    generated_text: str

@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": request.prompt}]
    )
    generated_text = response.choices[0].message.content
    
    # Simulate some post-processing in the background
    background_tasks.add_task(log_generation, request.prompt, generated_text)
    
    return GenerationResponse(generated_text=generated_text)

async def log_generation(prompt: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Questa applicazione FastAPI crea un endpoint /generate che accetta un prompt e restituisce il testo generato. Dimostra inoltre come usare le attività in background per un'elaborazione aggiuntiva senza bloccare la risposta.

Best practice e insidie ​​comuni

Quando lavori con le API LLM asincrone, tieni a mente queste buone pratiche:

  1. Utilizzare il pool di connessioni: Quando si effettuano più richieste, riutilizzare le connessioni per ridurre il sovraccarico.
  2. Implementare una corretta gestione degli errori: Tenere sempre in considerazione i problemi di rete, gli errori API e le risposte inaspettate.
  3. Rispettare i limiti di velocità: Utilizzare semafori o altri meccanismi di controllo della concorrenza per evitare di sovraccaricare l'API.
  4. Monitorare e registrare: Implementare una registrazione completa per monitorare le prestazioni e identificare i problemi.
  5. Utilizzare lo streaming per contenuti di lunga durata: Migliora l'esperienza dell'utente e consente l'elaborazione anticipata dei risultati parziali.

Ho trascorso gli ultimi cinque anni immergendomi nell'affascinante mondo del Machine Learning e del Deep Learning. La mia passione e competenza mi hanno portato a contribuire a oltre 50 diversi progetti di ingegneria del software, con un focus particolare su AI/ML. La mia continua curiosità mi ha anche attirato verso l'elaborazione del linguaggio naturale, un campo che non vedo l'ora di esplorare ulteriormente.