Created
January 11, 2025 19:23
-
-
Save christian-taillon/a51a8570cddcc4d987a2c0223ebfee71 to your computer and use it in GitHub Desktop.
A Python wrapper for Cloudflare WorkersAI that enables seamless integration with their REST API, allowing direct access to AI models through a simplified manifold pipeline pattern.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: CloudFlare Manifold Pipe | |
| authors: christian-taillon | |
| author_url: https://christiant.io | |
| funding_url: https://github.com/open-webui | |
| version: 0.1.6 | |
| required_open_webui_version: 0.3.17 | |
| license: MIT | |
| """ | |
| import os | |
| import requests | |
| import json | |
| import time | |
| from typing import List, Union, Generator, Iterator | |
| from pydantic import BaseModel, Field | |
| from open_webui.utils.misc import pop_system_message | |
| class Pipe: | |
| class Valves(BaseModel): | |
| CF_ACCOUNT_ID: str = Field(default="") | |
| CF_API_KEY: str = Field(default="") | |
| CF_AI_Gateway: str = Field(default="") | |
| def __init__(self): | |
| self.type = "manifold" | |
| self.id = "cloudflare" | |
| self.name = "cloudflare/" | |
| self.valves = self.Valves( | |
| **{ | |
| "CF_ACCOUNT_ID": os.getenv("CF_ACCOUNT_ID", ""), | |
| "CF_API_KEY": os.getenv("CF_API_KEY", ""), | |
| "CF_AI_Gateway": os.getenv("CF_AI_Gateway", ""), | |
| } | |
| ) | |
| def get_cloudflare_models(self): | |
| return [ | |
| {"id": "@cf/meta/llama-3.1-8b-instruct", "name": "llama3.1:8b"}, | |
| {"id": "@cf/meta/llama-2-7b-chat-fp16", "name": "llama2:7b"}, | |
| {"id": "@cf/meta/llama-3.3-70b-instruct-fp8-fast", "name": "llama3.3:70b"}, | |
| {"id": "@cf/meta/llama-3.2-11b-vision-instruct", "name": "llama3.2:11b"}, | |
| { | |
| "id": "@hf/thebloke/deepseek-coder-6.7b-instruct-awq", | |
| "name": "deepseek:6.7b", | |
| }, | |
| ] | |
| def pipes(self) -> List[dict]: | |
| return self.get_cloudflare_models() | |
| def pipe(self, body: dict) -> Union[str, Generator, Iterator]: | |
| system_message, messages = pop_system_message(body["messages"]) | |
| model_id = ( | |
| body["model"].split("cloudfare_workerai.")[-1] | |
| if "cloudfare_workerai." in body["model"] | |
| else body["model"] | |
| ) | |
| url = f"https://gateway.ai.cloudflare.com/v1/{self.valves.CF_ACCOUNT_ID}/{self.valves.CF_AI_Gateway}/workers-ai/{model_id}" | |
| headers = { | |
| "Authorization": f"Bearer {self.valves.CF_API_KEY}", | |
| "Content-Type": "application/json", | |
| } | |
| payload = { | |
| "prompt": messages[-1]["content"], | |
| "stream": body.get("stream", False), | |
| **({"system": str(system_message)} if system_message else {}), | |
| } | |
| try: | |
| if body.get("stream", False): | |
| return self.stream_response(url, headers, payload) | |
| else: | |
| return self.non_stream_response(url, headers, payload) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Request failed: {e}") | |
| return f"Error: Request failed: {e}" | |
| except Exception as e: | |
| print(f"Error in pipe method: {e}") | |
| return f"Error: {e}" | |
| def stream_response(self, url, headers, payload): | |
| try: | |
| with requests.post( | |
| url, headers=headers, json=payload, stream=True | |
| ) as response: | |
| if response.status_code != 200: | |
| raise Exception( | |
| f"HTTP Error {response.status_code}: {response.text}" | |
| ) | |
| for chunk in response.iter_lines(): | |
| if chunk: | |
| line = chunk.decode("utf-8") | |
| if line.startswith("data: "): | |
| if line.strip() == "data: [DONE]": | |
| break | |
| try: | |
| json_str = line[6:] | |
| data = json.loads(json_str) | |
| if "response" in data: | |
| yield data["response"] | |
| except json.JSONDecodeError: | |
| continue | |
| except Exception as e: | |
| yield f"Error: {str(e)}" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment