|
#!/usr/bin/env python3 |
|
""" |
|
Seed VS Code Remote-SSH server layout under ~/.vscode-server for a given |
|
VS Code version (e.g. 1.107.1) and commit (e.g. 994fd12f...). |
|
|
|
It downloads: |
|
1) CLI tarball: https://update.code.visualstudio.com/<version>/cli-linux-x64/stable |
|
-> contains an executable named "code" |
|
-> installed as ~/.vscode-server/code-<commit> |
|
|
|
2) Server tarball: https://update.code.visualstudio.com/commit:<commit>/server-linux-x64/stable |
|
-> extracts to a directory named "vscode-server-linux-x64" |
|
-> installed as ~/.vscode-server/cli/servers/Stable-<commit>/server/ |
|
|
|
Usage: |
|
python3 install_vscode_server_seed.py --version 1.107.1 --commit 994f... |
|
|
|
Optional: |
|
--force overwrite existing installations |
|
--base-dir override ~/.vscode-server |
|
--cli-platform cli-linux-x64 | cli-alpine-x64 (default: cli-linux-x64) |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import argparse |
|
import hashlib |
|
import os |
|
import shutil |
|
import stat |
|
import sys |
|
import tarfile |
|
import tempfile |
|
import time |
|
import urllib.request |
|
from pathlib import Path |
|
|
|
|
|
def eprint(*args, **kwargs): |
|
print(*args, file=sys.stderr, **kwargs) |
|
|
|
|
|
def human_bytes(n: float) -> str: |
|
units = ["B", "KiB", "MiB", "GiB", "TiB"] |
|
n = float(n) |
|
for u in units: |
|
if n < 1024.0 or u == units[-1]: |
|
return f"{n:,.2f} {u}" |
|
n /= 1024.0 |
|
return f"{n:,.2f} B" |
|
|
|
|
|
def human_rate(bytes_per_sec: float) -> str: |
|
return f"{human_bytes(bytes_per_sec)}/s" |
|
|
|
|
|
def human_time(seconds: float) -> str: |
|
if seconds < 0 or seconds == float("inf"): |
|
return "?" |
|
seconds = int(seconds) |
|
h, rem = divmod(seconds, 3600) |
|
m, s = divmod(rem, 60) |
|
if h: |
|
return f"{h}h{m:02d}m{s:02d}s" |
|
if m: |
|
return f"{m}m{s:02d}s" |
|
return f"{s}s" |
|
|
|
|
|
def sha256_file(path: Path) -> str: |
|
h = hashlib.sha256() |
|
with path.open("rb") as f: |
|
for chunk in iter(lambda: f.read(1024 * 1024), b""): |
|
h.update(chunk) |
|
return h.hexdigest() |
|
|
|
|
|
def download_url_with_progress( |
|
url: str, |
|
dest: Path, |
|
*, |
|
label: str = "download", |
|
chunk_size: int = 1024 * 1024, |
|
update_interval_s: float = 0.2, |
|
) -> None: |
|
""" |
|
Download URL -> dest, printing progress + speed. |
|
|
|
If Content-Length is known: |
|
shows percent + ETA + avg speed + current speed. |
|
Otherwise: |
|
shows downloaded bytes + avg/current speed. |
|
""" |
|
dest.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
req = urllib.request.Request(url, headers={"User-Agent": "vscode-server-seed/1.1"}) |
|
start = time.monotonic() |
|
last_update = 0.0 |
|
downloaded = 0 |
|
|
|
# For "current speed" smoothing |
|
window_bytes = 0 |
|
window_start = start |
|
|
|
def render_line(total: int | None, force: bool = False) -> None: |
|
nonlocal last_update, window_bytes, window_start |
|
now = time.monotonic() |
|
if not force and (now - last_update) < update_interval_s: |
|
return |
|
|
|
elapsed = max(now - start, 1e-6) |
|
avg_bps = downloaded / elapsed |
|
|
|
window_elapsed = max(now - window_start, 1e-6) |
|
cur_bps = window_bytes / window_elapsed |
|
|
|
if total and total > 0: |
|
pct = (downloaded / total) * 100.0 |
|
remaining = max(total - downloaded, 0) |
|
eta = remaining / max(avg_bps, 1e-6) |
|
line = ( |
|
f"{label}: {pct:6.2f}% " |
|
f"{human_bytes(downloaded)} / {human_bytes(total)} " |
|
f"avg {human_rate(avg_bps)} cur {human_rate(cur_bps)} " |
|
f"ETA {human_time(eta)}" |
|
) |
|
else: |
|
line = ( |
|
f"{label}: {human_bytes(downloaded)} " |
|
f"avg {human_rate(avg_bps)} cur {human_rate(cur_bps)}" |
|
) |
|
|
|
# Overwrite same line (stderr) |
|
# Pad to reduce leftover characters from previous longer line. |
|
pad = " " * 10 |
|
sys.stderr.write("\r" + line + pad) |
|
sys.stderr.flush() |
|
|
|
last_update = now |
|
# Reset "current speed" window periodically for responsiveness |
|
window_bytes = 0 |
|
window_start = now |
|
|
|
with urllib.request.urlopen(req) as resp: |
|
total = None |
|
try: |
|
cl = resp.headers.get("Content-Length") |
|
if cl is not None: |
|
total = int(cl) |
|
except Exception: |
|
total = None |
|
|
|
with dest.open("wb") as f: |
|
while True: |
|
chunk = resp.read(chunk_size) |
|
if not chunk: |
|
break |
|
f.write(chunk) |
|
downloaded += len(chunk) |
|
window_bytes += len(chunk) |
|
render_line(total) |
|
|
|
render_line(total, force=True) |
|
sys.stderr.write("\n") |
|
sys.stderr.flush() |
|
|
|
|
|
def _is_within_directory(base: Path, target: Path) -> bool: |
|
base_resolved = base.resolve() |
|
# If target doesn't exist yet, resolve its parent path safely |
|
try: |
|
target_resolved = target.resolve() |
|
except FileNotFoundError: |
|
target_resolved = (target.parent.resolve() / target.name) |
|
return ( |
|
str(target_resolved).startswith(str(base_resolved) + os.sep) |
|
or target_resolved == base_resolved |
|
) |
|
|
|
|
|
def safe_extract_tar_gz(tar_path: Path, extract_to: Path) -> None: |
|
""" |
|
Extract a .tar.gz safely (prevents path traversal). |
|
""" |
|
extract_to.mkdir(parents=True, exist_ok=True) |
|
with tarfile.open(tar_path, mode="r:gz") as tf: |
|
for member in tf.getmembers(): |
|
member_path = extract_to / member.name |
|
if not _is_within_directory(extract_to, member_path): |
|
raise RuntimeError(f"Refusing to extract outside target dir: {member.name}") |
|
tf.extractall(path=extract_to) |
|
|
|
|
|
def atomic_replace(src: Path, dst: Path) -> None: |
|
""" |
|
Atomically replace dst with src where possible. |
|
""" |
|
dst.parent.mkdir(parents=True, exist_ok=True) |
|
tmp_dst = dst.with_name(dst.name + ".tmp") |
|
if tmp_dst.exists(): |
|
if tmp_dst.is_dir(): |
|
shutil.rmtree(tmp_dst) |
|
else: |
|
tmp_dst.unlink() |
|
src.rename(tmp_dst) |
|
tmp_dst.replace(dst) |
|
|
|
|
|
def rm_rf(path: Path) -> None: |
|
if not path.exists(): |
|
return |
|
if path.is_symlink() or path.is_file(): |
|
path.unlink() |
|
else: |
|
shutil.rmtree(path) |
|
|
|
|
|
def chmod_x(path: Path) -> None: |
|
st = path.stat() |
|
path.chmod(st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) |
|
|
|
|
|
def ensure_minimal_cli_servers_state(cli_servers_dir: Path) -> None: |
|
""" |
|
Not strictly required, but matches what you showed: ~/.vscode-server/cli/servers/lru.json exists. |
|
Create if missing (minimal JSON). |
|
""" |
|
cli_servers_dir.mkdir(parents=True, exist_ok=True) |
|
lru = cli_servers_dir / "lru.json" |
|
if not lru.exists(): |
|
lru.write_text('{\n "entries": []\n}\n', encoding="utf-8") |
|
|
|
|
|
def find_extracted_code_binary(extract_dir: Path) -> Path: |
|
""" |
|
CLI tarball extracts to a file named 'code' at top-level (per your observation). |
|
Be robust and search for a regular file named 'code'. |
|
""" |
|
direct = extract_dir / "code" |
|
if direct.is_file(): |
|
return direct |
|
|
|
for p in extract_dir.rglob("code"): |
|
if p.is_file() and not p.is_symlink(): |
|
return p |
|
|
|
raise FileNotFoundError("Could not find extracted 'code' binary inside CLI tarball extraction.") |
|
|
|
|
|
def find_extracted_server_dir(extract_dir: Path) -> Path: |
|
""" |
|
Server tarball extracts to directory named 'vscode-server-linux-x64' (per your observation). |
|
Be robust and fall back if archive layout differs. |
|
""" |
|
expected = extract_dir / "vscode-server-linux-x64" |
|
if expected.is_dir(): |
|
return expected |
|
|
|
top_dirs = [p for p in extract_dir.iterdir() if p.is_dir()] |
|
if len(top_dirs) == 1: |
|
return top_dirs[0] |
|
|
|
for p in extract_dir.rglob("vscode-server-linux-x64"): |
|
if p.is_dir(): |
|
return p |
|
|
|
raise FileNotFoundError( |
|
"Could not find extracted 'vscode-server-linux-x64' directory inside server tarball extraction." |
|
) |
|
|
|
|
|
def main() -> int: |
|
ap = argparse.ArgumentParser() |
|
ap.add_argument("--version", required=True, help="VS Code version number, e.g. 1.107.1") |
|
ap.add_argument("--commit", required=True, help="VS Code commit hash, e.g. 994fd12f...") |
|
ap.add_argument( |
|
"--base-dir", |
|
default=str(Path.home() / ".vscode-server"), |
|
help="Base install dir (default: ~/.vscode-server)", |
|
) |
|
ap.add_argument("--force", action="store_true", help="Overwrite existing installed files") |
|
ap.add_argument( |
|
"--cli-platform", |
|
default="cli-linux-x64", |
|
choices=["cli-linux-x64", "cli-alpine-x64"], |
|
help="CLI platform segment for update URL (default: cli-linux-x64)", |
|
) |
|
args = ap.parse_args() |
|
|
|
version = args.version.strip() |
|
commit = args.commit.strip() |
|
base_dir = Path(args.base_dir).expanduser() |
|
|
|
if not version or not commit: |
|
eprint("Error: --version and --commit must be non-empty.") |
|
return 2 |
|
|
|
# Target paths matching your working layout |
|
cli_servers_dir = base_dir / "cli" / "servers" |
|
stable_dir = cli_servers_dir / f"Stable-{commit}" |
|
server_target_dir = stable_dir / "server" |
|
code_target = base_dir / f"code-{commit}" |
|
|
|
if code_target.exists() and not args.force: |
|
eprint(f"Refusing to overwrite existing: {code_target} (use --force)") |
|
return 3 |
|
if server_target_dir.exists() and not args.force: |
|
eprint(f"Refusing to overwrite existing: {server_target_dir} (use --force)") |
|
return 3 |
|
|
|
ensure_minimal_cli_servers_state(cli_servers_dir) |
|
|
|
cli_url = f"https://update.code.visualstudio.com/{version}/{args.cli_platform}/stable" |
|
srv_url = f"https://update.code.visualstudio.com/commit:{commit}/server-linux-x64/stable" |
|
|
|
eprint("Planned install:") |
|
eprint(f" CLI URL: {cli_url}") |
|
eprint(f" Server URL: {srv_url}") |
|
eprint(f" CLI -> {code_target}") |
|
eprint(f" Server -> {server_target_dir}") |
|
|
|
# Work in a temp dir under base_dir for easier atomic moves across same filesystem |
|
base_dir.mkdir(parents=True, exist_ok=True) |
|
with tempfile.TemporaryDirectory(prefix="vscode-seed-", dir=str(base_dir)) as td: |
|
td_path = Path(td) |
|
|
|
cli_tgz = td_path / f"vscode-cli-{version}-{args.cli_platform}.tar.gz" |
|
srv_tgz = td_path / f"vscode-server-{commit}-linux-x64.tar.gz" |
|
|
|
eprint("\nDownloading CLI tarball...") |
|
download_url_with_progress(cli_url, cli_tgz, label="CLI tarball") |
|
eprint(f" saved: {cli_tgz} (sha256={sha256_file(cli_tgz)})") |
|
|
|
eprint("Downloading Server tarball...") |
|
download_url_with_progress(srv_url, srv_tgz, label="Server tarball") |
|
eprint(f" saved: {srv_tgz} (sha256={sha256_file(srv_tgz)})") |
|
|
|
# Extract CLI |
|
cli_extract = td_path / "cli_extract" |
|
eprint("\nExtracting CLI tarball...") |
|
safe_extract_tar_gz(cli_tgz, cli_extract) |
|
extracted_code = find_extracted_code_binary(cli_extract) |
|
|
|
# Stage CLI binary |
|
staged_code = td_path / "code-staged" |
|
shutil.copy2(extracted_code, staged_code) |
|
chmod_x(staged_code) |
|
|
|
# Extract Server |
|
srv_extract = td_path / "srv_extract" |
|
eprint("Extracting Server tarball...") |
|
safe_extract_tar_gz(srv_tgz, srv_extract) |
|
extracted_server_dir = find_extracted_server_dir(srv_extract) |
|
|
|
# Stage server dir named exactly "server" |
|
staged_server = td_path / "server-staged" |
|
if staged_server.exists(): |
|
rm_rf(staged_server) |
|
shutil.copytree(extracted_server_dir, staged_server, symlinks=True) |
|
|
|
# Install (overwrite if --force) |
|
eprint("\nInstalling...") |
|
|
|
if args.force: |
|
rm_rf(code_target) |
|
atomic_replace(staged_code, code_target) |
|
|
|
if args.force: |
|
rm_rf(server_target_dir) |
|
stable_dir.mkdir(parents=True, exist_ok=True) |
|
atomic_replace(staged_server, server_target_dir) |
|
|
|
# Optional: create empty log/pid files (Remote-SSH will manage these anyway) |
|
(stable_dir / "log.txt").touch(exist_ok=True) |
|
(stable_dir / "pid.txt").touch(exist_ok=True) |
|
|
|
eprint("\nDone.") |
|
eprint("Quick checks you can run on the remote host:") |
|
eprint(f" {code_target} --version") |
|
eprint(f" {server_target_dir}/bin/code-server --version") |
|
return 0 |
|
|
|
|
|
if __name__ == "__main__": |
|
raise SystemExit(main()) |