Skip to content

Instantly share code, notes, and snippets.

@jacepark12
Created July 29, 2024 14:36
Show Gist options
  • Select an option

  • Save jacepark12/1886f6e95d9051f198992323758defa1 to your computer and use it in GitHub Desktop.

Select an option

Save jacepark12/1886f6e95d9051f198992323758defa1 to your computer and use it in GitHub Desktop.
import asyncio
from typing import NamedTuple, Literal, Union, List, Tuple
from langfuse.decorators import observe
import websockets
import json
import os
class Message(NamedTuple):
    """A text frame decoded from the server's JSON stream.

    Built by ``parse_message`` for frames whose ``type`` is ``"message"``
    or ``"messageEnd"``.
    """

    # Frame kind; the client loop stops reading once it sees "messageEnd".
    type: Literal["message", "messageEnd"]
    # Value of the frame's "data" key ("" when the key is absent).
    data: str
class SourceMetaData(NamedTuple):
    """Title and URL describing where a source document came from."""

    # Read from the "title" key of a source's "metadata" object.
    title: str
    # Read from the "url" key of a source's "metadata" object.
    url: str
class Source(NamedTuple):
    """One source document attached to an answer."""

    # Read from the "pageContent" key of a source entry.
    page_content: str
    # Title/URL pair built from the entry's "metadata" object.
    meta_data: SourceMetaData
class Sources(NamedTuple):
    """A ``"sources"`` frame: the list of documents backing an answer."""

    # Always the literal "sources"; lets callers tell this apart from Message.
    type: Literal["sources"]
    # Parsed source entries, in the order they appeared in the frame.
    datas: List[Source]
def parse_message(response: str) -> Union[Message, Sources]:
    """Decode one JSON frame into a ``Message`` or ``Sources`` record.

    Args:
        response: Raw JSON text of a single frame.

    Returns:
        A ``Message`` when the frame's ``type`` is ``"message"`` or
        ``"messageEnd"``; a ``Sources`` when it is ``"sources"``.

    Raises:
        ValueError: If the text is not valid JSON, the frame ``type`` is not
            recognised, or a ``"sources"`` payload is malformed (``data`` is
            not a list, an entry is not an object, or its ``metadata`` is
            not an object).
        TypeError: If the decoded JSON is not an object.
    """
    try:
        message = json.loads(response)
    except json.JSONDecodeError as err:
        # Chain the cause so the offending position stays in the traceback.
        raise ValueError("Invalid JSON response") from err

    if not isinstance(message, dict):
        raise TypeError("Expected a JSON object")

    message_type = message.get("type")
    data = message.get("data", "")

    if message_type in ("message", "messageEnd"):
        return Message(type=message_type, data=data)

    if message_type == "sources":
        if not isinstance(data, list):
            raise ValueError("Unable to parse response value")

        sources: List[Source] = []
        for source in data:
            if not isinstance(source, dict):
                raise ValueError("Unable to parse response value")
            # A missing or null "metadata" is treated as empty so the entry
            # yields None title/url rather than an AttributeError that the
            # caller's (ValueError, TypeError) handler would not catch.
            metadata = source.get("metadata") or {}
            if not isinstance(metadata, dict):
                raise ValueError("Unable to parse response value")
            sources.append(
                Source(
                    page_content=source.get("pageContent"),
                    meta_data=SourceMetaData(
                        title=metadata.get("title"),
                        url=metadata.get("url"),
                    ),
                )
            )
        return Sources(type=message_type, datas=sources)

    raise ValueError("Unable to parse response value")
# Default endpoint; pass ``uri=`` to websocket_client to target another
# server or a different model configuration.
_DEFAULT_URI = (
    "ws://127.0.0.1:3001/"
    "?chatModel=GPT-3.5+turbo&chatModelProvider=openai"
    "&embeddingModel=Text+embedding+3+small&embeddingModelProvider=openai"
)


async def websocket_client(query: str, uri: str = _DEFAULT_URI) -> Tuple[str, Sources]:
    """Send *query* over a WebSocket and collect the streamed answer.

    Opens a connection to *uri*, sends a single ``"message"`` request with
    ``focusMode`` set to ``"webSearch"``, then reads frames until one of
    type ``"messageEnd"`` arrives.

    Args:
        query: The question to send; also used as the only history entry.
        uri: WebSocket endpoint to connect to. Defaults to the local server
            address this script was originally written against.

    Returns:
        A ``(text, sources)`` tuple: the concatenated ``data`` of every
        ``Message`` frame, and the most recent ``Sources`` frame (an empty
        ``Sources`` if none was received).

    Note:
        Frames that ``parse_message`` rejects are printed and skipped. Any
        exception raised by ``websocket.recv()`` (for example when the
        connection closes before ``"messageEnd"``) propagates to the caller.
    """
    chat_id = os.urandom(20).hex()
    initial_message = {
        "type": "message",
        "message": {"chatId": chat_id, "content": query},
        "focusMode": "webSearch",
        "history": [["human", query]],
    }

    chunks: List[str] = []
    sources: Sources = Sources(type="sources", datas=[])

    async with websockets.connect(uri) as websocket:
        await websocket.send(json.dumps(initial_message))

        while True:
            response = await websocket.recv()
            # Print the raw frame before parsing so that frames which fail
            # to parse are still visible in the output.
            print(f"Received: {response}")

            try:
                message = parse_message(response)
            except (ValueError, TypeError) as e:
                print(f"Error: {e}")
                # Skip the frame: there is no parsed record to inspect.
                continue

            if isinstance(message, Sources):
                sources = message
                continue

            # Guard against a non-string payload (e.g. JSON null) so one odd
            # frame cannot break the final join.
            if isinstance(message.data, str):
                chunks.append(message.data)

            if message.type == "messageEnd":
                print("Received messageEnd. Stopping message gathering.")
                break

    return "".join(chunks), sources
@observe()
def query_to_perplexica(query: str) -> Tuple[str, Sources]:
    """Synchronous wrapper around ``websocket_client``.

    Args:
        query: The question to send.

    Returns:
        The ``(text, sources)`` tuple produced by ``websocket_client``.

    Note:
        Uses ``asyncio.run``, which creates and closes its own event loop,
        so this must not be called from code already running inside one.
    """
    # asyncio.run replaces get_event_loop().run_until_complete(), which is
    # deprecated when no loop is running and fails outside the main thread.
    return asyncio.run(websocket_client(query))
# Script entry point: run one sample query and print the answer and sources.
if __name__ == "__main__":
    query = "AI Agents"
    # asyncio.run manages the event loop itself; get_event_loop() is
    # deprecated when no loop is running.
    gathered_messages, sources = asyncio.run(websocket_client(query))
    print("All gathered messages:")
    print(gathered_messages)
    print("Sources")
    print(sources)
@estcap2
Copy link

estcap2 commented Aug 7, 2024

This is a great idea! I am not able to make it work, though. I modified it a bit to add logging, but I never get a response.

DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:asyncio:Close <_UnixSelectorEventLoop running=False closed=False debug=True>
DEBUG:websockets.client:= connection is CONNECTING
DEBUG:websockets.client:> GET /?chatModel=mistral-nemo%3A12b-instruct-2407-q8_0&chatModelProvider=ollama&embeddingModel=BGE+Small&embeddingModelProvider=local HTTP/1.1
DEBUG:websockets.client:> Host: localhost:3001
DEBUG:websockets.client:> Upgrade: websocket
DEBUG:websockets.client:> Connection: Upgrade
DEBUG:websockets.client:> Sec-WebSocket-Key: 9cXWIslMuTlHInqGEhwxCg==
DEBUG:websockets.client:> Sec-WebSocket-Version: 13
DEBUG:websockets.client:> Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
DEBUG:websockets.client:> Origin: http://localhost:3000
DEBUG:websockets.client:> User-Agent: Python/3.12 websockets/12.0
DEBUG:websockets.client:< HTTP/1.1 101 Switching Protocols
DEBUG:websockets.client:< Upgrade: websocket
DEBUG:websockets.client:< Connection: Upgrade
DEBUG:websockets.client:< Sec-WebSocket-Accept: 7/gFaBcvT03e4UHizYyfOQlqT9E=
DEBUG:websockets.client:= connection is OPEN
Sending initial message: {
  "type": "message",
  "message": {
    "chatId": "eb211b583fb39c4761405db73028b1f443c960d1",
    "content": "AI agents what are they"
  },
  "focusMode": "webSearch",
  "history": [
    [
      "human",
      "AI agents what are they"
    ]
  ]
}
DEBUG:websockets.client:> TEXT '{"type": "message", "message": {"chatId": "eb21...gents what are they"]]}' [201 bytes]
Timeout waiting for response
DEBUG:websockets.client:= connection is CLOSING
DEBUG:websockets.client:> CLOSE 1000 (OK) [2 bytes]
DEBUG:websockets.client:< CLOSE 1000 (OK) [2 bytes]
DEBUG:websockets.client:= connection is CLOSED

I set a timeout of 60 seconds and I am still not getting a response. I am able to use the normal Perplexica frontend, though, and after inspecting its WS behaviour I am unable to find the difference or why my script receives no answer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment