Intelligence Artificielle
Appels d'API LLM asynchrones en Python : un guide complet

En tant que développeurs et scientifiques de données, nous avons souvent besoin d'interagir avec ces puissants modèles via des API. Cependant, à mesure que nos applications deviennent plus complexes et plus évolutives, le besoin d'interactions API efficaces et performantes devient crucial. C'est là que la programmation asynchrone prend tout son sens, nous permettant de maximiser le débit et de minimiser la latence lorsque nous travaillons avec des API LLM.
Dans ce guide complet, nous explorerons l'univers des appels d'API LLM asynchrones en Python. Nous aborderons tous les aspects, des bases de la programmation asynchrone aux techniques avancées de gestion de workflows complexes. À la fin de cet article, vous maîtriserez parfaitement l'utilisation de la programmation asynchrone pour optimiser vos applications LLM.
Avant de plonger dans les spécificités des appels API LLM asynchrones, établissons une base solide dans les concepts de programmation asynchrone.
La programmation asynchrone permet d'exécuter simultanément plusieurs opérations sans bloquer le thread principal d'exécution. En Python, cela se fait principalement grâce à asynchrone module qui fournit un cadre pour l'écriture de code simultané à l'aide de coroutines, de boucles d'événements et de futures.
Concepts clés:
- Coroutines: Fonctions définies avec définition asynchrone qui peut être mis en pause et repris.
- Boucle d'événement:Le mécanisme d'exécution central qui gère et exécute les tâches asynchrones.
- Attentifs: Objets pouvant être utilisés avec le mot-clé wait (coroutines, tâches, futures).
Voici un exemple simple pour illustrer ces concepts :
import asyncio
async def greet(name):
await asyncio.sleep(1) # Simulate an I/O operation
print(f"Hello, {name}!")
async def main():
await asyncio.gather(
greet("Alice"),
greet("Bob"),
greet("Charlie")
)
asyncio.run(main())
Dans cet exemple, nous définissons une fonction asynchrone greet qui simule une opération d'E/S avec asyncio.sleep()L’ main la fonction utilise asyncio.gather() pour exécuter plusieurs messages d'accueil simultanément. Malgré le délai de mise en veille, les trois messages d'accueil seront imprimés après environ 1 seconde, démontrant ainsi la puissance de l'exécution asynchrone.
La nécessité de l'asynchrone dans les appels d'API LLM
Lorsque nous travaillons avec des API LLM, nous rencontrons souvent des scénarios dans lesquels nous devons effectuer plusieurs appels d'API, soit en séquence, soit en parallèle. Le code synchrone traditionnel peut entraîner des goulots d'étranglement importants en termes de performances, en particulier lorsqu'il s'agit d'opérations à latence élevée comme les requêtes réseau aux services LLM.
Imaginez un scénario dans lequel nous devons générer des résumés pour 100 articles différents à l'aide d'une API LLM. Avec une approche synchrone, chaque appel d'API se bloquerait jusqu'à ce qu'il reçoive une réponse, ce qui prendrait potentiellement plusieurs minutes pour traiter toutes les demandes. Une approche asynchrone, en revanche, nous permet de lancer plusieurs appels d'API simultanément, réduisant ainsi considérablement le temps d'exécution global.
Configuration de votre environnement
Pour démarrer avec les appels d'API LLM asynchrones, vous devez configurer votre environnement Python avec les bibliothèques nécessaires. Voici ce dont vous aurez besoin :
- Python 3.7 ou supérieur (pour la prise en charge asyncio native)
- aihttp:Une bibliothèque client HTTP asynchrone
- ouvert: L'officiel Client Python OpenAI (si vous utilisez les modèles GPT d'OpenAI)
- chaîne de langue:Un cadre pour créer des applications avec des LLM (facultatif, mais recommandé pour les flux de travail complexes)
Vous pouvez installer ces dépendances en utilisant pip :
pip install aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
Appels d'API LLM asynchrones de base avec asyncio et aiohttp
Commençons par effectuer un appel asynchrone simple à une API LLM via aiohttp. Nous utiliserons l'API GPT-3.5 d'OpenAI comme exemple, mais les concepts s'appliquent également à d'autres API LLM.
import asyncio
import aiohttp
from openai import AsyncOpenAI
async def generate_text(prompt, client):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def main():
prompts = [
"Explain quantum computing in simple terms.",
"Write a haiku about artificial intelligence.",
"Describe the process of photosynthesis."
]
async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
Dans cet exemple, nous définissons une fonction asynchrone generate_text qui effectue un appel à l'API OpenAI à l'aide du client AsyncOpenAI. main la fonction crée plusieurs tâches pour différentes invites et utilisations asyncio.gather() pour les exécuter simultanément.
Cette approche nous permet d’envoyer plusieurs requêtes à l’API LLM simultanément, réduisant ainsi considérablement le temps total requis pour traiter toutes les invites.
Techniques avancées : traitement par lots et contrôle de la concurrence
Bien que l'exemple précédent illustre les bases des appels d'API LLM asynchrones, les applications concrètes nécessitent souvent des approches plus sophistiquées. Explorons deux techniques importantes : le traitement par lots des requêtes et le contrôle de la concurrence.
Requêtes groupées : lorsqu'un grand nombre d'invites sont traitées, il est souvent plus efficace de les regrouper plutôt que d'envoyer des requêtes individuelles pour chaque invite. Cela réduit la charge liée aux multiples appels d'API et peut améliorer les performances.
import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, client):
responses = await asyncio.gather(*[
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
) for prompt in batch
])
return [response.choices[0].message.content for response in responses]
async def main():
prompts = [f"Tell me a fact about number {i}" for i in range(100)]
batch_size = 10
async with AsyncOpenAI() as client:
results = []
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i+batch_size]
batch_results = await process_batch(batch, client)
results.extend(batch_results)
for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
Contrôle de la concurrence : Bien que la programmation asynchrone permette l'exécution simultanée, il est important de contrôler le niveau de concurrence pour éviter de surcharger le serveur d'API ou de dépasser les limites de débit. Nous pouvons utiliser asyncio.Semaphore à cette fin.
import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client, semaphore):
async with semaphore:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def main():
prompts = [f"Tell me a fact about number {i}" for i in range(100)]
max_concurrent_requests = 5
semaphore = asyncio.Semaphore(max_concurrent_requests)
async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in zip(prompts, results):
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
Dans cet exemple, nous utilisons un sémaphore pour limiter le nombre de requêtes simultanées à 5, garantissant ainsi de ne pas surcharger le serveur API.
Gestion des erreurs et nouvelles tentatives dans les appels LLM asynchrones
Lorsque vous travaillez avec des API externes, il est essentiel de mettre en œuvre des mécanismes robustes de gestion des erreurs et de relance. Améliorons notre code pour gérer les erreurs courantes et implémenter un délai de réponse exponentiel pour les relances.
import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
class APIError(Exception):
pass
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
try:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
except Exception as e:
print(f"Error occurred: {e}")
raise APIError("Failed to generate text")
async def process_prompt(prompt, client, semaphore):
async with semaphore:
try:
result = await generate_text_with_retry(prompt, client)
return prompt, result
except APIError:
return prompt, "Failed to generate response after multiple attempts."
async def main():
prompts = [f"Tell me a fact about number {i}" for i in range(20)]
max_concurrent_requests = 5
semaphore = asyncio.Semaphore(max_concurrent_requests)
async with AsyncOpenAI() as client:
tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in results:
print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
Cette version améliorée comprend :
- Une coutume
APIErrorexception pour les erreurs liées à l'API. - A
generate_text_with_retryfonction décorée avec@retryde la bibliothèque Tenacity, implémentant un recul exponentiel. - Gestion des erreurs dans le
process_promptfonction permettant de détecter et de signaler les échecs.
Optimisation des performances : réponses en streaming
Pour la génération de contenu long, les réponses en streaming peuvent améliorer considérablement les performances perçues de votre application. Au lieu d'attendre la réponse dans son intégralité, vous pouvez traiter et afficher des fragments de texte au fur et à mesure qu'ils deviennent disponibles.
import asyncio
from openai import AsyncOpenAI
async def stream_text(prompt, client):
stream = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
stream=True
)
full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
full_response += content
print(content, end='', flush=True)
print("\n")
return full_response
async def main():
prompt = "Write a short story about a time-traveling scientist."
async with AsyncOpenAI() as client:
result = await stream_text(prompt, client)
print(f"Full response:\n{result}")
asyncio.run(main())
Cet exemple montre comment diffuser la réponse de l'API, en imprimant chaque fragment au fur et à mesure de son arrivée. Cette approche est particulièrement utile pour les applications de chat ou tout scénario dans lequel vous souhaitez fournir un retour d'information en temps réel à l'utilisateur.
Créer des workflows asynchrones avec LangChain
Pour les applications LLM plus complexes, le Cadre LangChain Fournit une abstraction de haut niveau qui simplifie le processus d'enchaînement de plusieurs appels LLM et l'intégration d'autres outils. Prenons un exemple d'utilisation de LangChain avec des fonctionnalités asynchrones :
Cet exemple montre comment LangChain peut être utilisé pour créer des flux de travail plus complexes avec une exécution en streaming et asynchrone. AsyncCallbackManager que le béton ey StreamingStdOutCallbackHandler permettre la diffusion en temps réel du contenu généré.
import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def generate_story(topic):
llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
prompt = PromptTemplate(
input_variables=["topic"],
template="Write a short story about {topic}."
)
chain = LLMChain(llm=llm, prompt=prompt)
return await chain.arun(topic=topic)
async def main():
topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
tasks = [generate_story(topic) for topic in topics]
stories = await asyncio.gather(*tasks)
for topic, story in zip(topics, stories):
print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")
asyncio.run(main())
Servir des applications LLM asynchrones avec FastAPI
Pour rendre votre application LLM asynchrone disponible en tant que service web, FastAPI est un excellent choix grâce à sa prise en charge native des opérations asynchrones. Voici un exemple de création d'un point de terminaison d'API simple pour la génération de texte :
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
class GenerationRequest(BaseModel):
prompt: str
class GenerationResponse(BaseModel):
generated_text: str
@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": request.prompt}]
)
generated_text = response.choices[0].message.content
# Simulate some post-processing in the background
background_tasks.add_task(log_generation, request.prompt, generated_text)
return GenerationResponse(generated_text=generated_text)
async def log_generation(prompt: str, generated_text: str):
# Simulate logging or additional processing
await asyncio.sleep(2)
print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Cette application FastAPI crée un point de terminaison /generate qui accepte une invite et renvoie le texte généré. Il montre également comment utiliser les tâches en arrière-plan pour un traitement supplémentaire sans bloquer la réponse.
Meilleures pratiques et pièges courants
Lorsque vous travaillez avec des API LLM asynchrones, gardez ces bonnes pratiques à l’esprit :
- Utiliser le pool de connexions:Lorsque vous effectuez plusieurs demandes, réutilisez les connexions pour réduire la surcharge.
- Mettre en œuvre une gestion appropriée des erreurs:Tenez toujours compte des problèmes de réseau, des erreurs d’API et des réponses inattendues.
- Respecter les limites de débit:Utilisez des sémaphores ou d’autres mécanismes de contrôle de concurrence pour éviter de surcharger l’API.
- Surveiller et enregistrer:Implémentez une journalisation complète pour suivre les performances et identifier les problèmes.
- Utilisez le streaming pour les contenus longs:Il améliore l'expérience utilisateur et permet un traitement précoce des résultats partiels.












