Intelligence artificielle
Appels d’API LLM asynchrones en Python : Un guide complet

En tant que développeurs et scientifiques de données, nous nous retrouvons souvent dans le besoin d’interagir avec ces modèles puissants via des API. Cependant, à mesure que nos applications grandissent en complexité et en échelle, le besoin d’interactions d’API efficaces et performantes devient crucial. C’est là que la programmation asynchrone brille, nous permettant de maximiser le débit et de minimiser la latence lors du travail avec les API LLM.
Dans ce guide complet, nous allons explorer le monde des appels d’API LLM asynchrones en Python. Nous allons couvrir tout, des bases de la programmation asynchrone aux techniques avancées pour gérer des flux de travail complexes. À la fin de cet article, vous aurez une solide compréhension de la façon de tirer parti de la programmation asynchrone pour booster vos applications alimentées par LLM.
Avant de plonger dans les spécificités des appels d’API LLM asynchrones, établissons une solide fondation dans les concepts de programmation asynchrone.
La programmation asynchrone permet l’exécution multiple d’opérations sans bloquer le fil d’exécution principal. En Python, ceci est principalement réalisé grâce au module asyncio, qui fournit un framework pour écrire du code concurrent en utilisant des coroutines, des boucles d’événements et des futures.
Concepts clés :
- Coroutines : Fonctions définies avec async def qui peuvent être suspendues et reprises.
- Boucle d’événements : Le mécanisme d’exécution central qui gère et exécute les tâches asynchrones.
- Awaitables : Objets qui peuvent être utilisés avec le mot-clé await (coroutines, tâches, futures).
Voici un exemple simple pour illustrer ces concepts :
import asyncio
async def greet(name):
await asyncio.sleep(1) # Simule une opération d'E/S
print(f"Bonjour, {name} !")
async def main():
await asyncio.gather(
greet("Alice"),
greet("Bob"),
greet("Charlie")
)
asyncio.run(main())
Dans cet exemple, nous définissons une fonction asynchrone greet qui simule une opération d’E/S avec asyncio.sleep(). La fonction main utilise asyncio.gather() pour exécuter plusieurs salutations de manière concurrente. Malgré le délai de sommeil, les trois salutations seront imprimées après environ 1 seconde, démontrant la puissance de l’exécution asynchrone.
Besoin d’asynchrone dans les appels d’API LLM
Lorsque nous travaillons avec les API LLM, nous rencontrons souvent des scénarios où nous devons effectuer plusieurs appels d’API, soit en séquence, soit en parallèle. Le code synchrone traditionnel peut entraîner des goulets d’étranglement de performances importants, en particulier lors de la gestion d’opérations à latence élevée comme les requêtes réseau vers les services LLM.
Considérons un scénario où nous devons générer des résumés pour 100 articles différents à l’aide d’une API LLM. Avec une approche synchrone, chaque appel d’API bloquerait jusqu’à ce qu’il reçoive une réponse, pouvant prendre plusieurs minutes pour compléter toutes les requêtes. Une approche asynchrone, en revanche, nous permet d’initier plusieurs appels d’API de manière concurrente, réduisant ainsi de manière spectaculaire le temps d’exécution total.
Configuration de votre environnement
Pour commencer avec les appels d’API LLM asynchrones, vous devrez configurer votre environnement Python avec les bibliothèques nécessaires. Voici ce dont vous aurez besoin :
- Python 3.7 ou supérieur (pour la prise en charge native d’asyncio)
- aiohttp : Une bibliothèque cliente HTTP asynchrone
- openai : Le client Python officiel OpenAI (si vous utilisez les modèles GPT d’OpenAI)
- langchain : Un framework pour construire des applications avec des LLM (facultatif, mais recommandé pour les flux de travail complexes)
Vous pouvez installer ces dépendances en utilisant pip :
pip install aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
Appels d’API LLM asynchrones de base avec asyncio et aiohttp
Commençons par effectuer un appel asynchrone simple à une API LLM en utilisant aiohttp. Nous allons utiliser l’API GPT-3.5 d’OpenAI comme exemple, mais les concepts s’appliquent à d’autres API LLM.
import asyncio
import aiohttp
from openai import AsyncOpenAI
async def generate_text(prompt, client):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def main():
prompts = [
"Expliquez l'informatique quantique en termes simples.",
"Écrivez un haïku sur l'intelligence artificielle.",
"Décrivez le processus de photosynthèse."
]
async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in zip(prompts, results):
print(f"Prompt : {prompt}\nRéponse : {result}\n")
asyncio.run(main())
Dans cet exemple, nous définissons une fonction asynchrone generate_text qui effectue un appel à l’API d’OpenAI en utilisant le client AsyncOpenAI. La fonction main crée plusieurs tâches pour différents prompts et utilise asyncio.gather() pour les exécuter de manière concurrente.
Cette approche nous permet d’envoyer plusieurs requêtes à l’API LLM simultanément, réduisant ainsi de manière significative le temps total nécessaire pour traiter tous les prompts.
Techniques avancées : regroupement et contrôle de la concurrence
Alors que l’exemple précédent démontre les bases des appels d’API LLM asynchrones, les applications du monde réel nécessitent souvent des approches plus sophistiquées. Explorons deux techniques importantes : le regroupement des requêtes et le contrôle de la concurrence.
Regroupement des requêtes : Lorsque nous traitons un grand nombre de prompts, il est souvent plus efficace de les regrouper en groupes plutôt que d’envoyer des requêtes individuelles pour chaque prompt. Cela réduit la surcharge de plusieurs appels d’API et peut conduire à de meilleures performances.
import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, client):
responses = await asyncio.gather(*[
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
) for prompt in batch
])
return [response.choices[0].message.content for response in responses]
async def main():
prompts = [f"Dites-moi un fait sur le nombre {i}" for i in range(100)]
batch_size = 10
async with AsyncOpenAI() as client:
results = []
for i in range(0, len(prompts), batch_size):
batch = prompts[i:i+batch_size]
batch_results = await process_batch(batch, client)
results.extend(batch_results)
for prompt, result in zip(prompts, results):
print(f"Prompt : {prompt}\nRéponse : {result}\n")
asyncio.run(main())
Contrôle de la concurrence : Même si la programmation asynchrone permet l’exécution concurrente, il est important de contrôler le niveau de concurrence pour éviter de submerger le serveur d’API ou de dépasser les limites de taux. Nous pouvons utiliser asyncio.Semaphore pour cela.
import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client, semaphore):
async with semaphore:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def main():
prompts = [f"Dites-moi un fait sur le nombre {i}" for i in range(100)]
max_concurrent_requests = 5
semaphore = asyncio.Semaphore(max_concurrent_requests)
async with AsyncOpenAI() as client:
tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in zip(prompts, results):
print(f"Prompt : {prompt}\nRéponse : {result}\n")
asyncio.run(main())
Dans cet exemple, nous utilisons un sémaphore pour limiter le nombre de requêtes concurrentes à 5, nous assurant ainsi de ne pas submerger le serveur d’API.
Gestion des erreurs et des réessais dans les appels LLM asynchrones
Lorsque nous travaillons avec des API externes, il est crucial de mettre en œuvre une gestion d’erreurs robuste et des mécanismes de réessai. Améliorons notre code pour gérer les erreurs courantes et mettre en œuvre un réessai exponentiel.
import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
class APIError(Exception):
pass
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
try:
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
except Exception as e:
print(f"Erreur survenue : {e}")
raise APIError("Échec de la génération de texte")
async def process_prompt(prompt, client, semaphore):
async with semaphore:
try:
result = await generate_text_with_retry(prompt, client)
return prompt, result
except APIError:
return prompt, "Échec de la génération de réponse après plusieurs tentatives."
async def main():
prompts = [f"Dites-moi un fait sur le nombre {i}" for i in range(20)]
max_concurrent_requests = 5
semaphore = asyncio.Semaphore(max_concurrent_requests)
async with AsyncOpenAI() as client:
tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
results = await asyncio.gather(*tasks)
for prompt, result in results:
print(f"Prompt : {prompt}\nRéponse : {result}\n")
asyncio.run(main())
Cette version améliorée inclut :
- Une exception personnalisée
APIErrorpour les erreurs liées à l’API. - Une fonction
generate_text_with_retrydécorée avec@retryde la bibliothèque tenacity, mettant en œuvre un réessai exponentiel. - Une gestion d’erreurs dans la fonction
process_promptpour intercepter et signaler les échecs.
Optimisation des performances : diffusion de réponses
Pour la génération de contenu de longue forme, la diffusion de réponses peut améliorer de manière significative les performances perçues de votre application. Au lieu d’attendre la réponse complète, vous pouvez traiter et afficher des morceaux de texte à mesure qu’ils deviennent disponibles.
import asyncio
from openai import AsyncOpenAI
async def stream_text(prompt, client):
stream = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
stream=True
)
full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
full_response += content
print(content, end='', flush=True)
print("\n")
return full_response
async def main():
prompt = "Écrivez une histoire courte sur un scientifique voyageant dans le temps."
async with AsyncOpenAI() as client:
result = await stream_text(prompt, client)
print(f"Réponse complète :\n{result}")
asyncio.run(main())
Cet exemple démontre comment diffuser la réponse de l’API, imprimant chaque morceau à mesure qu’il arrive. Cette approche est particulièrement utile pour les applications de chat ou tout scénario où vous souhaitez fournir une rétroaction en temps réel à l’utilisateur.
Construire des flux de travail asynchrones avec LangChain
Pour des applications plus complexes alimentées par LLM, le framework LangChain fournit une abstraction de niveau supérieur qui simplifie le processus de chaînage de plusieurs appels LLM et d’intégration d’autres outils. Voyons un exemple d’utilisation de LangChain avec des capacités asynchrones :
Cet exemple montre comment LangChain peut être utilisé pour créer des flux de travail plus complexes avec diffusion et exécution asynchrone. Le AsyncCallbackManager et le StreamingStdOutCallbackHandler permettent la diffusion en temps réel du contenu généré.
import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def generate_story(topic):
llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
prompt = PromptTemplate(
input_variables=["topic"],
template="Écrivez une histoire courte sur {topic}."
)
chain = LLMChain(llm=llm, prompt=prompt)
return await chain.arun(topic=topic)
async def main():
topics = ["une forêt magique", "une ville futuriste", "une civilisation sous-marine"]
tasks = [generate_story(topic) for topic in topics]
stories = await asyncio.gather(*tasks)
for topic, story in zip(topics, stories):
print(f"\nTopic : {topic}\nHistoire : {story}\n{'='*50}\n")
asyncio.run(main())
Servir des applications LLM asynchrones avec FastAPI
Pour rendre votre application LLM asynchrone disponible en tant que service Web, FastAPI est un excellent choix en raison de son support natif pour les opérations asynchrones. Voici un exemple de création d’un point de terminaison d’API simple pour la génération de texte :
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
class GenerationRequest(BaseModel):
prompt: str
class GenerationResponse(BaseModel):
generated_text: str
@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": request.prompt}]
)
generated_text = response.choices[0].message.content
# Simulez un traitement post-génération en arrière-plan
background_tasks.add_task(log_generation, request.prompt, generated_text)
return GenerationResponse(generated_text=generated_text)
async def log_generation(prompt: str, generated_text: str):
# Simulez un enregistrement ou un traitement supplémentaire
await asyncio.sleep(2)
print(f"Enregistré : Prompt '{prompt}' a généré un texte de longueur {len(generated_text)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Cette application FastAPI crée un point de terminaison /generate qui accepte un prompt et retourne le texte généré. Elle démontre également l’utilisation de tâches en arrière-plan pour un traitement supplémentaire sans bloquer la réponse.
Meilleures pratiques et pièges courants
Lorsque vous travaillez avec les appels d’API LLM asynchrones, gardez à l’esprit ces meilleures pratiques :
- Utilisez le regroupement de connexions : Lorsque vous effectuez plusieurs requêtes, réutilisez les connexions pour réduire la surcharge.
- Mettez en œuvre une gestion d’erreurs appropriée : Comptez toujours les problèmes de réseau, les erreurs d’API et les réponses inattendues.
- Respectez les limites de taux : Utilisez des sémaphores ou d’autres mécanismes de contrôle de la concurrence pour éviter de submerger le serveur d’API.
- Surveillez et enregistrez : Mettez en œuvre une journalisation complète pour suivre les performances et identifier les problèmes.
- Utilisez la diffusion pour le contenu de longue forme : Cela améliore l’expérience utilisateur et permet un traitement précoce des résultats partiels.












