Created
December 6, 2025 15:16
-
-
Save takeru/b225d72185415edb3c3e876e9dad6ecd to your computer and use it in GitHub Desktop.
bedrock_server_upgrade.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # /// script | |
| # requires-python = ">=3.8" | |
| # dependencies = [ | |
| # "psutil>=5.9.0", | |
| # "pyyaml>=6.0", | |
| # "packaging>=21.0", | |
| # ] | |
| # /// | |
| """ | |
| Minecraft Bedrock Dedicated Server (BDS) Upgrade Script | |
| This script safely upgrades a Bedrock server by: | |
| 1. Checking that the server is stopped and has enough disk space | |
| 2. Validating the new server ZIP file | |
| 3. Creating a backup ZIP of the current server | |
| 4. Extracting the new server to a temporary directory | |
| 5. Copying world data and configuration files to the temporary directory | |
| 6. Updating configuration file | |
| 7. Swapping the servers (moving old to backup, new to current location) | |
| The upgrade process is designed to be safe: | |
| - The original server remains intact until the final swap | |
| - User confirmation is required before making changes | |
| - Automatic rollback on failure | |
| - Backup ZIP is always created | |
| Usage: | |
| uv run bedrock_server_upgrade.py <server_path> --bedrock-server-zip <zip_path> [options] | |
| Options: | |
| --dry-run Show what would be done without making any changes | |
| -y, --yes Automatically confirm upgrade without prompting | |
| Examples: | |
| # Dry run to see what would happen | |
| uv run bedrock_server_upgrade.py ./worldxyz --bedrock-server-zip bedrock-server-1.21.0.zip --dry-run | |
| # Actual upgrade with confirmation prompt | |
| uv run bedrock_server_upgrade.py ./worldxyz --bedrock-server-zip bedrock-server-1.21.0.zip | |
| # Automatic upgrade without confirmation | |
| uv run bedrock_server_upgrade.py ./worldxyz --bedrock-server-zip bedrock-server-1.21.0.zip -y | |
| """ | |
| import argparse | |
| import os | |
| import shutil | |
| import zipfile | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional | |
| import psutil | |
| import yaml | |
| from packaging import version | |
# Name of the per-server YAML file that records upgrade history and the
# user's extra copy list. It is looked up inside the server directory.
CONFIG_FILE_NAME = "bedrock_server_upgrade_config.yaml"

# Items carried over from the old server into the new one on every upgrade.
#
# behavior_packs, resource_packs and the development pack folders are left
# out on purpose: each new server release ships its own default packs.
# Custom packs must be listed under the 'copy' key of
# bedrock_server_upgrade_config.yaml to be carried over.
DEFAULT_COPY_ITEMS = [
    # World data
    "worlds",
    # Core server settings and access control
    "server.properties",
    "permissions.json",
    "allowlist.json",
    # Legacy name used before allowlist.json existed
    "whitelist.json",
    # Custom server configuration directory
    "config",
    # Packet limit settings
    "packetlimitconfig.json",
]
class BedrockServerUpgrader:
    """Handles Bedrock Dedicated Server upgrades.

    The new server is assembled in a temporary sibling directory (extracted
    ZIP, files copied from the current server, refreshed config file) and is
    only moved into place in the final step, so the live directory stays
    untouched until then.

    Attributes:
        server_path: Directory of the currently installed server.
        server_zip: ZIP file containing the new server release.
        dry_run: When True, mutating steps only print what they would do.
        auto_confirm: When True, the confirmation prompt is skipped.
        config: Parsed contents of the upgrade config file (see load_config).
        backup_dir: Where the old server directory is moved during the swap.
        backup_zip: Where the ZIP backup of the old server is written.
        tmp_server_path: Staging directory for the new server.
    """

    def __init__(self, server_path: Path, server_zip: Path, dry_run: bool = False, auto_confirm: bool = False):
        """Store the options and pre-compute backup / staging paths.

        Args:
            server_path: Path to the current server directory.
            server_zip: Path to the new server ZIP file.
            dry_run: Print planned actions instead of performing them.
            auto_confirm: Proceed without asking the user.
        """
        self.server_path = server_path
        self.server_zip = server_zip
        self.dry_run = dry_run
        self.auto_confirm = auto_confirm
        self.config: dict = {}

        # One timestamp is shared by every derived path so that the backup
        # directory, backup ZIP and staging directory of a run match up.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        server_dir_name = server_path.name
        server_parent_path = server_path.parent
        new_version = self.get_version_from_zip(server_zip) or "unknown"

        backup_base = server_parent_path / f"{server_dir_name}-backup"
        self.backup_dir = backup_base / f"{server_dir_name}-{timestamp}"
        self.backup_zip = backup_base / f"{server_dir_name}-{timestamp}.zip"
        self.tmp_server_path = server_parent_path / f"tmp-{server_dir_name}-{new_version}-{timestamp}"

    @staticmethod
    def check_disk_space(server_path: Path, required_multiplier: float = 2.5) -> None:
        """Check if there's enough disk space for the upgrade.

        Args:
            server_path: Path to the server directory.
            required_multiplier: Required free space as a multiplier of the
                server size (default: 2.5x). This accounts for:
                original server + backup + new server.

        Raises:
            RuntimeError: If the free space on the volume holding the server's
                parent directory is below the required amount.
        """
        print("Checking disk space...")

        # Sum the size of every file under the server directory.
        server_size = 0
        for root, _dirs, files in os.walk(server_path, followlinks=False):
            for file_name in files:
                try:
                    server_size += (Path(root) / file_name).stat().st_size
                except OSError:
                    # Best effort: unreadable or vanished files (including
                    # broken symlinks) are simply not counted.
                    continue

        available_space = shutil.disk_usage(server_path.parent).free
        required_space = int(server_size * required_multiplier)

        if available_space < required_space:
            server_size_gb = server_size / (1024**3)
            required_gb = required_space / (1024**3)
            available_gb = available_space / (1024**3)
            raise RuntimeError(
                f"Error: Insufficient disk space\n"
                f"Server size: {server_size_gb:.2f} GB\n"
                f"Required free space: {required_gb:.2f} GB\n"
                f"Available space: {available_gb:.2f} GB\n"
                f"Please free up at least {(required_gb - available_gb):.2f} GB"
            )

        print(f"✓ Sufficient disk space available ({available_space / (1024**3):.2f} GB free)")

    @staticmethod
    def check_server_is_stopped(server_path: Path) -> None:
        """Check that the Bedrock server process is not running.

        A process counts as "this server" only when its name contains
        ``bedrock_server`` and its executable resolves to
        ``<server_path>/bedrock_server``.

        Raises:
            RuntimeError: If such a process is found.
        """
        print("Checking if server process is stopped...")

        expected_exe = server_path.resolve() / "bedrock_server"

        for proc in psutil.process_iter(['name']):
            try:
                # psutil stores None for attributes it was not allowed to
                # read, so guard before calling .lower().
                proc_name = proc.info.get('name') or ''
                if 'bedrock_server' not in proc_name.lower():
                    continue
                proc_exe = Path(proc.exe()).resolve()
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # The process vanished or cannot be inspected; skip it.
                continue

            # Only the executable living in our server directory matters.
            if proc_exe == expected_exe:
                raise RuntimeError(
                    f"Error: Bedrock server is running (PID: {proc.pid})\n"
                    f"Executable: {proc_exe}\n"
                    "Please stop the server before running this script."
                )

        print("✓ Server is stopped")

    @staticmethod
    def get_version_from_zip(zip_path: Path) -> Optional[str]:
        """Extract version from ZIP file name.

        The stem is split on '-' and the last part is returned, e.g.
        ``bedrock-server-1.21.0.zip`` -> ``"1.21.0"``.

        Returns:
            The version string, or None when the stem has fewer than three
            '-'-separated parts.
        """
        parts = zip_path.stem.split('-')
        return parts[-1] if len(parts) >= 3 else None

    @staticmethod
    def compare_versions(current_ver: Optional[str], new_ver: Optional[str]) -> None:
        """Compare versions and raise error if new version is not newer than current.

        Args:
            current_ver: Version recorded for the installed server, if any.
            new_ver: Version taken from the new ZIP file name.

        Raises:
            ValueError: If new_ver is missing, or is older than or equal to
                current_ver.
        """
        if not new_ver:
            raise ValueError("Could not extract version from ZIP file name")

        if not current_ver:
            print(f"No current version found. Proceeding with upgrade to version {new_ver}.")
            return

        # Only parsing can raise InvalidVersion, so only parsing is guarded.
        try:
            current = version.parse(current_ver)
            new = version.parse(new_ver)
        except version.InvalidVersion as e:
            print(f"Warning: Version comparison failed: {e}")
            print(f" Current: {current_ver}, New: {new_ver}")
            print(" Skipping version check and proceeding.")
            return

        if new < current:
            raise ValueError(
                f"Error: New version ({new_ver}) is older than current version ({current_ver}).\n"
                "Downgrade is not supported."
            )
        if new == current:
            raise ValueError(
                f"Error: New version ({new_ver}) is the same as current version ({current_ver}).\n"
                "Same version update is not needed."
            )

        print(f"✓ Version check: {current_ver} -> {new_ver}")

    def load_config(self) -> None:
        """Load the upgrade configuration file, or start from an empty one.

        Raises:
            ValueError: If the file exists but its top level is not a YAML
                mapping (every later step reads it with ``dict.get``).
        """
        config_path = self.server_path / CONFIG_FILE_NAME

        if not config_path.exists():
            print("Configuration file does not exist, creating new one")
            self.config = {'copy': []}
            return

        print(f"Loading configuration file: {config_path}")
        with open(config_path, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f) or {}  # an empty file parses to None

        if not isinstance(loaded, dict):
            raise ValueError(f"Configuration file must contain a YAML mapping: {config_path}")
        self.config = loaded

    def create_backup_zip(self) -> None:
        """Create a backup ZIP of the current server directory (without moving it).

        Archive names are relative to the server's parent directory, so the
        ZIP unpacks into a single top-level server folder. Empty directories
        are stored as well so the directory layout survives a restore.

        Raises:
            RuntimeError: If writing or verifying the ZIP fails; a partially
                written ZIP is removed first.
        """
        print(f"Creating backup of server directory: {self.server_path}")

        if self.dry_run:
            print(f"[DRY RUN] Would create backup directory: {self.backup_dir.parent}")
            print(f"[DRY RUN] Would create backup ZIP: {self.backup_zip}")
            return

        self.backup_dir.parent.mkdir(parents=True, exist_ok=True)
        print(f"Creating backup ZIP: {self.backup_zip}")

        archive_root = self.server_path.parent
        try:
            with zipfile.ZipFile(self.backup_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for root, dirs, files in os.walk(self.server_path, followlinks=False):
                    root_path = Path(root)
                    if not dirs and not files:
                        # A file-only archive would silently drop empty
                        # directories; record them explicitly.
                        zipf.write(root_path, root_path.relative_to(archive_root))
                    for file_name in files:
                        file_path = root_path / file_name
                        zipf.write(file_path, file_path.relative_to(archive_root))

            # Verify the backup ZIP file integrity
            print("Verifying backup ZIP integrity...")
            with zipfile.ZipFile(self.backup_zip, 'r') as zipf:
                bad_file = zipf.testzip()
            if bad_file:
                raise RuntimeError(f"Backup ZIP verification failed: corrupted file {bad_file}")

            print(f"✓ Backup ZIP created: {self.backup_zip}")
        except Exception as e:
            # Never leave a half-written or corrupted backup behind.
            if self.backup_zip.exists():
                print(f"Removing corrupted backup ZIP: {self.backup_zip}")
                self.backup_zip.unlink()
            raise RuntimeError(f"Backup creation failed: {e}") from e

    def validate_server_zip(self) -> None:
        """Validate the server ZIP file integrity.

        Raises:
            RuntimeError: If the file is not a valid ZIP or a member fails
                its integrity check.
        """
        print("Validating server ZIP file...")

        if self.dry_run:
            print(f"[DRY RUN] Would validate ZIP file: {self.server_zip}")
            return

        try:
            with zipfile.ZipFile(self.server_zip, 'r') as zipf:
                bad_file = zipf.testzip()
        except zipfile.BadZipFile as e:
            raise RuntimeError(f"Invalid ZIP file: {e}") from e

        if bad_file:
            raise RuntimeError(f"Server ZIP file is corrupted: {bad_file}")
        print(f"✓ Server ZIP file validated: {self.server_zip}")

    def extract_to_temp(self) -> None:
        """Extract new Bedrock server to temporary directory.

        Raises:
            RuntimeError: If extraction fails or the archive contains no
                ``bedrock_server`` file at its top level. The temporary
                directory is removed before raising.
        """
        print(f"Extracting new server to temporary directory: {self.tmp_server_path}")

        if self.dry_run:
            print(f"[DRY RUN] Would create directory: {self.tmp_server_path}")
            print(f"[DRY RUN] Would extract ZIP file to: {self.tmp_server_path}")
            return

        try:
            self.tmp_server_path.mkdir(parents=True, exist_ok=True)
            print("Extracting files...")
            with zipfile.ZipFile(self.server_zip, 'r') as zipf:
                zipf.extractall(self.tmp_server_path)

            # Verify bedrock_server executable exists
            bedrock_server_exe = self.tmp_server_path / "bedrock_server"
            if not bedrock_server_exe.exists():
                raise RuntimeError(
                    "bedrock_server executable not found in extracted files.\n"
                    "The ZIP file may not be a valid Bedrock server package."
                )

            # The mode is set explicitly rather than relying on what
            # extraction produced.
            print("Setting executable permissions on bedrock_server...")
            bedrock_server_exe.chmod(0o755)

            print(f"✓ Server extracted to: {self.tmp_server_path}")
        except Exception as e:
            # Clean up on failure
            if self.tmp_server_path.exists():
                print(f"Cleaning up temporary directory: {self.tmp_server_path}")
                shutil.rmtree(self.tmp_server_path)
            raise RuntimeError(f"Server extraction failed: {e}") from e

    def copy_files_to_temp(self) -> None:
        """Copy files and directories from current server to temporary directory.

        The items are DEFAULT_COPY_ITEMS followed by the entries of the
        config's 'copy' key. Missing items are skipped. Every item is
        attempted even if an earlier one fails.

        Raises:
            RuntimeError: After all items were tried, if any copy failed.
        """
        print("Copying files from current server to temporary directory...")

        # 'copy' may be absent, null (an empty "copy:" line in YAML) or a
        # single string; normalise all of those to a list.
        additional_items = self.config.get('copy') or []
        if isinstance(additional_items, str):
            additional_items = [additional_items]
        all_items = DEFAULT_COPY_ITEMS + list(additional_items)

        errors = []
        for item_name in all_items:
            source_path = self.server_path / item_name
            dest_path = self.tmp_server_path / item_name

            if not source_path.exists():
                print(f" Skipping (does not exist): {item_name}")
                continue

            if self.dry_run:
                item_type = "directory" if source_path.is_dir() else "file"
                print(f" [DRY RUN] Would copy {item_type}: {item_name}")
                continue

            try:
                if source_path.is_dir():
                    # Replace whatever the new release shipped at this path.
                    if dest_path.exists():
                        shutil.rmtree(dest_path)
                    shutil.copytree(source_path, dest_path)
                    print(f" ✓ Copied directory: {item_name}")
                else:
                    # Nested items (e.g. "sub/file.json") need their parent
                    # directory to exist in the staging tree.
                    dest_path.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(source_path, dest_path)
                    print(f" ✓ Copied file: {item_name}")
            except Exception as e:
                error_msg = f"Failed to copy {item_name}: {e}"
                print(f" ✗ {error_msg}")
                errors.append(error_msg)

        if errors:
            raise RuntimeError(
                f"File copying completed with {len(errors)} error(s):\n" +
                "\n".join(f" - {err}" for err in errors)
            )

        print("✓ File copying completed")

    def swap_servers(self) -> None:
        """Swap current server with new server.

        This is two sequential moves (current -> backup_dir, then
        tmp_server_path -> server_path), not a single atomic operation. If
        a move fails and the server path is left empty, the original is
        moved back.

        Raises:
            RuntimeError: If either move fails, whether or not the original
                server could be restored.
        """
        print("Swapping servers...")

        if self.dry_run:
            print(f"[DRY RUN] Would move {self.server_path} to {self.backup_dir}")
            print(f"[DRY RUN] Would move {self.tmp_server_path} to {self.server_path}")
            return

        try:
            # Move current server to backup location
            print(f"Moving current server to backup: {self.backup_dir}")
            shutil.move(str(self.server_path), str(self.backup_dir))

            # Move new server to current location
            print(f"Moving new server to: {self.server_path}")
            shutil.move(str(self.tmp_server_path), str(self.server_path))

            print("✓ Server swap completed")
        except Exception as e:
            print(f"Error during server swap: {e}")
            # Roll back only when the first move succeeded and nothing has
            # taken the server's place yet.
            if self.backup_dir.exists() and not self.server_path.exists():
                print("Attempting to restore original server...")
                try:
                    shutil.move(str(self.backup_dir), str(self.server_path))
                    print("✓ Original server restored")
                except Exception as restore_error:
                    print(f"✗ Failed to restore: {restore_error}")
                    print("Manual recovery needed:")
                    print(f" Original server: {self.backup_dir}")
                    print(f" New server: {self.tmp_server_path}")
            raise RuntimeError(f"Server swap failed: {e}") from e

    def confirm_upgrade(self) -> bool:
        """Ask user to confirm the upgrade.

        Returns:
            True to proceed (always in auto-confirm or dry-run mode), False
            if the user declined. An empty answer counts as "no".
        """
        if self.auto_confirm:
            print("Auto-confirm enabled, proceeding with upgrade...")
            return True

        if self.dry_run:
            print("[DRY RUN] Would ask for user confirmation")
            return True

        print("\n" + "=" * 60)
        print("Ready to proceed with upgrade")
        print("=" * 60)

        current_version = self.config.get('current', {}).get('bedrock_server', {}).get('version', 'unknown')
        new_version = self.get_version_from_zip(self.server_zip) or 'unknown'

        print(f"Current version: {current_version}")
        print(f"New version: {new_version}")
        print(f"Server path: {self.server_path}")
        print(f"Backup ZIP: {self.backup_zip}")
        print("=" * 60)

        # Keep asking until the answer is recognisable.
        while True:
            response = input("Proceed with upgrade? [y/N]: ").strip().lower()
            if response in ('y', 'yes'):
                return True
            if response in ('n', 'no', ''):
                print("Upgrade cancelled by user")
                return False
            print("Please enter 'y' or 'n'")

    def update_config(self) -> None:
        """Update configuration file in the temporary directory (before swap).

        The existing 'current' entry (if any) is kept as 'previous', and a
        new 'current' entry records the timestamp, new version, ZIP name and
        backup locations. In dry-run mode the resulting YAML is printed
        instead of written.
        """
        print("Updating configuration file...")

        timestamp = datetime.now().isoformat()
        new_version = self.get_version_from_zip(self.server_zip)

        if 'current' in self.config:
            self.config['previous'] = self.config['current'].copy()

        self.config['current'] = {
            'timestamp': timestamp,
            'bedrock_server': {
                'version': new_version,
                'zip': self.server_zip.name,
            },
            'backup': {
                'zip': str(self.backup_zip),
                'dir': str(self.backup_dir),
            },
        }

        # Written into the staging directory so it is swapped in together
        # with the new server.
        config_path = self.tmp_server_path / CONFIG_FILE_NAME

        if self.dry_run:
            print(f"[DRY RUN] Would update configuration file: {config_path}")
            print("[DRY RUN] New config content:")
            print(yaml.dump(self.config, allow_unicode=True, default_flow_style=False, sort_keys=False))
            return

        with open(config_path, 'w', encoding='utf-8') as f:
            yaml.dump(self.config, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

        print(f"✓ Configuration file updated: {config_path}")

    def run(self) -> None:
        """Execute the upgrade process with the new safe workflow.

        Runs the checks, backup, confirmation, staging and swap steps in
        order. If the user declines, the method returns without staging
        anything (the backup ZIP has already been created at that point).

        Raises:
            Exception: Any failure from a step is re-raised after the
                staging directory has been cleaned up and recovery
                information has been printed.
        """
        print("=" * 60)
        print("Minecraft BDS Upgrade Script")
        if self.dry_run:
            print("[DRY RUN MODE - No changes will be made]")
        print("=" * 60)
        print(f"Server path: {self.server_path}")
        print(f"Server ZIP: {self.server_zip}")
        print("=" * 60)

        try:
            # 1. Check that server process is not running
            if not self.dry_run:
                self.check_server_is_stopped(self.server_path)
            else:
                print("[DRY RUN] Skipping server process check")

            # 2. Check disk space
            if not self.dry_run:
                self.check_disk_space(self.server_path)
            else:
                print("[DRY RUN] Skipping disk space check")

            # 3. Validate server ZIP file
            self.validate_server_zip()

            # 4. Load configuration file
            self.load_config()

            # 5. Check versions
            current_version = self.config.get('current', {}).get('bedrock_server', {}).get('version')
            new_version = self.get_version_from_zip(self.server_zip)
            self.compare_versions(current_version, new_version)

            # 6. Create backup ZIP (non-destructive)
            self.create_backup_zip()

            # 7. Ask for user confirmation
            if not self.confirm_upgrade():
                print("Upgrade aborted")
                return

            # 8. Extract new server to temporary directory
            self.extract_to_temp()

            # 9. Copy files from current server to temporary directory
            self.copy_files_to_temp()

            # 10. Update configuration file (in temporary directory, before swap)
            self.update_config()

            # 11. Swap servers (two moves, with rollback on failure)
            self.swap_servers()

            print("\n" + "=" * 60)
            if self.dry_run:
                print("✓ Dry run completed! No changes were made.")
            else:
                print("✓ Upgrade completed successfully!")
            print("=" * 60)
            print(f"New server: {self.server_path}")
            print(f"Old server backup: {self.backup_dir}")
            print(f"Backup ZIP: {self.backup_zip}")
            if not self.dry_run:
                print("\nNext steps:")
                print(f" 1. Test the server: cd {self.server_path}")
                print(" 2. Start the server and verify it works correctly")
                print(" 3. If working correctly, you can delete the backup directory:")
                print(f" rm -rf {self.backup_dir}")
            print("=" * 60)

        except Exception as e:
            print("\n" + "=" * 60)
            print("✗ Upgrade failed!")
            print("=" * 60)
            print(f"Error: {e}")

            # Cleanup temporary directory if it exists
            if self.tmp_server_path.exists():
                print(f"\nCleaning up temporary directory: {self.tmp_server_path}")
                try:
                    shutil.rmtree(self.tmp_server_path)
                    print("✓ Temporary directory removed")
                except Exception as cleanup_error:
                    print(f"✗ Failed to cleanup: {cleanup_error}")

            print("\nRecovery information:")
            if self.backup_zip.exists():
                print(f" Backup ZIP: {self.backup_zip}")
            if self.backup_dir.exists():
                print(f" Backup directory: {self.backup_dir}")
            print("=" * 60)

            raise
def main(argv: Optional[list] = None) -> int:
    """Parse the command line and run the upgrade.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. Leave as
            None for normal command-line use.

    Returns:
        Process exit code: 0 on success (including a dry run or a
        user-cancelled upgrade), 1 on invalid paths or any upgrade error.
    """
    parser = argparse.ArgumentParser(
        description='Minecraft Bedrock Dedicated Server (BDS) Upgrade Script'
    )
    parser.add_argument(
        'server_path',
        type=Path,
        help='Path to the server directory containing the world'
    )
    parser.add_argument(
        '--bedrock-server-zip',
        type=Path,
        required=True,
        help='Path to the new Bedrock server ZIP file'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making any changes'
    )
    parser.add_argument(
        '-y', '--yes',
        action='store_true',
        dest='auto_confirm',
        help='Automatically confirm upgrade without prompting'
    )
    args = parser.parse_args(argv)

    # Validate paths up front so mistakes are reported before any work starts.
    server_path = args.server_path.resolve()
    server_zip = args.bedrock_server_zip.resolve()

    if not server_path.exists():
        print(f"Error: Server path does not exist: {server_path}")
        return 1
    if not server_path.is_dir():
        print(f"Error: Server path is not a directory: {server_path}")
        return 1
    if not server_zip.exists():
        print(f"Error: Server ZIP file does not exist: {server_zip}")
        return 1
    if not server_zip.is_file():
        print(f"Error: Server ZIP path is not a file: {server_zip}")
        return 1

    try:
        upgrader = BedrockServerUpgrader(server_path, server_zip, args.dry_run, args.auto_confirm)
        upgrader.run()
    except Exception as e:
        # Top-level boundary: report the error with a traceback and turn it
        # into a non-zero exit code.
        print(f"\nAn error occurred: {e}")
        import traceback
        traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    # SystemExit is used directly because the exit() helper is injected by
    # the site module and is not guaranteed to be available.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment