Anslut dig till vÄrt nÀtverk!

Artificiell intelligens

Asynkrona LLM API-anrop i Python: A Comprehensive Guide

mm
Asynkrona LLM API-anrop i Python: A Comprehensive Guide

Som utvecklare och DTA-forskare finner vi ofta att vi behöver interagera med dessa kraftfulla modeller genom API:er. Men nÀr vÄra applikationer vÀxer i komplexitet och skala, blir behovet av effektiva och presterande API-interaktioner avgörande. Det Àr hÀr asynkron programmering lyser, vilket gör att vi kan maximera genomströmningen och minimera latens nÀr vi arbetar med LLM API:er.

I den hÀr omfattande guiden kommer vi att utforska vÀrlden av asynkrona LLM API-anrop i Python. Vi kommer att tÀcka allt frÄn grunderna i asynkron programmering till avancerade tekniker för att hantera komplexa arbetsflöden. I slutet av den hÀr artikeln har du en gedigen förstÄelse för hur du kan utnyttja asynkron programmering för att överladda dina LLM-drivna applikationer.

Innan vi dyker in i detaljerna för asynkrona LLM API-anrop, lÄt oss etablera en solid grund i asynkrona programmeringskoncept.

Asynkron programmering tillÄter flera operationer att utföras samtidigt utan att blockera huvudtrÄden för exekvering. I Python uppnÄs detta frÀmst genom asyncio modul, som tillhandahÄller ett ramverk för att skriva samtidig kod med hjÀlp av koroutiner, hÀndelseloopar och terminer.

Nyckelbegrepp:

  • Coroutiner: Funktioner definierade med asynkron def som kan pausas och Ă„terupptas.
  • EventLoop: Den centrala exekveringsmekanismen som hanterar och kör asynkrona uppgifter.
  • VĂ€ntar: Objekt som kan anvĂ€ndas med nyckelordet await (coroutines, tasks, futures).

HÀr Àr ett enkelt exempel för att illustrera dessa begrepp:

import asyncio

async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Hello, {name}!")

async def main():
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )

asyncio.run(main())

I det hÀr exemplet definierar vi en asynkron funktion greet som simulerar en I/O-operation med asyncio.sleep(). De main funktion anvÀnder asyncio.gather() för att köra flera hÀlsningar samtidigt. Trots sömnfördröjningen kommer alla tre hÀlsningarna att skrivas ut efter cirka 1 sekund, vilket visar kraften i asynkron exekvering.

Behovet av asynkronisering i LLM API-anrop

NÀr vi arbetar med LLM API:er stöter vi ofta pÄ scenarier dÀr vi behöver göra flera API-anrop, antingen i sekvens eller parallellt. Traditionell synkron kod kan leda till betydande prestandaflaskhalsar, sÀrskilt nÀr man hanterar operationer med hög latens som nÀtverksbegÀranden till LLM-tjÀnster.

TÀnk pÄ ett scenario dÀr vi behöver generera sammanfattningar för 100 olika artiklar med hjÀlp av ett LLM API. Med ett synkront tillvÀgagÄngssÀtt skulle varje API-anrop blockeras tills det fÄr ett svar, vilket kan ta flera minuter att slutföra alla förfrÄgningar. Ett asynkront tillvÀgagÄngssÀtt, Ä andra sidan, tillÄter oss att initiera flera API-anrop samtidigt, vilket dramatiskt minskar den totala exekveringstiden.

StÀlla in din miljö

För att komma igÄng med asynkrona LLM API-anrop mÄste du konfigurera din Python-miljö med de nödvÀndiga biblioteken. HÀr Àr vad du behöver:

  • python 3.7 eller högre (för inbyggt asyncio-stöd)
  • aiohttp: Ett asynkront HTTP-klientbibliotek
  • openai: Den officiella OpenAI Python-klient (om du anvĂ€nder OpenAI:s GPT-modeller)
  • lĂ„ngkedja: Ett ramverk för att bygga applikationer med LLM (valfritt, men rekommenderas för komplexa arbetsflöden)

Du kan installera dessa beroenden med hjÀlp av pip:

pip install aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

Basic Async LLM API-anrop med asyncio och aiohttp

LÄt oss börja med att göra ett enkelt asynkront anrop till ett LLM API med aiohttp. Vi kommer att anvÀnda OpenAI:s GPT-3.5 API som ett exempel, men koncepten gÀller Àven för andra LLM API:er.

import asyncio
import aiohttp
from openai import AsyncOpenAI

async def generate_text(prompt, client):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "Explain quantum computing in simple terms.",
        "Write a haiku about artificial intelligence.",
        "Describe the process of photosynthesis."
    ]
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

I det hÀr exemplet definierar vi en asynkron funktion generate_text som gör ett anrop till OpenAI API med AsyncOpenAI-klienten. De main funktionen skapar flera uppgifter för olika uppmaningar och anvÀndningsomrÄden asyncio.gather() att köra dem samtidigt.

Detta tillvÀgagÄngssÀtt tillÄter oss att skicka flera förfrÄgningar till LLM API samtidigt, vilket avsevÀrt minskar den totala tiden som krÀvs för att behandla alla meddelanden.

Avancerade tekniker: Batchning och samtidighetskontroll

Även om det föregĂ„ende exemplet visar grunderna för asynkrona LLM API-anrop, krĂ€ver verkliga applikationer ofta mer sofistikerade tillvĂ€gagĂ„ngssĂ€tt. LĂ„t oss utforska tvĂ„ viktiga tekniker: batchförfrĂ„gningar och kontroll av samtidighet.

BatchförfrÄgningar: NÀr man hanterar ett stort antal uppmaningar Àr det ofta mer effektivt att gruppera dem i grupper snarare Àn att skicka individuella förfrÄgningar för varje prompt. Detta minskar omkostnaderna för flera API-anrop och kan leda till bÀttre prestanda.

import asyncio
from openai import AsyncOpenAI

async def process_batch(batch, client):
    responses = await asyncio.gather(*[
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ) for prompt in batch
    ])
    return [response.choices[0].message.content for response in responses]

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    batch_size = 10
    
    async with AsyncOpenAI() as client:
        results = []
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i+batch_size]
            batch_results = await process_batch(batch, client)
            results.extend(batch_results)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Samtidighetskontroll: Även om asynkron programmering tillĂ„ter samtidig exekvering, Ă€r det viktigt att kontrollera samtidighetsnivĂ„n för att undvika att API-servern överbelastas eller hastighetsgrĂ€nser överskrids. Vi kan anvĂ€nda asyncio.Semaphore för detta Ă€ndamĂ„l.

import asyncio
from openai import AsyncOpenAI

async def generate_text(prompt, client, semaphore):
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

I det hÀr exemplet anvÀnder vi en semafor för att begrÀnsa antalet samtidiga förfrÄgningar till 5, för att sÀkerstÀlla att vi inte övervÀldiga API-servern.

Felhantering och Äterförsök i Async LLM-samtal

NÀr du arbetar med externa API:er Àr det avgörande att implementera robusta felhanterings- och försöksmekanismer. LÄt oss förbÀttra vÄr kod för att hantera vanliga fel och implementera exponentiell backoff för Äterförsök.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

class APIError(Exception):
    pass

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
    try:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error occurred: {e}")
        raise APIError("Failed to generate text")

async def process_prompt(prompt, client, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(prompt, client)
            return prompt, result
        except APIError:
            return prompt, "Failed to generate response after multiple attempts."

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(20)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in results:
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Denna förbÀttrade version innehÄller:

  • En anpassad APIError undantag för API-relaterade fel.
  • A generate_text_with_retry funktion dekorerad med @retry frĂ„n tenacity-biblioteket och implementerar exponentiell backoff.
  • Felhantering i process_prompt funktion för att fĂ„nga upp och rapportera fel.

Optimera prestanda: Strömmande svar

För generering av innehÄll i lÄnga former kan strömmande svar avsevÀrt förbÀttra den upplevda prestandan för din applikation. IstÀllet för att vÀnta pÄ hela svaret kan du bearbeta och visa textbitar nÀr de blir tillgÀngliga.

import asyncio
from openai import AsyncOpenAI

async def stream_text(prompt, client):
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end='', flush=True)
    
    print("\n")
    return full_response

async def main():
    prompt = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as client:
        result = await stream_text(prompt, client)
    
    print(f"Full response:\n{result}")

asyncio.run(main())

Det hÀr exemplet visar hur man strömmar svaret frÄn API:t och skriver ut varje del nÀr den kommer. Detta tillvÀgagÄngssÀtt Àr sÀrskilt anvÀndbart för chattapplikationer eller andra scenarier dÀr du vill ge feedback i realtid till anvÀndaren.

Bygga asynkrona arbetsflöden med LangChain

För mer komplexa LLM-drivna applikationer LangChain ramverk ger en abstraktion pÄ hög nivÄ som förenklar processen med att koppla ihop flera LLM-samtal och integrera andra verktyg. LÄt oss titta pÄ ett exempel pÄ hur man anvÀnder LangChain med asynkfunktioner:

Det hÀr exemplet visar hur LangChain kan anvÀndas för att skapa mer komplexa arbetsflöden med streaming och asynkron exekvering. De AsyncCallbackManager och StreamingStdOutCallbackHandler möjliggör realtidsströmning av det genererade innehÄllet.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    return await chain.arun(topic=topic)

async def main():
    topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
    tasks = [generate_story(topic) for topic in topics]
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")

asyncio.run(main())

BetjÀnar Async LLM-applikationer med FastAPI

För att göra din async LLM-applikation tillgÀnglig som en webbtjÀnst Àr FastAPI ett utmÀrkt val pÄ grund av dess inbyggda stöd för asynkrona operationer. HÀr Àr ett exempel pÄ hur man skapar en enkel API-slutpunkt för textgenerering:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

class GenerationRequest(BaseModel):
    prompt: str

class GenerationResponse(BaseModel):
    generated_text: str

@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": request.prompt}]
    )
    generated_text = response.choices[0].message.content
    
    # Simulate some post-processing in the background
    background_tasks.add_task(log_generation, request.prompt, generated_text)
    
    return GenerationResponse(generated_text=generated_text)

async def log_generation(prompt: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Denna FastAPI-applikation skapar en slutpunkt /generate som accepterar en uppmaning och returnerar genererad text. Den visar ocksÄ hur man anvÀnder bakgrundsuppgifter för ytterligare bearbetning utan att blockera svaret.

BĂ€sta praxis och vanliga fallgropar

NÀr du arbetar med asynkrona LLM API:er bör du ha dessa bÀsta metoder i Ätanke:

  1. AnvÀnd anslutningspoolning: NÀr du gör flera förfrÄgningar, ÄteranvÀnd anslutningar för att minska omkostnader.
  2. Implementera korrekt felhantering: Ta alltid hÀnsyn till nÀtverksproblem, API-fel och ovÀntade svar.
  3. Respektera hastighetsgrÀnser: AnvÀnd semaforer eller andra samtidighetskontrollmekanismer för att undvika att övervÀldiga API:et.
  4. Övervaka och logga: Implementera omfattande loggning för att spĂ„ra prestanda och identifiera problem.
  5. AnvÀnd streaming för lÄngformat innehÄll: Det förbÀttrar anvÀndarupplevelsen och möjliggör tidig bearbetning av delresultat.

Jag har Àgnat de senaste fem Ären Ät att fördjupa mig i den fascinerande vÀrlden av Machine Learning och Deep Learning. Min passion och expertis har lett mig till att bidra till över 50 olika programvaruutvecklingsprojekt, med sÀrskilt fokus pÄ AI/ML. Min pÄgÄende nyfikenhet har ocksÄ dragit mig mot Natural Language Processing, ett omrÄde som jag Àr ivrig att utforska vidare.