Artificiell intelligens
Asynkrona LLM API-anrop i Python: A Comprehensive Guide

Som utvecklare och DTA-forskare finner vi ofta att vi behöver interagera med dessa kraftfulla modeller genom API:er. Men nÀr vÄra applikationer vÀxer i komplexitet och skala, blir behovet av effektiva och presterande API-interaktioner avgörande. Det Àr hÀr asynkron programmering lyser, vilket gör att vi kan maximera genomströmningen och minimera latens nÀr vi arbetar med LLM API:er.
I den hÀr omfattande guiden kommer vi att utforska vÀrlden av asynkrona LLM API-anrop i Python. Vi kommer att tÀcka allt frÄn grunderna i asynkron programmering till avancerade tekniker för att hantera komplexa arbetsflöden. I slutet av den hÀr artikeln har du en gedigen förstÄelse för hur du kan utnyttja asynkron programmering för att överladda dina LLM-drivna applikationer.
Innan vi dyker in i detaljerna för asynkrona LLM API-anrop, lÄt oss etablera en solid grund i asynkrona programmeringskoncept.
Asynkron programmering tillÄter flera operationer att utföras samtidigt utan att blockera huvudtrÄden för exekvering. I Python uppnÄs detta frÀmst genom asyncio modul, som tillhandahÄller ett ramverk för att skriva samtidig kod med hjÀlp av koroutiner, hÀndelseloopar och terminer.
Nyckelbegrepp:
- Coroutiner: Funktioner definierade med asynkron def som kan pausas och Äterupptas.
- EventLoop: Den centrala exekveringsmekanismen som hanterar och kör asynkrona uppgifter.
- VÀntar: Objekt som kan anvÀndas med nyckelordet await (coroutines, tasks, futures).
HÀr Àr ett enkelt exempel för att illustrera dessa begrepp:
import asyncio async def greet(name): await asyncio.sleep(1) # Simulate an I/O operation print(f"Hello, {name}!") async def main(): await asyncio.gather( greet("Alice"), greet("Bob"), greet("Charlie") ) asyncio.run(main())
I det hÀr exemplet definierar vi en asynkron funktion greet
som simulerar en I/O-operation med asyncio.sleep()
. De main
funktion anvÀnder asyncio.gather()
för att köra flera hÀlsningar samtidigt. Trots sömnfördröjningen kommer alla tre hÀlsningarna att skrivas ut efter cirka 1 sekund, vilket visar kraften i asynkron exekvering.
Behovet av asynkronisering i LLM API-anrop
NÀr vi arbetar med LLM API:er stöter vi ofta pÄ scenarier dÀr vi behöver göra flera API-anrop, antingen i sekvens eller parallellt. Traditionell synkron kod kan leda till betydande prestandaflaskhalsar, sÀrskilt nÀr man hanterar operationer med hög latens som nÀtverksbegÀranden till LLM-tjÀnster.
TÀnk pÄ ett scenario dÀr vi behöver generera sammanfattningar för 100 olika artiklar med hjÀlp av ett LLM API. Med ett synkront tillvÀgagÄngssÀtt skulle varje API-anrop blockeras tills det fÄr ett svar, vilket kan ta flera minuter att slutföra alla förfrÄgningar. Ett asynkront tillvÀgagÄngssÀtt, Ä andra sidan, tillÄter oss att initiera flera API-anrop samtidigt, vilket dramatiskt minskar den totala exekveringstiden.
StÀlla in din miljö
För att komma igÄng med asynkrona LLM API-anrop mÄste du konfigurera din Python-miljö med de nödvÀndiga biblioteken. HÀr Àr vad du behöver:
- python 3.7 eller högre (för inbyggt asyncio-stöd)
- aiohttp: Ett asynkront HTTP-klientbibliotek
- openai: Den officiella OpenAI Python-klient (om du anvÀnder OpenAI:s GPT-modeller)
- lÄngkedja: Ett ramverk för att bygga applikationer med LLM (valfritt, men rekommenderas för komplexa arbetsflöden)
Du kan installera dessa beroenden med hjÀlp av pip:
pip install aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
Basic Async LLM API-anrop med asyncio och aiohttp
LÄt oss börja med att göra ett enkelt asynkront anrop till ett LLM API med aiohttp. Vi kommer att anvÀnda OpenAI:s GPT-3.5 API som ett exempel, men koncepten gÀller Àven för andra LLM API:er.
import asyncio import aiohttp from openai import AsyncOpenAI async def generate_text(prompt, client): response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content async def main(): prompts = [ "Explain quantum computing in simple terms.", "Write a haiku about artificial intelligence.", "Describe the process of photosynthesis." ] async with AsyncOpenAI() as client: tasks = [generate_text(prompt, client) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
I det hÀr exemplet definierar vi en asynkron funktion generate_text
som gör ett anrop till OpenAI API med AsyncOpenAI-klienten. De main
funktionen skapar flera uppgifter för olika uppmaningar och anvÀndningsomrÄden asyncio.gather()
att köra dem samtidigt.
Detta tillvÀgagÄngssÀtt tillÄter oss att skicka flera förfrÄgningar till LLM API samtidigt, vilket avsevÀrt minskar den totala tiden som krÀvs för att behandla alla meddelanden.
Avancerade tekniker: Batchning och samtidighetskontroll
Ăven om det föregĂ„ende exemplet visar grunderna för asynkrona LLM API-anrop, krĂ€ver verkliga applikationer ofta mer sofistikerade tillvĂ€gagĂ„ngssĂ€tt. LĂ„t oss utforska tvĂ„ viktiga tekniker: batchförfrĂ„gningar och kontroll av samtidighet.
BatchförfrÄgningar: NÀr man hanterar ett stort antal uppmaningar Àr det ofta mer effektivt att gruppera dem i grupper snarare Àn att skicka individuella förfrÄgningar för varje prompt. Detta minskar omkostnaderna för flera API-anrop och kan leda till bÀttre prestanda.
import asyncio from openai import AsyncOpenAI async def process_batch(batch, client): responses = await asyncio.gather(*[ client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) for prompt in batch ]) return [response.choices[0].message.content for response in responses] async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(100)] batch_size = 10 async with AsyncOpenAI() as client: results = [] for i in range(0, len(prompts), batch_size): batch = prompts[i:i+batch_size] batch_results = await process_batch(batch, client) results.extend(batch_results) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Samtidighetskontroll: Ăven om asynkron programmering tillĂ„ter samtidig exekvering, Ă€r det viktigt att kontrollera samtidighetsnivĂ„n för att undvika att API-servern överbelastas eller hastighetsgrĂ€nser överskrids. Vi kan anvĂ€nda asyncio.Semaphore för detta Ă€ndamĂ„l.
import asyncio from openai import AsyncOpenAI async def generate_text(prompt, client, semaphore): async with semaphore: response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(100)] max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as client: tasks = [generate_text(prompt, client, semaphore) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
I det hÀr exemplet anvÀnder vi en semafor för att begrÀnsa antalet samtidiga förfrÄgningar till 5, för att sÀkerstÀlla att vi inte övervÀldiga API-servern.
Felhantering och Äterförsök i Async LLM-samtal
NÀr du arbetar med externa API:er Àr det avgörande att implementera robusta felhanterings- och försöksmekanismer. LÄt oss förbÀttra vÄr kod för att hantera vanliga fel och implementera exponentiell backoff för Äterförsök.
import asyncio import random from openai import AsyncOpenAI from tenacity import retry, stop_after_attempt, wait_exponential class APIError(Exception): pass @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def generate_text_with_retry(prompt, client): try: response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content except Exception as e: print(f"Error occurred: {e}") raise APIError("Failed to generate text") async def process_prompt(prompt, client, semaphore): async with semaphore: try: result = await generate_text_with_retry(prompt, client) return prompt, result except APIError: return prompt, "Failed to generate response after multiple attempts." async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(20)] max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as client: tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in results: print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Denna förbÀttrade version innehÄller:
- En anpassad
APIError
undantag för API-relaterade fel. - A
generate_text_with_retry
funktion dekorerad med@retry
frÄn tenacity-biblioteket och implementerar exponentiell backoff. - Felhantering i
process_prompt
funktion för att fÄnga upp och rapportera fel.
Optimera prestanda: Strömmande svar
För generering av innehÄll i lÄnga former kan strömmande svar avsevÀrt förbÀttra den upplevda prestandan för din applikation. IstÀllet för att vÀnta pÄ hela svaret kan du bearbeta och visa textbitar nÀr de blir tillgÀngliga.
import asyncio from openai import AsyncOpenAI async def stream_text(prompt, client): stream = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}], stream=True ) full_response = "" async for chunk in stream: if chunk.choices[0].delta.content is not None: content = chunk.choices[0].delta.content full_response += content print(content, end='', flush=True) print("\n") return full_response async def main(): prompt = "Write a short story about a time-traveling scientist." async with AsyncOpenAI() as client: result = await stream_text(prompt, client) print(f"Full response:\n{result}") asyncio.run(main())
Det hÀr exemplet visar hur man strömmar svaret frÄn API:t och skriver ut varje del nÀr den kommer. Detta tillvÀgagÄngssÀtt Àr sÀrskilt anvÀndbart för chattapplikationer eller andra scenarier dÀr du vill ge feedback i realtid till anvÀndaren.
Bygga asynkrona arbetsflöden med LangChain
För mer komplexa LLM-drivna applikationer LangChain ramverk ger en abstraktion pÄ hög nivÄ som förenklar processen med att koppla ihop flera LLM-samtal och integrera andra verktyg. LÄt oss titta pÄ ett exempel pÄ hur man anvÀnder LangChain med asynkfunktioner:
Det hÀr exemplet visar hur LangChain kan anvÀndas för att skapa mer komplexa arbetsflöden med streaming och asynkron exekvering. De AsyncCallbackManager
och StreamingStdOutCallbackHandler
möjliggör realtidsströmning av det genererade innehÄllet.
import asyncio from langchain.llms import OpenAI from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain.callbacks.manager import AsyncCallbackManager from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler async def generate_story(topic): llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()])) prompt = PromptTemplate( input_variables=["topic"], template="Write a short story about {topic}." ) chain = LLMChain(llm=llm, prompt=prompt) return await chain.arun(topic=topic) async def main(): topics = ["a magical forest", "a futuristic city", "an underwater civilization"] tasks = [generate_story(topic) for topic in topics] stories = await asyncio.gather(*tasks) for topic, story in zip(topics, stories): print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n") asyncio.run(main())
BetjÀnar Async LLM-applikationer med FastAPI
För att göra din async LLM-applikation tillgÀnglig som en webbtjÀnst Àr FastAPI ett utmÀrkt val pÄ grund av dess inbyggda stöd för asynkrona operationer. HÀr Àr ett exempel pÄ hur man skapar en enkel API-slutpunkt för textgenerering:
from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel from openai import AsyncOpenAI app = FastAPI() client = AsyncOpenAI() class GenerationRequest(BaseModel): prompt: str class GenerationResponse(BaseModel): generated_text: str @app.post("/generate", response_model=GenerationResponse) async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks): response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": request.prompt}] ) generated_text = response.choices[0].message.content # Simulate some post-processing in the background background_tasks.add_task(log_generation, request.prompt, generated_text) return GenerationResponse(generated_text=generated_text) async def log_generation(prompt: str, generated_text: str): # Simulate logging or additional processing await asyncio.sleep(2) print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
Denna FastAPI-applikation skapar en slutpunkt /generate
som accepterar en uppmaning och returnerar genererad text. Den visar ocksÄ hur man anvÀnder bakgrundsuppgifter för ytterligare bearbetning utan att blockera svaret.
BĂ€sta praxis och vanliga fallgropar
NÀr du arbetar med asynkrona LLM API:er bör du ha dessa bÀsta metoder i Ätanke:
- AnvÀnd anslutningspoolning: NÀr du gör flera förfrÄgningar, ÄteranvÀnd anslutningar för att minska omkostnader.
- Implementera korrekt felhantering: Ta alltid hÀnsyn till nÀtverksproblem, API-fel och ovÀntade svar.
- Respektera hastighetsgrÀnser: AnvÀnd semaforer eller andra samtidighetskontrollmekanismer för att undvika att övervÀldiga API:et.
- Ăvervaka och logga: Implementera omfattande loggning för att spĂ„ra prestanda och identifiera problem.
- AnvÀnd streaming för lÄngformat innehÄll: Det förbÀttrar anvÀndarupplevelsen och möjliggör tidig bearbetning av delresultat.