Συνδεθείτε μαζί μας

Ασύγχρονες κλήσεις API LLM στην Python: Ένας ολοκληρωμένος οδηγός

Τεχνητή νοημοσύνη

Ασύγχρονες κλήσεις API LLM στην Python: Ένας ολοκληρωμένος οδηγός

mm
Ασύγχρονες κλήσεις API LLM στην Python: Ένας ολοκληρωμένος οδηγός

Ως προγραμματιστές και επιστήμονες dta, συχνά βρισκόμαστε στην ανάγκη να αλληλεπιδράσουμε με αυτά τα ισχυρά μοντέλα μέσω API. Ωστόσο, καθώς οι εφαρμογές μας αυξάνονται σε πολυπλοκότητα και κλίμακα, η ανάγκη για αποτελεσματικές και αποδοτικές αλληλεπιδράσεις API καθίσταται κρίσιμη. Εδώ λάμπει ο ασύγχρονος προγραμματισμός, επιτρέποντάς μας να μεγιστοποιήσουμε την απόδοση και να ελαχιστοποιήσουμε την καθυστέρηση όταν εργαζόμαστε με API LLM.

Σε αυτόν τον ολοκληρωμένο οδηγό, θα εξερευνήσουμε τον κόσμο των ασύγχρονων κλήσεων API LLM σε Python. Θα καλύψουμε τα πάντα, από τα βασικά του ασύγχρονου προγραμματισμού έως τις προηγμένες τεχνικές για τον χειρισμό σύνθετων ροών εργασίας. Μέχρι το τέλος αυτού του άρθρου, θα έχετε μια στέρεη κατανόηση του πώς να αξιοποιήσετε τον ασύγχρονο προγραμματισμό για να ενισχύσετε τις εφαρμογές σας που υποστηρίζονται από LLM.

Πριν εμβαθύνουμε στις λεπτομέρειες των κλήσεων API ασύγχρονου LLM, ας θέσουμε μια σταθερή βάση στις έννοιες του ασύγχρονου προγραμματισμού.

Ο ασύγχρονος προγραμματισμός επιτρέπει πολλαπλές λειτουργίες να εκτελούνται ταυτόχρονα χωρίς να μπλοκάρει το κύριο νήμα εκτέλεσης. Στην Python, αυτό επιτυγχάνεται κυρίως μέσω του ασύγχρονο ενότητα, η οποία παρέχει ένα πλαίσιο για τη σύνταξη ταυτόχρονου κώδικα χρησιμοποιώντας κορουτίνες, βρόχους συμβάντων και συμβόλαια μελλοντικής εκπλήρωσης.

Βασικές έννοιες:

  • Κορουτίνες: Λειτουργίες που ορίζονται με ασύγχρονος ορ που μπορεί να τεθεί σε παύση και να συνεχιστεί.
  • Βρόχος συμβάντος: Ο κεντρικός μηχανισμός εκτέλεσης που διαχειρίζεται και εκτελεί ασύγχρονες εργασίες.
  • Αναμενόμενα: Αντικείμενα που μπορούν να χρησιμοποιηθούν με τη λέξη-κλειδί αναμονής (κορουτίνες, εργασίες, μελλοντικά).

Ακολουθεί ένα απλό παράδειγμα για να επεξηγήσουμε αυτές τις έννοιες:

import asyncio

async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Hello, {name}!")

async def main():
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )

asyncio.run(main())

Σε αυτό το παράδειγμα, ορίζουμε μια ασύγχρονη συνάρτηση greet που προσομοιώνει μια λειτουργία I/O με asyncio.sleep(). ο main χρησιμοποιεί τη συνάρτηση asyncio.gather() για να εκτελείτε πολλαπλούς χαιρετισμούς ταυτόχρονα. Παρά την καθυστέρηση ύπνου, και οι τρεις χαιρετισμοί θα εκτυπωθούν μετά από περίπου 1 δευτερόλεπτο, αποδεικνύοντας τη δύναμη της ασύγχρονης εκτέλεσης.

Η ανάγκη για Async στις κλήσεις API LLM

