Tekoäly
Asynkroniset LLM-sovellusliittymäkutsut Pythonissa: kattava opas

Kehittäjinä ja dta-tutkijoina joudumme usein olemaan vuorovaikutuksessa näiden tehokkaiden mallien kanssa API:iden kautta. Kuitenkin, kun sovelluksemme monimutkaistuvat ja laajenevat, tehokkaan ja suorituskykyisen API-vuorovaikutuksen tarve tulee ratkaisevaksi. Tässä kohtaa asynkroninen ohjelmointi loistaa, jolloin voimme maksimoida suorituskyvyn ja minimoida latenssin työskennellessämme LLM-sovellusliittymien kanssa.
Tässä kattavassa oppaassa tutkimme asynkronisten LLM API -kutsujen maailmaa Pythonissa. Käsittelemme kaikkea asynkronisen ohjelmoinnin perusteista monimutkaisten työnkulkujen käsittelyyn tarkoitettuihin edistyneisiin tekniikoihin. Tämän artikkelin loppuun mennessä sinulla on vankka ymmärrys siitä, miten voit hyödyntää asynkronista ohjelmointia LLM-pohjaisten sovellustesi tehostamiseksi.
Ennen kuin syvennymme asynkronisten LLM API -kutsujen yksityiskohtiin, luodaan vankka perusta asynkronisen ohjelmoinnin käsitteille.
Asynkroninen ohjelmointi mahdollistaa useiden toimintojen suorittamisen samanaikaisesti ilman, että suorituksen pääsäikettä estetään. Pythonissa tämä saavutetaan ensisijaisesti käyttämällä asyncio moduuli, joka tarjoaa puitteet samanaikaisen koodin kirjoittamiselle korutiinien, tapahtumasilmukoiden ja futuurien avulla.
Keskeiset käsitteet:
- Korutiinit: Toiminnot, jotka on määritelty async def jonka voi keskeyttää ja jatkaa.
- Tapahtumasilmukka: Keskitetty suoritusmekanismi, joka hallitsee ja suorittaa asynkronisia tehtäviä.
- Odotettuja: Objektit, joita voidaan käyttää await-avainsanalla (korutiinit, tehtävät, futuurit).
Tässä on yksinkertainen esimerkki näiden käsitteiden havainnollistamiseksi:
import asyncio async def greet(name): await asyncio.sleep(1) # Simulate an I/O operation print(f"Hello, {name}!") async def main(): await asyncio.gather( greet("Alice"), greet("Bob"), greet("Charlie") ) asyncio.run(main())
Tässä esimerkissä määrittelemme asynkronisen funktion greet
joka simuloi I/O-toimintoa asyncio.sleep()
. main
toiminto käyttää asyncio.gather()
lähettää useita tervehdyksiä samanaikaisesti. Nukkumisviiveestä huolimatta kaikki kolme tervehdystä tulostetaan noin 1 sekunnin kuluttua, mikä osoittaa asynkronisen suorituskyvyn.
Asyncin tarve LLM-sovellusliittymäkutsuissa
Kun työskentelemme LLM-sovellusliittymien kanssa, kohtaamme usein skenaarioita, joissa meidän on tehtävä useita API-kutsuja joko peräkkäin tai rinnakkain. Perinteinen synkroninen koodi voi johtaa merkittäviin suorituskyvyn pullonkauloihin, etenkin kun käsitellään korkean viiveen toimintoja, kuten verkkopyyntöjä LLM-palveluihin.
Harkitse skenaariota, jossa meidän on luotava yhteenvedot 100 eri artikkelista käyttämällä LLM-sovellusliittymää. Synkronisessa lähestymistavassa jokainen API-kutsu estetään, kunnes se saa vastauksen, mikä saattaa kestää useita minuutteja kaikkien pyyntöjen suorittamiseen. Toisaalta asynkroninen lähestymistapa antaa meille mahdollisuuden käynnistää useita API-kutsuja samanaikaisesti, mikä vähentää dramaattisesti kokonaissuoritusaikaa.
Ympäristösi luominen
Aloittaaksesi asynkronisten LLM API -kutsujen käytön sinun on määritettävä Python-ympäristösi tarvittavilla kirjastoilla. Tarvitset seuraavat:
- Python 3.7 tai korkeampi (natiiviasyncio-tuki)
- aiohttp: Asynkroninen HTTP-asiakaskirjasto
- openai: Virallinen OpenAI Python-asiakas (jos käytät OpenAI:n GPT-malleja)
- langchain: Kehys sovellusten rakentamiseen LLM:illä (valinnainen, mutta suositeltava monimutkaisiin työnkulkuihin)
Voit asentaa nämä riippuvuudet pip:n avulla:
pip install aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
Basic Async LLM API -kutsut asyncion ja aiohttp:n kanssa
Aloitetaan tekemällä yksinkertainen asynkroninen kutsu LLM-rajapintaan aiohttp:n avulla. Käytämme esimerkkinä OpenAI:n GPT-3.5-rajapintaa, mutta samat käsitteet pätevät myös muihin LLM-rajapintoihin.
import asyncio import aiohttp from openai import AsyncOpenAI async def generate_text(prompt, client): response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content async def main(): prompts = [ "Explain quantum computing in simple terms.", "Write a haiku about artificial intelligence.", "Describe the process of photosynthesis." ] async with AsyncOpenAI() as client: tasks = [generate_text(prompt, client) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Tässä esimerkissä määrittelemme asynkronisen funktion generate_text
joka soittaa OpenAI API:lle AsyncOpenAI-asiakkaan avulla. The main
toiminto luo useita tehtäviä eri kehotteita ja käyttötarkoituksia varten asyncio.gather()
ajaa niitä samanaikaisesti.
Tämän lähestymistavan avulla voimme lähettää useita pyyntöjä LLM API:lle samanaikaisesti, mikä vähentää merkittävästi kaikkien kehotteiden käsittelyyn tarvittavaa kokonaisaikaa.
Kehittyneet tekniikat: Erä ja samanaikaisuuden valvonta
Vaikka edellinen esimerkki havainnollistaa asynkronisten LLM API -kutsujen perusteita, reaalimaailman sovellukset vaativat usein kehittyneempiä lähestymistapoja. Tarkastellaan kahta tärkeää tekniikkaa: pyyntöjen eräajoittamista ja samanaikaisuuden hallintaa.
Pyyntöjen eräkäsittely: Kun käsitellään suurta määrää kehotteita, on usein tehokkaampaa jakaa ne eriin ryhmiin kuin lähettää yksittäisiä pyyntöjä jokaista kehotetta varten. Tämä vähentää useiden API-kutsujen aiheuttamaa ylimääräistä työtä ja voi johtaa parempaan suorituskykyyn.
import asyncio from openai import AsyncOpenAI async def process_batch(batch, client): responses = await asyncio.gather(*[ client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) for prompt in batch ]) return [response.choices[0].message.content for response in responses] async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(100)] batch_size = 10 async with AsyncOpenAI() as client: results = [] for i in range(0, len(prompts), batch_size): batch = prompts[i:i+batch_size] batch_results = await process_batch(batch, client) results.extend(batch_results) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Samanaikaisuuden hallinta: Vaikka asynkroninen ohjelmointi sallii samanaikaisen suorituksen, on tärkeää hallita samanaikaisuuden tasoa, jotta vältetään API-palvelimen ylikuormitus tai nopeusrajoitusten ylittyminen. Voimme käyttää tähän tarkoitukseen asyncio.Semaphore-ohjelmaa.
import asyncio from openai import AsyncOpenAI async def generate_text(prompt, client, semaphore): async with semaphore: response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(100)] max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as client: tasks = [generate_text(prompt, client, semaphore) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Tässä esimerkissä käytämme semaforia rajoittaaksemme samanaikaisten pyyntöjen määrän viiteen, jotta emme ylikuormita API-palvelinta.
Virheiden käsittely ja uudelleenyritykset Async LLM -puheluissa
Ulkoisten API-rajapintojen kanssa työskenneltäessä on ratkaisevan tärkeää toteuttaa vankat virheenkäsittely- ja uudelleenyritysmekanismit. Parannellaan koodiamme käsittelemään yleisiä virheitä ja toteutetaan eksponentiaalinen peruutus uudelleenyrityksille.
import asyncio import random from openai import AsyncOpenAI from tenacity import retry, stop_after_attempt, wait_exponential class APIError(Exception): pass @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def generate_text_with_retry(prompt, client): try: response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content except Exception as e: print(f"Error occurred: {e}") raise APIError("Failed to generate text") async def process_prompt(prompt, client, semaphore): async with semaphore: try: result = await generate_text_with_retry(prompt, client) return prompt, result except APIError: return prompt, "Failed to generate response after multiple attempts." async def main(): prompts = [f"Tell me a fact about number {i}" for i in range(20)] max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as client: tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts] results = await asyncio.gather(*tasks) for prompt, result in results: print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Tämä parannettu versio sisältää:
- Mukautettu
APIError
poikkeus API-virheiden osalta. - A
generate_text_with_retry
toiminto koristeltu@retry
sitkeyskirjastosta, toteuttaen eksponentiaalisen perääntymisen. - Virhe käsittelyssä
process_prompt
toiminto havaita ja raportoida viat.
Suorituskyvyn optimointi: vastausten suoratoisto
Pitkän muodon sisällön luomiseksi suoratoistovastaukset voivat parantaa merkittävästi sovelluksesi havaittua suorituskykyä. Sen sijaan, että odottaisit koko vastausta, voit käsitellä ja näyttää tekstipalstoja, kun niitä tulee saataville.
import asyncio from openai import AsyncOpenAI async def stream_text(prompt, client): stream = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}], stream=True ) full_response = "" async for chunk in stream: if chunk.choices[0].delta.content is not None: content = chunk.choices[0].delta.content full_response += content print(content, end='', flush=True) print("\n") return full_response async def main(): prompt = "Write a short story about a time-traveling scientist." async with AsyncOpenAI() as client: result = await stream_text(prompt, client) print(f"Full response:\n{result}") asyncio.run(main())
Tämä esimerkki osoittaa, kuinka vastaus voidaan suoratoistaa API:sta ja tulostaa jokainen pala sen saapuessa. Tämä lähestymistapa on erityisen hyödyllinen chat-sovelluksissa tai kaikissa skenaarioissa, joissa haluat antaa käyttäjälle reaaliaikaista palautetta.
Async-työnkulkujen luominen LangChainilla
Monimutkaisempia LLM-käyttöisiä sovelluksia varten LangChain-kehys tarjoaa korkean tason abstraktion, joka yksinkertaistaa useiden LLM-kutsujen ketjuttamista ja muiden työkalujen integrointia. Katsotaanpa esimerkkiä LangChainin käytöstä asynkronisten ominaisuuksien kanssa:
Tämä esimerkki osoittaa, kuinka LangChainia voidaan käyttää monimutkaisempien työnkulkujen luomiseen suoratoistolla ja asynkronisella suorituksella. The AsyncCallbackManager
ja StreamingStdOutCallbackHandler
mahdollistaa luodun sisällön reaaliaikaisen suoratoiston.
import asyncio from langchain.llms import OpenAI from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain.callbacks.manager import AsyncCallbackManager from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler async def generate_story(topic): llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()])) prompt = PromptTemplate( input_variables=["topic"], template="Write a short story about {topic}." ) chain = LLMChain(llm=llm, prompt=prompt) return await chain.arun(topic=topic) async def main(): topics = ["a magical forest", "a futuristic city", "an underwater civilization"] tasks = [generate_story(topic) for topic in topics] stories = await asyncio.gather(*tasks) for topic, story in zip(topics, stories): print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n") asyncio.run(main())
Palvelemme Async LLM -sovelluksia FastAPI:n avulla
Jos haluat tehdä asynkronisesta LLM-sovelluksestasi saataville verkkopalveluna, FastAPI on loistava valinta, koska se tukee natiivisti asynkronisia operaatioita. Tässä on esimerkki yksinkertaisen API-päätepisteen luomisesta tekstin luomista varten:
from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel from openai import AsyncOpenAI app = FastAPI() client = AsyncOpenAI() class GenerationRequest(BaseModel): prompt: str class GenerationResponse(BaseModel): generated_text: str @app.post("/generate", response_model=GenerationResponse) async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks): response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": request.prompt}] ) generated_text = response.choices[0].message.content # Simulate some post-processing in the background background_tasks.add_task(log_generation, request.prompt, generated_text) return GenerationResponse(generated_text=generated_text) async def log_generation(prompt: str, generated_text: str): # Simulate logging or additional processing await asyncio.sleep(2) print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
Tämä FastAPI-sovellus luo päätepisteen /generate
joka hyväksyy kehotteen ja palauttaa luodun tekstin. Se myös osoittaa, kuinka taustatehtäviä voidaan käyttää lisäkäsittelyyn estämättä vastausta.
Parhaat käytännöt ja yleiset sudenkuopat
Kun työskentelet async LLM-sovellusliittymien kanssa, pidä seuraavat parhaat käytännöt mielessä:
- Käytä yhteyksien yhdistämistä: Kun teet useita pyyntöjä, käytä yhteyksiä uudelleen vähentääksesi yleiskustannuksia.
- Ota käyttöön asianmukainen virheenkäsittely: Ota aina huomioon verkko-ongelmat, API-virheet ja odottamattomat vastaukset.
- Noudata rajoja: Käytä semaforeja tai muita samanaikaisuuden hallintamekanismeja välttääksesi API:n ylikuormituksen.
- Tarkkaile ja kirjaa: Ota käyttöön kattava kirjaus suorituskyvyn seuraamiseksi ja ongelmien tunnistamiseksi.
- Käytä suoratoistoa pitkäkestoiseen sisältöön: Se parantaa käyttökokemusta ja mahdollistaa osittaisten tulosten varhaisen käsittelyn.