|
#!/usr/bin/env python3 |
|
""" |
|
OpenCode 异步开发任务执行脚本 |
|
|
|
功能: |
|
1. 检查并启动 OpenCode 服务 |
|
2. 创建会话并异步提交开发任务 |
|
3. 监控任务执行状态 |
|
4. 提取结果并发送飞书通知 |
|
|
|
使用方法: |
|
python3 run.py "执行xxx开发任务" |
|
python3 run.py "执行xxx开发任务" --project-dir /path/to/project |
|
""" |
|
|
|
import os |
|
import sys |
|
import time |
|
import socket |
|
import uuid |
|
import argparse |
|
import subprocess |
|
import logging |
|
from typing import Dict, List, Optional |
|
from dataclasses import dataclass |
|
|
|
import requests |
|
|
|
# Logging configuration: every record is written both to ``run.log``
# (relative to the current working directory) and to the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # The log messages in this file are non-ASCII, so pin the file
        # encoding instead of relying on the platform default.
        logging.FileHandler("run.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# --- Constants -------------------------------------------------------------

# Address used both to probe/bind ports and to build the client base URL.
OPENCODE_HOST = "127.0.0.1"
# Executable launched as ``<OPENCODE_BIN> serve --port <port>``.
OPENCODE_BIN = "/home/xxx/.opencode/bin/opencode"
# CLI used to send the Feishu notification.
OPENCLAW_BIN = "/home/xxx/.nvm/versions/node/v22.21.1/bin/openclaw"
# Notification target passed to ``main()``'s task execution.
DEFAULT_USER_ID = "ou_xxx"
# Default value of the ``--project-dir`` command-line option.
DEFAULT_PROJECT_DIR = "/mnt/github/demo/backend"
# Maximum number of polling rounds while monitoring a session.
MAX_RETRIES = 120
# Seconds slept between two polling rounds.
RETRY_INTERVAL = 5
# NOTE(review): not referenced anywhere in the visible code; the unit is not
# stated (presumably milliseconds) -- TODO confirm or remove.
TIMEOUT = 600000
# Bounds of the port range scanned when allocating a port for the server.
PORT_RANGE_START = 14000
PORT_RANGE_END = 14999
|
|
|
|
|
def find_available_port() -> int:
    """Return the first free TCP port in the configured range.

    Ports from ``PORT_RANGE_START`` through ``PORT_RANGE_END`` (inclusive)
    are tried in ascending order. A port is skipped when an opencode server
    process is already found on it, or when a socket cannot be bound to
    ``OPENCODE_HOST`` on that port.

    The probe socket is closed again before the port is returned, so another
    process may still take the port before the caller uses it.

    Returns:
        The first port that could be bound.

    Raises:
        RuntimeError: if no port in the range is available.
    """
    # ``range`` excludes its stop value; add 1 so the upper bound quoted in
    # the error message below is really tried.
    for port in range(PORT_RANGE_START, PORT_RANGE_END + 1):
        # Skip ports already held by an opencode process.
        if OpenCodeServiceManager.find_opencode_process(port):
            continue

        # The context manager closes the probe socket even when bind() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((OPENCODE_HOST, port))
            except OSError:
                continue
        return port

    raise RuntimeError(
        f"在端口范围 {PORT_RANGE_START}-{PORT_RANGE_END} 内未找到可用端口"
    )
|
|
|
|
|
@dataclass
class SessionInfo:
    """Metadata of one OpenCode session, as returned when it is created."""

    # Session identifier; used to build the per-session API URLs.
    id: str
    version: str
    project_id: str
    # Empty string when the server response carries no directory.
    directory: str
    title: str
    # Creation / last-update timestamps as reported by the server
    # (unit not visible here -- TODO confirm).
    created: int
    updated: int
|
|
|
|
|
@dataclass
class MessagePart:
    """One part of a message."""

    id: str
    # Part kind; parts of type "text" are the ones used for the summary.
    type: str
    # Text content, or None when the part has none.
    text: Optional[str] = None
|
|
|
|
|
@dataclass
class MessageInfo:
    """Header information of a single session message."""

    id: str
    session_id: str
    # Author role; the monitor looks for messages whose role is "assistant".
    role: str
    created: int
    # None while the server has not reported a completion time.
    completed: Optional[int]
    parent_id: Optional[str]
    model_id: str
    provider_id: str
    mode: str
    agent: str
    # Finish marker; the monitor treats "tool-calls" as still running and
    # "stop" as done. None when the server sent no marker.
    finish: Optional[str]
|
|
|
|
|
@dataclass
class Message:
    """A complete message: its header plus its list of parts."""

    # Header fields (role, finish marker, ...).
    info: MessageInfo
    # Parts in the order the server returned them.
    parts: List[MessagePart]
|
|
|
|
|
class OpenCodeClient:
    """Minimal HTTP client for the OpenCode server API.

    Every request is sent with the ``requests`` library to
    ``http://{host}:{port}``.
    """

    def __init__(self, host: str, port: int, timeout: float = 30):
        """Remember the server address.

        Args:
            host: Host name or IP address of the OpenCode server.
            port: TCP port of the OpenCode server.
            timeout: Per-request timeout in seconds. ``health_check`` uses
                its own shorter timeout.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.base_url = f"http://{host}:{port}"

    def create_session(self) -> SessionInfo:
        """Create a session with ``POST /session``.

        Returns:
            A ``SessionInfo`` built from the JSON response.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            KeyError: if a required field is missing from the response.
        """
        url = f"{self.base_url}/session"

        logger.info(f"创建会话: {url}")
        response = requests.post(url, timeout=self.timeout)
        response.raise_for_status()

        data = response.json()
        timestamps = data["time"]
        return SessionInfo(
            id=data["id"],
            version=data["version"],
            project_id=data["projectID"],
            directory=data.get("directory", ""),
            title=data["title"],
            created=timestamps["created"],
            updated=timestamps["updated"],
        )

    def submit_async_prompt(
        self,
        session_id: str,
        message: str,
        message_id: Optional[str] = None,
        *,
        agent: str = "build",
        model_id: str = "glm-4.7",
        provider_id: str = "zhipuai-coding-plan",
    ) -> str:
        """Submit a text prompt with ``POST /session/{id}/prompt_async``.

        Args:
            session_id: Session that receives the prompt.
            message: Prompt text, sent as a single part of type "text".
            message_id: Message id to use; a fresh ``msg_<hex>`` id is
                generated when omitted.
            agent: Value of the ``agent`` field of the request.
            model_id: Value of ``model.modelID`` in the request.
            provider_id: Value of ``model.providerID`` in the request.

        Returns:
            The message id that was sent.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        if message_id is None:
            message_id = f"msg_{self._generate_id()}"

        url = f"{self.base_url}/session/{session_id}/prompt_async"
        headers = {"content-type": "application/json"}
        payload = {
            "agent": agent,
            "model": {"modelID": model_id, "providerID": provider_id},
            "messageID": message_id,
            "parts": [
                {"id": f"prt_{self._generate_id()}", "type": "text", "text": message}
            ],
        }

        logger.info(f"提交异步任务: {session_id}, message_id: {message_id}")
        response = requests.post(
            url, headers=headers, json=payload, timeout=self.timeout
        )
        response.raise_for_status()

        return message_id

    def get_messages(self, session_id: str, limit: int = 400) -> List[Message]:
        """Fetch the messages of a session with ``GET /session/{id}/message``.

        Args:
            session_id: Session to query.
            limit: Value of the ``limit`` query parameter.

        Returns:
            The messages in the order the server returned them. Fields
            missing from the response fall back to defaults instead of
            raising, so one incomplete entry does not abort the caller.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        url = f"{self.base_url}/session/{session_id}/message"
        params = {"limit": limit}

        logger.debug(f"查询消息: {session_id}")
        response = requests.get(url, params=params, timeout=self.timeout)
        response.raise_for_status()

        # ``or []`` covers a JSON ``null`` body.
        return [self._parse_message(item) for item in response.json() or []]

    @staticmethod
    def _parse_message(item: Dict) -> Message:
        """Convert one raw message entry of the API response to a ``Message``."""
        # ``or {}`` / ``or []`` also cover keys that are present but null.
        info_data = item.get("info") or {}
        time_data = info_data.get("time") or {}

        info = MessageInfo(
            id=info_data.get("id", ""),
            session_id=info_data.get("sessionID", ""),
            role=info_data.get("role", ""),
            created=time_data.get("created", 0),
            completed=time_data.get("completed"),
            parent_id=info_data.get("parentID", ""),
            model_id=info_data.get("modelID", ""),
            provider_id=info_data.get("providerID", ""),
            mode=info_data.get("mode", ""),
            agent=info_data.get("agent", ""),
            finish=info_data.get("finish"),
        )
        parts = [
            MessagePart(
                id=part.get("id", ""),
                type=part.get("type", ""),
                text=part.get("text"),
            )
            for part in item.get("parts") or []
        ]
        return Message(info=info, parts=parts)

    def abort_session(self, session_id: str):
        """Abort a session with ``POST /session/{id}/abort``.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        url = f"{self.base_url}/session/{session_id}/abort"

        logger.warning(f"中断会话: {session_id}")
        response = requests.post(url, timeout=self.timeout)
        response.raise_for_status()

    def health_check(self) -> bool:
        """Return True if ``GET /global/health`` answers 200 within 5 seconds.

        Any ``requests`` error (connection refused, timeout, ...) counts as
        unhealthy and yields False.
        """
        url = f"{self.base_url}/global/health"
        try:
            response = requests.get(url, timeout=5)
        except requests.RequestException:
            return False
        return response.status_code == 200

    @staticmethod
    def _generate_id() -> str:
        """Return a random 32-character hexadecimal identifier."""
        return uuid.uuid4().hex
|
|
|
|
|
class OpenCodeServiceManager:
    """Helpers to probe ports and to launch ``opencode serve``."""

    @staticmethod
    def is_port_in_use(port: int, host: str = OPENCODE_HOST) -> bool:
        """Return True if a TCP connection to ``host:port`` succeeds.

        The connection attempt uses a 1 second timeout; any socket error is
        reported as "not in use".
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                return sock.connect_ex((host, port)) == 0
        except OSError:
            return False

    @staticmethod
    def find_opencode_process(port: int) -> Optional[int]:
        """Return the PID of an ``opencode serve`` process using ``port``.

        ``lsof -i :<port> -t`` lists the candidate PIDs; each is then checked
        with ``ps`` and accepted when its command line contains both
        "opencode" and "serve".

        Returns:
            The first matching PID, or None when there is none, when
            ``lsof``/``ps`` are unavailable, or when they time out.
        """
        try:
            result = subprocess.run(
                ["lsof", "-i", f":{port}", "-t"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode != 0:
                return None

            for pid in result.stdout.split():
                # Verify that the PID really belongs to an opencode server.
                cmd_result = subprocess.run(
                    ["ps", "-p", pid, "-o", "command="],
                    capture_output=True,
                    text=True,
                    timeout=5,
                )
                command = cmd_result.stdout
                if "opencode" in command and "serve" in command:
                    return int(pid)
        except (subprocess.TimeoutExpired, FileNotFoundError, ValueError):
            pass
        return None

    @staticmethod
    def _stop_process(process: subprocess.Popen) -> None:
        """Terminate ``process``, escalating to kill, and reap it."""
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            try:
                # Reap the killed child so it does not linger as a zombie.
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning("OpenCode 服务进程 kill 后仍未退出")

    @staticmethod
    def start_service(
        port: int, working_dir: Optional[str] = None
    ) -> Optional[subprocess.Popen]:
        """Start ``opencode serve`` on ``port`` and wait until it listens.

        Args:
            port: TCP port passed to ``opencode serve --port``.
            working_dir: Working directory of the server process; the current
                directory is used when None.

        Returns:
            The running server process, or None when the port is already in
            use (nothing is started in that case).

        Raises:
            RuntimeError: if the server exits during startup or does not
                accept connections within 30 seconds.
        """
        if OpenCodeServiceManager.is_port_in_use(port):
            pid = OpenCodeServiceManager.find_opencode_process(port)
            if pid:
                logger.warning(
                    f"端口 {port} 已被其他 opencode 进程 (PID: {pid}) 占用,跳过"
                )
            else:
                logger.warning(f"端口 {port} 已被非 opencode 进程占用,跳过")
            return None

        cmd = [OPENCODE_BIN, "serve", "--port", str(port)]
        if working_dir:
            logger.info(
                f"启动 OpenCode 服务: {' '.join(cmd)} (工作目录: {working_dir})"
            )
        else:
            logger.info(f"启动 OpenCode 服务: {' '.join(cmd)}")

        process = subprocess.Popen(
            cmd,
            cwd=working_dir,
            # Nothing ever reads the server's output, so a PIPE would fill up
            # and eventually block the server; discard the output instead.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Runs setsid() in the child, like preexec_fn=os.setsid, but
            # without executing Python code between fork() and exec().
            start_new_session=True,
        )

        # Wait for the server to accept connections.
        max_wait = 30
        for i in range(max_wait):
            if OpenCodeServiceManager.is_port_in_use(port):
                logger.info(f"OpenCode 服务启动成功 (耗时 {i + 1}秒)")
                return process
            exit_code = process.poll()
            if exit_code is not None:
                # The server died during startup; waiting longer is pointless.
                raise RuntimeError(
                    f"OpenCode 服务进程启动后退出 (退出码: {exit_code})"
                )
            time.sleep(1)

        # Startup timed out: clean up the process before reporting.
        OpenCodeServiceManager._stop_process(process)
        raise RuntimeError(f"OpenCode 服务启动超时 ({max_wait}秒)")

    @staticmethod
    def ensure_service_running(
        port: int, working_dir: Optional[str] = None
    ) -> Optional[subprocess.Popen]:
        """Start a new server on ``port``; an existing one is never reused.

        Thin wrapper around ``start_service`` with the same arguments,
        return value and exceptions.
        """
        return OpenCodeServiceManager.start_service(port, working_dir)
|
|
|
|
|
class TaskExecutor:
    """Run one development task against a freshly started OpenCode server."""

    # How often and how long to wait for the health endpoint after the
    # server's port has opened.
    HEALTH_CHECK_ATTEMPTS = 10
    HEALTH_CHECK_INTERVAL = 1

    def __init__(
        self,
        host: str = OPENCODE_HOST,
        working_dir: Optional[str] = None,
    ):
        """Store the configuration; nothing is started yet.

        Args:
            host: Host the OpenCode server is reached on.
            working_dir: Working directory for the server process.
        """
        self.host = host
        self.working_dir = working_dir
        self.port: Optional[int] = None
        self.client: Optional[OpenCodeClient] = None
        self.opencode_process: Optional[subprocess.Popen] = None

    def _allocate_port(self) -> int:
        """Pick a free port, remember it in ``self.port`` and return it."""
        port = find_available_port()
        self.port = port
        logger.info(f"分配动态端口: {port}")
        return port

    def _wait_until_healthy(self) -> bool:
        """Poll the health endpoint until it answers or attempts run out."""
        for attempt in range(self.HEALTH_CHECK_ATTEMPTS):
            if self.client.health_check():
                return True
            # No need to sleep after the final failed attempt.
            if attempt < self.HEALTH_CHECK_ATTEMPTS - 1:
                time.sleep(self.HEALTH_CHECK_INTERVAL)
        return False

    def execute_task(self, task_description: str, user_id: Optional[str] = None) -> str:
        """Run a task end to end and return its summary.

        Steps: allocate a port, start the server, create a session, submit
        the task, poll until it finishes, extract the summary and, when
        ``user_id`` is given, send a notification. The server process is
        terminated afterwards whether or not the task succeeded.

        Args:
            task_description: Prompt text submitted to OpenCode.
            user_id: Notification target; no notification is sent when
                None or empty.

        Returns:
            The summary text of the finished task.

        Raises:
            RuntimeError: if the server cannot be started, does not become
                healthy, or the task does not finish in time.
        """
        logger.info(f"开始执行任务: {task_description[:100]}...")

        try:
            # 0. Allocate a port.
            port = self._allocate_port()
            self.client = OpenCodeClient(self.host, port)

            # 1. Start the server inside the working directory.
            self.opencode_process = OpenCodeServiceManager.ensure_service_running(
                port, self.working_dir
            )
            if self.opencode_process is None:
                raise RuntimeError(f"无法启动 OpenCode 服务(端口 {port} 被占用)")

            # An open port does not guarantee the HTTP API is ready yet, so
            # retry the health check for a short while before giving up.
            if not self._wait_until_healthy():
                raise RuntimeError(f"OpenCode 服务启动失败(端口 {port})")

            # 2. Create the session.
            session = self.client.create_session()
            logger.info(f"会话已创建: {session.id}")

            # 3. Submit the task asynchronously.
            message_id = self.client.submit_async_prompt(session.id, task_description)
            logger.info(f"任务已提交: {message_id}")

            # 4. Poll until the task has finished.
            result = self._monitor_session(session.id)

            # 5. Extract the summary.
            summary = self._extract_summary(result)
            logger.info(f"任务完成: {summary[:200]}...")

            # 6. Notify.
            if user_id:
                self._send_notification(summary, user_id)

            return summary
        finally:
            # 7. Always stop the server this run started.
            self._terminate_opencode()

    def _terminate_opencode(self):
        """Stop the server process if it is still running.

        Sends terminate, waits up to 10 seconds, then kills and reaps the
        process. Errors are logged, never raised, because this runs in a
        ``finally`` block.
        """
        process = self.opencode_process
        if process is None or process.poll() is not None:
            return

        logger.info(f"终止 OpenCode 服务 (端口: {self.port})")
        try:
            process.terminate()
            process.wait(timeout=10)
            logger.info("OpenCode 服务已终止")
        except subprocess.TimeoutExpired:
            logger.warning("OpenCode 服务终止超时,强制 kill")
            process.kill()
            try:
                # Reap the killed child so it does not linger as a zombie.
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.error("OpenCode 服务 kill 后仍未退出")
        except Exception as e:
            logger.error(f"终止 OpenCode 服务失败: {e}")

    def _monitor_session(self, session_id: str) -> Message:
        """Poll a session until its latest assistant message has finished.

        The messages are fetched up to ``MAX_RETRIES`` times, sleeping
        ``RETRY_INTERVAL`` seconds between rounds. The task counts as done
        when the most recent assistant message has ``finish == "stop"``.

        Returns:
            The finished assistant message.

        Raises:
            RuntimeError: if the task is not done after all rounds.
        """
        logger.info(f"开始监控会话: {session_id}")

        for attempt in range(MAX_RETRIES):
            elapsed = attempt * RETRY_INTERVAL
            messages = self.client.get_messages(session_id)

            # Most recent assistant message, if any.
            latest_assistant = next(
                (msg for msg in reversed(messages) if msg.info.role == "assistant"),
                None,
            )

            if latest_assistant is None:
                logger.debug(f"等待任务开始... ({elapsed}秒)")
            else:
                finish = latest_assistant.info.finish
                if finish == "stop":
                    logger.info(f"任务已完成,耗时 {elapsed}秒")
                    return latest_assistant
                if finish == "tool-calls":
                    # Still running; report progress every tenth round only.
                    if attempt % 10 == 0:
                        logger.info(f"任务执行中... ({elapsed}秒)")
                else:
                    logger.debug(f"消息状态: {finish}")

            time.sleep(RETRY_INTERVAL)

        raise RuntimeError(f"任务执行超时 ({MAX_RETRIES * RETRY_INTERVAL}秒)")

    def _extract_summary(self, result: Message) -> str:
        """Return the first 1000 characters of the first non-empty text part.

        A fixed fallback sentence is returned when the message has no
        non-empty part of type "text".
        """
        for part in result.parts:
            if part.type == "text" and part.text:
                return part.text[:1000]
        return "任务已完成,但未提取到摘要"

    def _send_notification(self, message: str, user_id: str):
        """Send ``message`` to a Feishu user through the openclaw CLI.

        Best effort: a missing binary, a non-zero exit code, a timeout or any
        other error is logged as a warning and never raised. The message is
        cut to 500 characters.
        """
        if not os.path.exists(OPENCLAW_BIN):
            logger.warning(f"openclaw 不存在: {OPENCLAW_BIN},跳过飞书通知")
            return

        cmd = [
            OPENCLAW_BIN,
            "message",
            "send",
            "--channel",
            "feishu",
            "--target",
            f"user:{user_id}",
            "--message",
            message[:500],  # limit the message length
        ]

        logger.info(f"发送飞书通知: {message[:50]}...")
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        except subprocess.TimeoutExpired:
            logger.warning("飞书通知发送超时")
        except Exception as e:
            logger.warning(f"飞书通知发送异常: {e}")
        else:
            if result.returncode == 0:
                logger.info("飞书通知发送成功")
            else:
                logger.warning(f"飞书通知发送失败: {result.stderr}")
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the task description and the optional ``--project-dir``, runs the
    task with the project directory as the server's working directory, and
    logs the result. The process exits with status 1 when the project
    directory is not a directory, on Ctrl-C, or when the task fails.
    """
    parser = argparse.ArgumentParser(description="OpenCode 异步开发任务执行脚本")
    parser.add_argument("task", help="开发任务描述")
    parser.add_argument(
        "--project-dir", default=DEFAULT_PROJECT_DIR, help="项目目录路径"
    )

    args = parser.parse_args()

    # The path becomes the server's working directory, so it must be an
    # existing directory -- an existing regular file is not good enough.
    if not os.path.isdir(args.project_dir):
        logger.error(f"项目目录不存在或不是目录: {args.project_dir}")
        sys.exit(1)

    logger.info(f"使用项目目录: {args.project_dir}")

    try:
        # The port is allocated dynamically by the executor.
        executor = TaskExecutor(host=OPENCODE_HOST, working_dir=args.project_dir)
        result = executor.execute_task(args.task, DEFAULT_USER_ID)

        separator = "=" * 80
        logger.info(separator)
        logger.info("任务执行结果:")
        logger.info(separator)
        logger.info(result)
        logger.info(separator)
    except KeyboardInterrupt:
        logger.warning("用户中断任务")
        sys.exit(1)
    except Exception as e:
        logger.error(f"任务执行失败: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()