Όταν εργαζόμαστε με LLM API, συναντάμε συχνά σενάρια όπου πρέπει να κάνουμε πολλές κλήσεις API, είτε διαδοχικά είτε παράλληλα. Ο παραδοσιακός σύγχρονος κώδικας μπορεί να οδηγήσει σε σημαντικά σημεία συμφόρησης στην απόδοση, ειδικά όταν αντιμετωπίζουμε λειτουργίες υψηλής καθυστέρησης, όπως αιτήματα δικτύου σε υπηρεσίες LLM.

Σκεφτείτε ένα σενάριο όπου πρέπει να δημιουργήσουμε περιλήψεις για 100 διαφορετικά άρθρα χρησιμοποιώντας ένα API LLM. Με μια σύγχρονη προσέγγιση, κάθε κλήση API θα μπλοκάρει έως ότου λάβει μια απόκριση, ενδεχομένως να χρειαστούν αρκετά λεπτά για να ολοκληρωθούν όλα τα αιτήματα. Μια ασύγχρονη προσέγγιση, από την άλλη πλευρά, μας επιτρέπει να εκκινούμε πολλαπλές κλήσεις API ταυτόχρονα, μειώνοντας δραματικά τον συνολικό χρόνο εκτέλεσης.

Ρύθμιση του περιβάλλοντος σας

Για να ξεκινήσετε με τις κλήσεις API ασύγχρονου LLM, θα χρειαστεί να ρυθμίσετε το περιβάλλον Python με τις απαραίτητες βιβλιοθήκες. Δείτε τι θα χρειαστείτε:

  • Python 3.7 ή υψηλότερη (για εγγενή υποστήριξη asyncio)
  • aiohttp: Μια ασύγχρονη βιβλιοθήκη πελάτη HTTP
  • openai: Το επίσημο Πελάτης OpenAI Python (αν χρησιμοποιείτε τα μοντέλα GPT του OpenAI)
  • langchain: Ένα πλαίσιο για τη δημιουργία εφαρμογών με LLM (προαιρετικό, αλλά συνιστάται για πολύπλοκες ροές εργασίας)

Μπορείτε να εγκαταστήσετε αυτές τις εξαρτήσεις χρησιμοποιώντας το pip:

pip install aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

Κλήσεις Basic Async LLM API με asyncio και aiohttp

Ας ξεκινήσουμε κάνοντας μια απλή ασύγχρονη κλήση σε ένα API LLM χρησιμοποιώντας το aiohttp. Θα χρησιμοποιήσουμε το API GPT-3.5 του OpenAI ως παράδειγμα, αλλά οι έννοιες ισχύουν και για άλλα API LLM.

import asyncio
import aiohttp
from openai import AsyncOpenAI

async def generate_text(prompt, client):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "Explain quantum computing in simple terms.",
        "Write a haiku about artificial intelligence.",
        "Describe the process of photosynthesis."
    ]
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Σε αυτό το παράδειγμα, ορίζουμε μια ασύγχρονη συνάρτηση generate_text που πραγματοποιεί κλήση στο OpenAI API χρησιμοποιώντας το πρόγραμμα-πελάτη AsyncOpenAI. Ο main η λειτουργία δημιουργεί πολλαπλές εργασίες για διαφορετικές προτροπές και χρήσεις asyncio.gather() να τα τρέχουν ταυτόχρονα.

Αυτή η προσέγγιση μας επιτρέπει να στέλνουμε πολλαπλά αιτήματα στο API LLM ταυτόχρονα, μειώνοντας σημαντικά τον συνολικό χρόνο που απαιτείται για την επεξεργασία όλων των προτροπών.

Προηγμένες τεχνικές: Batching και Concurrency Control

Ενώ το προηγούμενο παράδειγμα καταδεικνύει τα βασικά των κλήσεων API ασύγχρονου LLM, οι εφαρμογές του πραγματικού κόσμου συχνά απαιτούν πιο εξελιγμένες προσεγγίσεις. Ας εξερευνήσουμε δύο σημαντικές τεχνικές: την ομαδική επεξεργασία αιτημάτων και τον έλεγχο της ταυτόχρονης λειτουργίας.

Μαζική επεξεργασία αιτημάτων: Όταν πρόκειται για μεγάλο αριθμό μηνυμάτων προτροπής, είναι συχνά πιο αποτελεσματικό να τα ομαδοποιείτε σε ομάδες παρά να στέλνετε μεμονωμένα αιτήματα για κάθε μήνυμα προτροπής. Αυτό μειώνει την επιβάρυνση πολλαπλών κλήσεων API και μπορεί να οδηγήσει σε καλύτερη απόδοση.

import asyncio
from openai import AsyncOpenAI

async def process_batch(batch, client):
    responses = await asyncio.gather(*[
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ) for prompt in batch
    ])
    return [response.choices[0].message.content for response in responses]

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    batch_size = 10
    
    async with AsyncOpenAI() as client:
        results = []
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i+batch_size]
            batch_results = await process_batch(batch, client)
            results.extend(batch_results)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Έλεγχος Ταυτοχρονισμού: Ενώ ο ασύγχρονος προγραμματισμός επιτρέπει την ταυτόχρονη εκτέλεση, είναι σημαντικό να ελέγχεται το επίπεδο ταυτοχρονισμού για να αποφευχθεί η υπερφόρτωση του διακομιστή API ή η υπέρβαση των ορίων ταχύτητας. Μπορούμε να χρησιμοποιήσουμε το asyncio.Semaphore για αυτόν τον σκοπό.

import asyncio
from openai import AsyncOpenAI

async def generate_text(prompt, client, semaphore):
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Σε αυτό το παράδειγμα, χρησιμοποιούμε έναν σηματοφορέα για να περιορίσουμε τον αριθμό των ταυτόχρονων αιτημάτων σε 5, διασφαλίζοντας ότι δεν θα υπερφορτώσουμε τον διακομιστή API.

Χειρισμός σφαλμάτων και επανάληψη στις κλήσεις ασυγχρονισμού LLM

Όταν εργάζεστε με εξωτερικά API, είναι σημαντικό να εφαρμόσετε ισχυρούς μηχανισμούς διαχείρισης σφαλμάτων και επανάληψης. Ας βελτιώσουμε τον κώδικά μας ώστε να χειρίζεται συνηθισμένα σφάλματα και να εφαρμόζουμε εκθετική υποχώρηση για επανάληψη.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

class APIError(Exception):
    pass

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
    try:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error occurred: {e}")
        raise APIError("Failed to generate text")

async def process_prompt(prompt, client, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(prompt, client)
            return prompt, result
        except APIError:
            return prompt, "Failed to generate response after multiple attempts."

async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(20)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in results:
        print(f"Prompt: {prompt}\nResponse: {result}\n")

asyncio.run(main())

Αυτή η βελτιωμένη έκδοση περιλαμβάνει:

  • Ένα έθιμο APIError εξαίρεση για σφάλματα που σχετίζονται με το API.
  • A generate_text_with_retry λειτουργία διακοσμημένο με @retry από τη βιβλιοθήκη του teacity, υλοποιώντας εκθετική υποχώρηση.
  • Διαχείριση σφαλμάτων στο process_prompt λειτουργία εντοπισμού και αναφοράς αποτυχιών.

Βελτιστοποίηση απόδοσης: Αποκρίσεις ροής

Για τη δημιουργία περιεχομένου μεγάλης μορφής, οι απαντήσεις ροής μπορούν να βελτιώσουν σημαντικά την αντιληπτή απόδοση της εφαρμογής σας. Αντί να περιμένετε για ολόκληρη την απάντηση, μπορείτε να επεξεργαστείτε και να εμφανίσετε κομμάτια κειμένου καθώς γίνονται διαθέσιμα.

import asyncio
from openai import AsyncOpenAI

async def stream_text(prompt, client):
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end='', flush=True)
    
    print("\n")
    return full_response

async def main():
    prompt = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as client:
        result = await stream_text(prompt, client)
    
    print(f"Full response:\n{result}")

asyncio.run(main())

Αυτό το παράδειγμα δείχνει τον τρόπο ροής της απάντησης από το API, εκτυπώνοντας κάθε κομμάτι καθώς φθάνει. Αυτή η προσέγγιση είναι ιδιαίτερα χρήσιμη για εφαρμογές συνομιλίας ή για οποιοδήποτε σενάριο όπου θέλετε να παρέχετε σχόλια σε πραγματικό χρόνο στον χρήστη.

Δημιουργία ασύγχρονων ροών εργασίας με το LangChain

