Skip to content

Instantly share code, notes, and snippets.

@geosmart
Created February 14, 2026 22:44
Show Gist options
  • Select an option

  • Save geosmart/9b30db262e8952bb81b53f3967c8d6de to your computer and use it in GitHub Desktop.

Select an option

Save geosmart/9b30db262e8952bb81b53f3967c8d6de to your computer and use it in GitHub Desktop.
飞书 -OpenClaw-OpenCode 实现个人远程开发助手
name title description tags created updated
oenclaw-opencode
SKILL
Use when user wants to submit development tasks; involves starting opencode server, creating sessions, submitting async prompts, and sending completion notifications
async
automation
feishu
openclaw
opencode
2026-02-14 10:39:39 -0800
2026-02-14 13:32:14 -0800

SKILL

Overview

OpenCode 异步开发任务提交技能,通过 OpenClaw 飞书机器人接收开发任务,后台执行 OpenCode 开发。

核心流程:

  1. 接收飞书消息触发开发任务
  2. 调用脚本后台执行开发任务
  3. 脚本自动处理:动态分配端口、启动服务、创建会话、提交任务、监控状态、发送通知

When to Use

  1. 用户通过飞书机器人发送"执行 xxx 开发任务"消息
  2. 需要后台异步执行 OpenCode 开发流程

Quick Reference

环境信息

  • 脚本位置:scripts/run.py
  • OpenCode 服务端口:动态分配(14000-14999)
  • 飞书用户 ID:ou_4d7c5e9546ebf96e275749621c670793
  • 默认项目目录:/mnt/gogs/kbase

核心命令

# 基本用法
nohup python3 scripts/run.py "执行xxx开发任务" > run-$time.log 2>&1 &

# 指定项目目录
nohup python3 scripts/run.py "执行xxx开发任务" --project-dir /path/to/project > run-$time.log 2>&1 &

命令行参数

参数 必需 默认值 说明
task - 开发任务描述
--project-dir /mnt/gogs/kbase 项目目录路径
#!/usr/bin/env python3
"""
OpenCode 异步开发任务执行脚本
功能:
1. 检查并启动 OpenCode 服务
2. 创建会话并异步提交开发任务
3. 监控任务执行状态
4. 提取结果并发送飞书通知
使用方法:
python3 run.py "执行xxx开发任务"
python3 run.py "执行xxx开发任务" --project-dir /path/to/project
"""
import os
import sys
import time
import socket
import uuid
import argparse
import subprocess
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
import requests
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("run.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
# 常量配置
OPENCODE_HOST = "127.0.0.1"
OPENCODE_BIN = "/home/xxx/.opencode/bin/opencode"
OPENCLAW_BIN = "/home/xxx/.nvm/versions/node/v22.21.1/bin/openclaw"
DEFAULT_USER_ID = "ou_xxx"
DEFAULT_PROJECT_DIR = "/mnt/github/demo/backend"
MAX_RETRIES = 120
RETRY_INTERVAL = 5
TIMEOUT = 600000
PORT_RANGE_START = 14000
PORT_RANGE_END = 14999
def find_available_port() -> int:
"""查找可用端口(跳过已被 opencode 占用的端口)"""
for port in range(PORT_RANGE_START, PORT_RANGE_END):
# 检查端口是否被 opencode 进程占用
if OpenCodeServiceManager.find_opencode_process(port):
continue
# 检查端口是否可用
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.bind((OPENCODE_HOST, port))
sock.close()
return port
except socket.error:
continue
raise Exception(f"在端口范围 {PORT_RANGE_START}-{PORT_RANGE_END} 内未找到可用端口")
@dataclass
class SessionInfo:
"""会话信息"""
id: str
version: str
project_id: str
directory: str
title: str
created: int
updated: int
@dataclass
class MessagePart:
"""消息部分"""
id: str
type: str
text: Optional[str] = None
@dataclass
class MessageInfo:
"""消息信息"""
id: str
session_id: str
role: str
created: int
completed: Optional[int]
parent_id: Optional[str]
model_id: str
provider_id: str
mode: str
agent: str
finish: Optional[str]
@dataclass
class Message:
"""完整消息"""
info: MessageInfo
parts: List[MessagePart]
class OpenCodeClient:
"""OpenCode API 客户端"""
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.base_url = f"http://{host}:{port}"
def create_session(self) -> SessionInfo:
"""创建会话"""
url = f"{self.base_url}/session"
logger.info(f"创建会话: {url}")
response = requests.post(url, timeout=30)
response.raise_for_status()
data = response.json()
return SessionInfo(
id=data["id"],
version=data["version"],
project_id=data["projectID"],
directory=data.get("directory", ""),
title=data["title"],
created=data["time"]["created"],
updated=data["time"]["updated"],
)
def submit_async_prompt(
self, session_id: str, message: str, message_id: Optional[str] = None
) -> str:
"""异步提交消息"""
if message_id is None:
message_id = f"msg_{self._generate_id()}"
url = f"{self.base_url}/session/{session_id}/prompt_async"
headers = {"content-type": "application/json"}
payload = {
"agent": "build",
"model": {"modelID": "glm-4.7", "providerID": "zhipuai-coding-plan"},
"messageID": message_id,
"parts": [
{"id": f"prt_{self._generate_id()}", "type": "text", "text": message}
],
}
logger.info(f"提交异步任务: {session_id}, message_id: {message_id}")
response = requests.post(url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
return message_id
def get_messages(self, session_id: str, limit: int = 400) -> List[Message]:
"""获取会话消息"""
url = f"{self.base_url}/session/{session_id}/message"
params = {"limit": limit}
logger.debug(f"查询消息: {session_id}")
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
messages = []
for item in data:
info_data = item["info"]
info = MessageInfo(
id=info_data.get("id", ""),
session_id=info_data.get("sessionID", ""),
role=info_data.get("role", ""),
created=info_data.get("time", {}).get("created", 0),
completed=info_data.get("time", {}).get("completed"),
parent_id=info_data.get("parentID", ""),
model_id=info_data.get("modelID", ""),
provider_id=info_data.get("providerID", ""),
mode=info_data.get("mode", ""),
agent=info_data.get("agent", ""),
finish=info_data.get("finish"),
)
parts = []
for part in item.get("parts", []):
parts.append(
MessagePart(id=part["id"], type=part["type"], text=part.get("text"))
)
messages.append(Message(info=info, parts=parts))
return messages
def abort_session(self, session_id: str):
"""中断会话"""
url = f"{self.base_url}/session/{session_id}/abort"
logger.warning(f"中断会话: {session_id}")
response = requests.post(url, timeout=30)
response.raise_for_status()
def health_check(self) -> bool:
"""健康检查"""
url = f"{self.base_url}/global/health"
try:
response = requests.get(url, timeout=5)
return response.status_code == 200
except requests.RequestException:
return False
@staticmethod
def _generate_id() -> str:
"""生成唯一ID"""
return str(uuid.uuid4()).replace("-", "")
class OpenCodeServiceManager:
"""OpenCode 服务管理"""
@staticmethod
def is_port_in_use(port: int, host: str = OPENCODE_HOST) -> bool:
"""检查端口是否被占用"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(1)
result = sock.connect_ex((host, port))
return result == 0
except socket.error:
return False
@staticmethod
def find_opencode_process(port: int) -> Optional[int]:
"""查找占用指定端口的 opencode 进程 PID"""
try:
result = subprocess.run(
["lsof", "-i", f":{port}", "-t"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
pids = result.stdout.strip().split("\n")
for pid in pids:
# 验证是否是 opencode 进程
check_cmd = ["ps", "-p", pid, "-o", "command="]
cmd_result = subprocess.run(
check_cmd, capture_output=True, text=True, timeout=5
)
if "opencode" in cmd_result.stdout and "serve" in cmd_result.stdout:
return int(pid)
except (subprocess.TimeoutExpired, FileNotFoundError, ValueError):
pass
return None
@staticmethod
def start_service(
port: int, working_dir: Optional[str] = None
) -> Optional[subprocess.Popen]:
"""启动 OpenCode 服务"""
if OpenCodeServiceManager.is_port_in_use(port):
pid = OpenCodeServiceManager.find_opencode_process(port)
if pid:
logger.warning(
f"端口 {port} 已被其他 opencode 进程 (PID: {pid}) 占用,跳过"
)
else:
logger.warning(f"端口 {port} 已被非 opencode 进程占用,跳过")
return None
cmd = [OPENCODE_BIN, "serve", "--port", str(port)]
if working_dir:
logger.info(
f"启动 OpenCode 服务: {' '.join(cmd)} (工作目录: {working_dir})"
)
else:
logger.info(f"启动 OpenCode 服务: {' '.join(cmd)}")
# 启动服务,指定工作目录
process = subprocess.Popen(
cmd,
cwd=working_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
)
# 等待服务启动
max_wait = 30
for i in range(max_wait):
if OpenCodeServiceManager.is_port_in_use(port):
logger.info(f"OpenCode 服务启动成功 (耗时 {i + 1}秒)")
return process
time.sleep(1)
# 启动失败,清理进程
try:
process.terminate()
process.wait(timeout=5)
except Exception:
process.kill()
raise Exception(f"OpenCode 服务启动超时 ({max_wait}秒)")
@staticmethod
def ensure_service_running(
port: int, working_dir: Optional[str] = None
) -> Optional[subprocess.Popen]:
"""确保服务运行(只启动新服务,不复用已有服务)"""
return OpenCodeServiceManager.start_service(port, working_dir)
class TaskExecutor:
"""任务执行器"""
def __init__(
self,
host: str = OPENCODE_HOST,
working_dir: Optional[str] = None,
):
self.host = host
self.working_dir = working_dir
self.port: Optional[int] = None
self.opencode_process: Optional[subprocess.Popen] = None
def _allocate_port(self) -> int:
"""分配可用端口"""
port = find_available_port()
self.port = port
logger.info(f"分配动态端口: {port}")
return port
def execute_task(self, task_description: str, user_id: Optional[str] = None) -> str:
"""执行开发任务"""
logger.info(f"开始执行任务: {task_description[:100]}...")
try:
# 0. 分配端口
port = self._allocate_port()
self.client = OpenCodeClient(self.host, port)
# 1. 启动服务(在工作目录中启动)
self.opencode_process = OpenCodeServiceManager.ensure_service_running(
port, self.working_dir
)
# 检查服务是否成功启动
if self.opencode_process is None:
raise Exception(f"无法启动 OpenCode 服务(端口 {port} 被占用)")
# 等待服务就绪
if not self.client.health_check():
raise Exception(f"OpenCode 服务启动失败(端口 {port})")
# 2. 创建会话
session = self.client.create_session()
logger.info(f"会话已创建: {session.id}")
# 3. 提交异步任务
message_id = self.client.submit_async_prompt(session.id, task_description)
logger.info(f"任务已提交: {message_id}")
# 4. 监控执行
result = self._monitor_session(session.id)
# 5. 提取结果
summary = self._extract_summary(result)
logger.info(f"任务完成: {summary[:200]}...")
# 6. 发送通知
if user_id:
self._send_notification(summary, user_id)
return summary
finally:
# 7. 清理:终止 opencode 进程
self._terminate_opencode()
def _terminate_opencode(self):
"""终止 opencode serve 进程"""
if self.opencode_process and self.opencode_process.poll() is None:
logger.info(f"终止 OpenCode 服务 (端口: {self.port})")
try:
self.opencode_process.terminate()
# 等待进程退出
self.opencode_process.wait(timeout=10)
logger.info("OpenCode 服务已终止")
except subprocess.TimeoutExpired:
logger.warning("OpenCode 服务终止超时,强制 kill")
self.opencode_process.kill()
except Exception as e:
logger.error(f"终止 OpenCode 服务失败: {e}")
def _monitor_session(self, session_id: str) -> Message:
"""监控会话执行状态"""
logger.info(f"开始监控会话: {session_id}")
for attempt in range(MAX_RETRIES):
messages = self.client.get_messages(session_id)
# 查找最新的 assistant 消息
latest_assistant = None
for msg in reversed(messages):
if msg.info.role == "assistant":
latest_assistant = msg
break
if latest_assistant:
finish = latest_assistant.info.finish
if finish == "tool-calls":
# 任务执行中
if attempt % 10 == 0:
logger.info(f"任务执行中... ({attempt * RETRY_INTERVAL}秒)")
time.sleep(RETRY_INTERVAL)
elif finish == "stop":
# 任务完成
logger.info(f"任务已完成,耗时 {attempt * RETRY_INTERVAL}秒")
return latest_assistant
else:
# 其他状态
logger.debug(f"消息状态: {finish}")
time.sleep(RETRY_INTERVAL)
else:
logger.debug(f"等待任务开始... ({attempt * RETRY_INTERVAL}秒)")
time.sleep(RETRY_INTERVAL)
raise Exception(f"任务执行超时 ({MAX_RETRIES * RETRY_INTERVAL}秒)")
def _extract_summary(self, result: Message) -> str:
"""提取结果摘要"""
for part in result.parts:
if part.type == "text" and part.text:
# 截取前 1000 字符作为摘要
return part.text[:1000]
return "任务已完成,但未提取到摘要"
def _send_notification(self, message: str, user_id: str):
"""发送飞书通知"""
# 检查 openclaw 是否存在
if not os.path.exists(OPENCLAW_BIN):
logger.warning(f"openclaw 不存在: {OPENCLAW_BIN},跳过飞书通知")
return
cmd = [
OPENCLAW_BIN,
"message",
"send",
"--channel",
"feishu",
"--target",
f"user:{user_id}",
"--message",
message[:500], # 限制消息长度
]
logger.info(f"发送飞书通知: {message[:50]}...")
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
logger.info("飞书通知发送成功")
else:
logger.warning(f"飞书通知发送失败: {result.stderr}")
except subprocess.TimeoutExpired:
logger.warning("飞书通知发送超时")
except Exception as e:
logger.warning(f"飞书通知发送异常: {e}")
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="OpenCode 异步开发任务执行脚本")
parser.add_argument("task", help="开发任务描述")
parser.add_argument(
"--project-dir", default=DEFAULT_PROJECT_DIR, help="项目目录路径"
)
args = parser.parse_args()
# 验证项目目录存在
if not os.path.exists(args.project_dir):
logger.error(f"项目目录不存在: {args.project_dir}")
sys.exit(1)
logger.info(f"使用项目目录: {args.project_dir}")
try:
# 创建任务执行器(传递项目目录作为 OpenCode 工作目录,端口动态分配)
executor = TaskExecutor(host=OPENCODE_HOST, working_dir=args.project_dir)
# 执行任务
result = executor.execute_task(args.task, DEFAULT_USER_ID)
logger.info("=" * 80)
logger.info("任务执行结果:")
logger.info("=" * 80)
logger.info(result)
logger.info("=" * 80)
except KeyboardInterrupt:
logger.warning("用户中断任务")
sys.exit(1)
except Exception as e:
logger.error(f"任务执行失败: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment