Streaming Chat completion using LangChain and WebSockets

by Ali Hasan

Tags: django, django-channels, langchain, llama-cpp-python, websocket

Quick Fix: To enable streaming chat completion with LangChain over WebSockets, use langchain's AsyncIteratorCallbackHandler as shown in the code snippet below:

import asyncio
import unittest

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate


class TestLangchainAsync(unittest.IsolatedAsyncioTestCase):
    async def test_aiter(self):
        handler = AsyncIteratorCallbackHandler()
        llm = OpenAI(
            temperature=0,
            streaming=True,
            callbacks=[handler],
            openai_api_key="sk-xxxxx",
            openai_proxy="http://127.0.0.1:7890",
        )
        prompt = PromptTemplate(
            input_variables=["product"],
            template="What is a good name for a company that makes {product}?",
        )
        prompt = prompt.format(product="colorful socks")
        # Run generation in the background; tokens are pushed into the handler's queue.
        asyncio.create_task(llm.agenerate([prompt]))
        # Consume tokens as they arrive instead of waiting for the full completion.
        async for i in handler.aiter():
            print(i)

This allows you to receive streaming results from your LLM and handle each token as soon as it becomes available.
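If you are serving the model behind Django Channels, the same pattern can be driven directly from a consumer. The following is only a minimal sketch, not code from the original answer: the ChatConsumer name, its routing, and the OpenAI parameters are assumptions.

import asyncio

from channels.generic.websocket import AsyncWebsocketConsumer
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.llms import OpenAI


class ChatConsumer(AsyncWebsocketConsumer):
    """Hypothetical consumer that streams LLM tokens back to the client."""

    async def connect(self):
        await self.accept()

    async def receive(self, text_data=None, bytes_data=None):
        handler = AsyncIteratorCallbackHandler()
        llm = OpenAI(temperature=0, streaming=True, callbacks=[handler])

        # Start generation in the background so tokens can be read while it runs.
        task = asyncio.create_task(llm.agenerate([text_data]))

        # Forward each token to the WebSocket client as soon as it arrives.
        async for token in handler.aiter():
            await self.send(text_data=token)

        await task  # surface any exception raised during generation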

The Solutions:

Solution 1: AsyncIteratorCallbackHandler

To handle streaming completions asynchronously, you can use the AsyncIteratorCallbackHandler class provided by langchain, which is designed specifically for this purpose. Here's an example of how to combine it with Django Channels:

import asyncio

from channels.layers import get_channel_layer
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.prompts import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    SystemMessagePromptTemplate,
)


async def send_message_to_room(room_group_name, message):
    channel_layer = get_channel_layer()
    await channel_layer.group_send(
        room_group_name,
        {
            "type": "chat_message",
            "message": message,
        }
    )


class MyCustomHandler(AsyncIteratorCallbackHandler):

    def __init__(self, room_group_name):
        super().__init__()
        self.room_group_name = room_group_name

    async def on_llm_new_token(self, token: str, **kwargs):
        # Forward every new token to the channel group as soon as it arrives.
        await send_message_to_room(self.room_group_name, token)


async def generate_cited_answer_stream(roomname, question, texts,
                                       system_message_with_response_type,
                                       human_message_with_response_type,
                                       responsetype="Simple and Pedagogical"):
    handler = MyCustomHandler(room_group_name=roomname)

    chat = ChatOpenAI(temperature=0, streaming=True, callbacks=[handler])

    system_message_prompt = SystemMessagePromptTemplate.from_template(system_message_with_response_type)
    human_message_prompt = HumanMessagePromptTemplate.from_template(human_message_with_response_type)

    chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

    prompt_value = chat_prompt.format_prompt(question=question, texts=texts, responsetype=responsetype)

    # Tokens are streamed to the room via on_llm_new_token; await the full generation here.
    return await chat.agenerate([prompt_value.to_messages()])

In this example, MyCustomHandler subclasses AsyncIteratorCallbackHandler and is registered as a callback on the ChatOpenAI instance. Each time the LLM emits a new token, its on_llm_new_token method is called, which sends the token to your WebSocket room group via the channel layer.
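For group_send to reach the browser, a Channels consumer subscribed to that room group must define a chat_message handler. The sketch below is an assumed counterpart to the code above, not part of the original answer: the consumer class, the URL route, and the JSON payload shape are illustrative choices.

import json

from channels.generic.websocket import AsyncWebsocketConsumer


class ChatRoomConsumer(AsyncWebsocketConsumer):
    """Hypothetical consumer that relays streamed tokens to the WebSocket client."""

    async def connect(self):
        # Assumes the room name comes from the URL route, e.g. ws/chat/<room_name>/
        self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"]
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    async def chat_message(self, event):
        # Called for every group_send with {"type": "chat_message", ...};
        # pushes the token down the open WebSocket connection.
        await self.send(text_data=json.dumps({"token": event["message"]}))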

Q&A

Can you give me an example of a working code snippet?

Yes, the AsyncIteratorCallbackHandler approach works; the Quick Fix snippet at the top of this post is a complete example, and a standalone variant is sketched below.
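As a further illustration, here is a minimal, self-contained sketch (not from the original answer) that runs the generation as a background task, consumes the token stream, and then awaits the task so exceptions are not silently dropped; the function name and model parameters are assumptions.

import asyncio

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.llms import OpenAI


async def stream_completion(prompt: str):
    handler = AsyncIteratorCallbackHandler()
    llm = OpenAI(temperature=0, streaming=True, callbacks=[handler])

    # Kick off generation without blocking so the token stream can be consumed.
    task = asyncio.create_task(llm.agenerate([prompt]))

    async for token in handler.aiter():
        yield token

    await task  # re-raise any error from the generation task


async def main():
    async for token in stream_completion("What is a good name for a company that makes colorful socks?"):
        print(token, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())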

Video Explanation:

The following video, titled "DataStreaming with LangChain & FastAPI", provides additional insights and an in-depth exploration of the topics discussed in this post.


In this video I will explain how to use data streaming with LLMs, which return tokens step by step instead of waiting for a complete response ...