Για πιο σύνθετες εφαρμογές που λειτουργούν με LLM, το Πλαίσιο LangChain παρέχει μια αφαίρεση υψηλού επιπέδου που απλοποιεί τη διαδικασία αλυσιδωτής σύνδεσης πολλαπλών κλήσεων LLM και ενσωμάτωσης άλλων εργαλείων. Ας δούμε ένα παράδειγμα χρήσης του LangChain με δυνατότητες ασύγχρονης μάθησης:

Αυτό το παράδειγμα δείχνει πώς το LangChain μπορεί να χρησιμοποιηθεί για τη δημιουργία πιο περίπλοκων ροών εργασίας με ροή και ασύγχρονη εκτέλεση. Ο AsyncCallbackManager και StreamingStdOutCallbackHandler ενεργοποιήστε τη ροή σε πραγματικό χρόνο του παραγόμενου περιεχομένου.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    return await chain.arun(topic=topic)

async def main():
    topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
    tasks = [generate_story(topic) for topic in topics]
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")

asyncio.run(main())

Εξυπηρέτηση εφαρμογών Async LLM με το FastAPI

Για να διαθέσετε την εφαρμογή async LLM σας ως διαδικτυακή υπηρεσία, το FastAPI είναι μια εξαιρετική επιλογή λόγω της εγγενούς υποστήριξης για ασύγχρονες λειτουργίες. Ακολουθεί ένα παράδειγμα για το πώς να δημιουργήσετε ένα απλό τελικό σημείο API για τη δημιουργία κειμένου:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

class GenerationRequest(BaseModel):
    prompt: str

class GenerationResponse(BaseModel):
    generated_text: str

@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": request.prompt}]
    )
    generated_text = response.choices[0].message.content
    
    # Simulate some post-processing in the background
    background_tasks.add_task(log_generation, request.prompt, generated_text)
    
    return GenerationResponse(generated_text=generated_text)

async def log_generation(prompt: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Αυτή η εφαρμογή FastAPI δημιουργεί ένα τελικό σημείο /generate που δέχεται μια προτροπή και επιστρέφει κείμενο που δημιουργείται. Δείχνει επίσης πώς να χρησιμοποιείτε εργασίες παρασκηνίου για πρόσθετη επεξεργασία χωρίς να εμποδίζετε την απόκριση.

Βέλτιστες πρακτικές και κοινές παγίδες

Καθώς εργάζεστε με ασύγχρονα API LLM, έχετε υπόψη σας αυτές τις βέλτιστες πρακτικές:

  1. Χρησιμοποιήστε τη συγκέντρωση σύνδεσης: Όταν κάνετε πολλά αιτήματα, χρησιμοποιήστε ξανά τις συνδέσεις για να μειώσετε τα έξοδα.
  2. Εφαρμόστε τον σωστό χειρισμό σφαλμάτων: Να λαμβάνετε πάντα υπόψη ζητήματα δικτύου, σφάλματα API και μη αναμενόμενες αποκρίσεις.
  3. Σεβαστείτε τα όρια ποσοστών: Χρησιμοποιήστε σηματοφόρους ή άλλους μηχανισμούς ελέγχου συγχρονισμού για να αποφύγετε την υπερκάλυψη του API.
  4. Παρακολούθηση και καταγραφή: Εφαρμογή ολοκληρωμένης καταγραφής για την παρακολούθηση της απόδοσης και τον εντοπισμό προβλημάτων.
  5. Χρησιμοποιήστε ροή για περιεχόμενο μεγάλης μορφής: Βελτιώνει την εμπειρία του χρήστη και επιτρέπει την έγκαιρη επεξεργασία των μερικών αποτελεσμάτων.

Έχω περάσει τα τελευταία πέντε χρόνια βυθίζοντας τον εαυτό μου στον συναρπαστικό κόσμο της Μηχανικής Μάθησης και της Βαθιάς Μάθησης. Το πάθος και η εξειδίκευσή μου με οδήγησαν να συνεισφέρω σε περισσότερα από 50 διαφορετικά έργα μηχανικής λογισμικού, με ιδιαίτερη έμφαση στην AI/ML. Η συνεχής περιέργειά μου με έχει τραβήξει επίσης προς την Επεξεργασία Φυσικής Γλώσσας, έναν τομέα που ανυπομονώ να εξερευνήσω περαιτέρω.