Created
July 29, 2024 14:36
-
-
Save jacepark12/1886f6e95d9051f198992323758defa1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| from typing import NamedTuple, Literal, Union, List, Tuple | |
| from langfuse.decorators import observe | |
| import websockets | |
| import json | |
| import os | |
| class Message(NamedTuple): | |
| type: Literal["message", "messageEnd"] | |
| data: str | |
| class SourceMetaData(NamedTuple): | |
| title: str | |
| url: str | |
| class Source(NamedTuple): | |
| page_content: str | |
| meta_data: SourceMetaData | |
| class Sources(NamedTuple): | |
| type: Literal["sources"] | |
| datas: List[Source] | |
| def parse_message(response: str) -> Union[Message, Sources]: | |
| try: | |
| message = json.loads(response) | |
| except json.JSONDecodeError: | |
| raise ValueError("Invalid JSON response") | |
| if not isinstance(message, dict): | |
| raise TypeError("Expected a JSON object") | |
| message_type = message.get('type') | |
| data = message.get("data", "") | |
| if message_type in ["message", "messageEnd"]: | |
| return Message(type=message_type, data=data) | |
| elif message_type == "sources": | |
| sources: List[Source] = [] | |
| if not isinstance(data, list): | |
| raise ValueError("Unable to parse response value") | |
| for source in data: | |
| meta_data = SourceMetaData( | |
| title=source.get("metadata").get("title"), | |
| url=source.get("metadata").get("url"), | |
| ) | |
| sources.append(Source( | |
| page_content=source.get("pageContent"), | |
| meta_data=meta_data | |
| )) | |
| return Sources( | |
| type=message_type, | |
| datas=sources | |
| ) | |
| raise ValueError("Unable to parse response value") | |
| async def websocket_client(query: str) -> Tuple[str, Sources]: | |
| chat_id = os.urandom(20).hex() | |
| # Replace with your WebSocket server URI | |
| uri = "ws://127.0.0.1:3001/?chatModel=GPT-3.5+turbo&chatModelProvider=openai&embeddingModel=Text+embedding+3+small&embeddingModelProvider=openai" | |
| async with websockets.connect(uri) as websocket: | |
| # Send an initial message if needed | |
| initial_message = {"type": "message", "message": {"chatId": chat_id, | |
| "content": query}, | |
| "focusMode": "webSearch", "history": [["human", query]]} | |
| await websocket.send(json.dumps(initial_message)) | |
| gathered_messages: str = "" | |
| sources: Sources = Sources( | |
| type="sources", | |
| datas=[] | |
| ) | |
| while True: | |
| response = await websocket.recv() | |
| message = json.loads(response) | |
| print(f"Received: {message}") | |
| try: | |
| message = parse_message(response) | |
| except (ValueError, TypeError) as e: | |
| print(f"Error: {e}") | |
| # Gather the message | |
| if isinstance(message, Message): | |
| gathered_messages += message.data | |
| elif isinstance(message, Sources): | |
| sources = message | |
| # Check for the termination message | |
| if message.type == "messageEnd": | |
| print("Received messageEnd. Stopping message gathering.") | |
| break | |
| return gathered_messages, sources | |
| @observe() | |
| def query_to_perplexica(query: str) -> tuple[str, Sources]: | |
| response, response_sources = asyncio.get_event_loop().run_until_complete(websocket_client(query)) | |
| return response, response_sources | |
| if __name__ == "__main__": | |
| query = "AI Agents" | |
| gathered_messages, sources = asyncio.get_event_loop().run_until_complete(websocket_client(query)) | |
| print("All gathered messages:") | |
| print(gathered_messages) | |
| print("Sources") | |
| print(sources) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a great idea! I am not being able to make it work though. I modified it a bit to get logging, but I never get a response.
I set a timeout of 60 seconds and still not getting a response. I am able to use the normal Perplexica frontend though, and inspecting it's WS behaviour, I am unable to find the difference and why my script revieves no asnwer.