
Async callbacks

If you plan to use the async API, use AsyncCallbackHandler so that your callbacks do not block the event loop.
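To see why blocking matters, here is a minimal sketch that uses only the standard library, not LangChain. The names `ticker`, `blocking_callback`, `cooperative_callback`, and `max_gap` are hypothetical and exist only for this illustration. It measures how long a background task is stalled while a callback body runs.

```python
import asyncio
import time


async def ticker(ticks: list) -> None:
    """Background task that records a timestamp roughly every 10 ms."""
    for _ in range(5):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def blocking_callback() -> None:
    # Hypothetical handler body: time.sleep() never yields to the event loop.
    time.sleep(0.1)


async def cooperative_callback() -> None:
    # Hypothetical handler body: asyncio.sleep() yields while it waits.
    await asyncio.sleep(0.1)


async def max_gap(callback) -> float:
    """Return the longest pause between ticks while `callback` runs."""
    ticks: list = []
    await asyncio.gather(ticker(ticks), callback())
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


blocking_gap = asyncio.run(max_gap(blocking_callback))
cooperative_gap = asyncio.run(max_gap(cooperative_callback))
print(f"blocking: {blocking_gap:.3f}s, cooperative: {cooperative_gap:.3f}s")
```

With the blocking body, the ticker cannot run until the callback returns, so its longest pause is at least the full 0.1 seconds. With the cooperative body, the ticker keeps its usual rhythm. Run this as a script; in a notebook, replace each `asyncio.run(...)` with `await ...`.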

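Advanced: If you use a synchronous CallbackHandler while running your llm/chain/tool/agent with async methods, it will still work. Under the hood, however, it is invoked with run_in_executor, which can cause problems if your CallbackHandler is not thread-safe.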
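The thread-safety concern can be illustrated with a short sketch built on `asyncio` alone. This is not LangChain's internal implementation: `CountingSyncHandler` and `dispatch` are hypothetical names, and the sketch only assumes, as stated above, that a sync handler method ends up being called through `run_in_executor`.

```python
import asyncio
import threading


class CountingSyncHandler:
    """Hypothetical stand-in for a sync handler; not a LangChain class."""

    def __init__(self) -> None:
        self.tokens: list = []
        self.thread_ids: set = set()
        # The lock guards state that is mutated from executor threads.
        self._lock = threading.Lock()

    def on_llm_new_token(self, token: str) -> None:
        with self._lock:
            self.tokens.append(token)
            self.thread_ids.add(threading.get_ident())


async def dispatch(handler: CountingSyncHandler, tokens: list) -> None:
    """Call the sync handler once per token from async code."""
    loop = asyncio.get_running_loop()
    for token in tokens:
        # Passing None selects the loop's default ThreadPoolExecutor.
        await loop.run_in_executor(None, handler.on_llm_new_token, token)


handler = CountingSyncHandler()
main_thread_id = threading.get_ident()
asyncio.run(dispatch(handler, ["Why", " don", "'t"]))
print(handler.tokens)
print(main_thread_id in handler.thread_ids)
```

The handler method never runs on the thread that owns the event loop, so any state it shares with the rest of the program needs a lock or another thread-safe structure.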

import asyncio
from typing import Any, Dict, List

from langchain.chat_models import ChatOpenAI
from langchain.schema import LLMResult, HumanMessage
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler


class MyCustomSyncHandler(BaseCallbackHandler):
    """Sync handler; under async execution it is called from an executor thread."""

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")


class MyCustomAsyncHandler(AsyncCallbackHandler):
    """Async callback handler that can be used to handle callbacks from langchain."""

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when the LLM starts running."""
        print("zzzz....")
        await asyncio.sleep(0.3)
        class_name = serialized["name"]  # not used further in this example
        print("Hi! I just woke up. Your llm is starting")

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when the LLM ends running."""
        print("zzzz....")
        await asyncio.sleep(0.3)
        print("Hi! I just woke up. Your llm is ending")


# To enable streaming, we pass in `streaming=True` to the ChatModel constructor.
# Additionally, we pass in a list with our custom handlers.
chat = ChatOpenAI(
    max_tokens=25,
    streaming=True,
    callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],
)

# Top-level `await` works in a notebook; in a script, wrap this call in a
# coroutine and run it with `asyncio.run`.
await chat.agenerate([[HumanMessage(content="Tell me a joke")]])


zzzz....
Hi! I just woke up. Your llm is starting
Sync handler being called in a `thread_pool_executor`: token:
Sync handler being called in a `thread_pool_executor`: token: Why
Sync handler being called in a `thread_pool_executor`: token: don
Sync handler being called in a `thread_pool_executor`: token: 't
Sync handler being called in a `thread_pool_executor`: token: scientists
Sync handler being called in a `thread_pool_executor`: token: trust
Sync handler being called in a `thread_pool_executor`: token: atoms
Sync handler being called in a `thread_pool_executor`: token: ?
Sync handler being called in a `thread_pool_executor`: token:

Sync handler being called in a `thread_pool_executor`: token: Because
Sync handler being called in a `thread_pool_executor`: token: they
Sync handler being called in a `thread_pool_executor`: token: make
Sync handler being called in a `thread_pool_executor`: token: up
Sync handler being called in a `thread_pool_executor`: token: everything
Sync handler being called in a `thread_pool_executor`: token: .
Sync handler being called in a `thread_pool_executor`: token:
zzzz....
Hi! I just woke up. Your llm is ending


LLMResult(generations=[[ChatGeneration(text="Why don't scientists trust atoms? \n\nBecause they make up everything.", generation_info=None, message=AIMessage(content="Why don't scientists trust atoms? \n\nBecause they make up everything.", additional_kwargs={}, example=False))]], llm_output={'token_usage': {}, 'model_name': 'gpt-3.5-turbo'})