The Solutions:
Solution 1: AsyncIteratorCallbackHandler
To solve the issue with AsyncCallbackHandler, you can use the AsyncIteratorCallbackHandler class provided by langchain, which is designed specifically for handling streaming completions asynchronously. Here is an example of how to use it:
import asyncio
from channels.layers import get_channel_layer
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.prompts import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    SystemMessagePromptTemplate,
)


async def send_message_to_room(room_group_name, message):
    # Push a streamed token to every consumer subscribed to the room group.
    channel_layer = get_channel_layer()
    await channel_layer.group_send(
        room_group_name,
        {
            "type": "chat_message",
            "message": message,
        },
    )


class MyCustomHandler(AsyncIteratorCallbackHandler):
    def __init__(self, room_group_name):
        super().__init__()
        self.room_group_name = room_group_name

    async def on_llm_new_token(self, token: str, **kwargs):
        # Called once per streamed token; forward it to the WebSocket room.
        await send_message_to_room(self.room_group_name, token)


async def generate_cited_answer_stream(roomname, question, texts,
                                       system_message_with_response_type,
                                       human_message_with_response_type,
                                       responsetype="Simple and Pedagogical"):
    handler = MyCustomHandler(room_group_name=roomname)
    chat = ChatOpenAI(temperature=0, streaming=True, callbacks=[handler])

    system_message_prompt = SystemMessagePromptTemplate.from_template(system_message_with_response_type)
    human_message_prompt = HumanMessagePromptTemplate.from_template(human_message_with_response_type)
    chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])
    prompt_value = chat_prompt.format_prompt(question=question, texts=texts, responsetype=responsetype)

    # Tokens are streamed to the room via the callback handler as they arrive;
    # agenerate returns the complete result once generation finishes.
    result = await chat.agenerate([prompt_value.to_messages()])
    print(result)
In this example, MyCustomHandler subclasses AsyncIteratorCallbackHandler and is registered as a callback on the ChatOpenAI instance. Each time the LLM generates a new token, the handler's on_llm_new_token method is called, letting you forward the token to your WebSocket room as it arrives.
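To show how this could plug into Django Channels, here is a hypothetical consumer sketch. The routing key ("room_name"), the incoming payload fields ("question", "texts"), and the template strings SYSTEM_TEMPLATE and HUMAN_TEMPLATE are assumptions for illustration, not part of the original code.

import asyncio
import json

from channels.generic.websocket import AsyncWebsocketConsumer


class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        room_name = self.scope["url_route"]["kwargs"]["room_name"]  # assumed URL routing
        self.room_group_name = f"chat_{room_name}"
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def receive(self, text_data=None, bytes_data=None):
        data = json.loads(text_data)
        # Run the generation as a background task so this consumer can keep
        # dispatching chat_message events while tokens are still being produced.
        asyncio.create_task(
            generate_cited_answer_stream(
                roomname=self.room_group_name,
                question=data["question"],          # assumed payload shape
                texts=data["texts"],
                system_message_with_response_type=SYSTEM_TEMPLATE,  # hypothetical template strings
                human_message_with_response_type=HUMAN_TEMPLATE,
            )
        )

    async def chat_message(self, event):
        # Forward each streamed token to the connected WebSocket client.
        await self.send(text_data=json.dumps({"token": event["message"]}))

Running the generation in a background task rather than awaiting it inside receive keeps the consumer's event loop free to relay each token as soon as group_send delivers it.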
Q&A
Can you give me an example of a working code snippet?
Yes. AsyncIteratorCallbackHandler works for this; a minimal, self-contained sketch follows.
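As a minimal sketch (assuming the classic langchain imports used above and an OPENAI_API_KEY in the environment), the handler can also be consumed directly through its aiter() method, without any WebSocket machinery:

import asyncio

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage


async def main():
    handler = AsyncIteratorCallbackHandler()
    chat = ChatOpenAI(temperature=0, streaming=True, callbacks=[handler])

    # Start the generation in the background; tokens are pushed into the handler.
    task = asyncio.create_task(
        chat.agenerate([[HumanMessage(content="Tell me a one-line joke.")]])
    )

    # Consume tokens as they arrive.
    async for token in handler.aiter():
        print(token, end="", flush=True)

    await task


asyncio.run(main())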
Video Explanation:
The following video, titled "DataStreaming with LangChain & FastAPI - YouTube", provides additional insights and in-depth exploration related to the topics discussed in this post.
In this video I will explain how to use data streaming with LLMs, which return tokens step by step instead of waiting for a complete response ...