Last active
January 1, 2022 20:39
-
-
Save Ichunjo/0de58e574672cbe18d63a2c90dce0c97 to your computer and use it in GitHub Desktop.
AssWipeMT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Set | |
| from pyonfx import Ass, Convert, Line | |
| def compose_ass_line(line: Line) -> str: | |
| return ( | |
| "Comment: " if line.comment else "Dialogue: " | |
| + str(line.layer) + ',' | |
| + str(Convert.time(max(0, int(line.start_time)))) + ',' | |
| + str(Convert.time(max(0, int(line.end_time)))) + ',' | |
| + line.style.name + ',' + line.actor + ',' | |
| + str(line.margin_l) + ',' + str(line.margin_r) + ',' + str(line.margin_v) + ',' | |
| + line.effect + ',' + (line.text if hasattr(line, 'text') else line.raw_text) | |
| + '\n' | |
| ) | |
| class SubProcessAsync: | |
| index = 0 | |
| sem: asyncio.Semaphore | |
| fails: List[str] | |
| def __init__(self, cmds: List[str], /, *, nb_cpus: Optional[int] = os.cpu_count()) -> None: | |
| if nb_cpus: | |
| self.sem = asyncio.Semaphore(nb_cpus) | |
| else: | |
| raise ValueError | |
| loop = asyncio.get_event_loop() | |
| try: | |
| loop.run_until_complete(self._processing(cmds)) | |
| finally: | |
| loop.run_until_complete(loop.shutdown_asyncgens()) | |
| loop.close() | |
| async def _processing(self, all_cmds: List[str]) -> None: | |
| gathered = asyncio.gather( | |
| *[asyncio.create_task(self._safe_processing(cmd)) for cmd in all_cmds], | |
| ) | |
| await gathered | |
| async def _safe_processing(self, cmd: str) -> None: | |
| async with self.sem: | |
| return await self._run_cmd(cmd) | |
| @staticmethod | |
| async def _run_cmd(cmd: str) -> None: | |
| proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.DEVNULL) | |
| await proc.communicate() | |
| if proc.returncode: | |
| while proc.returncode == 2: | |
| proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.DEVNULL) | |
| await proc.communicate() | |
| # _TAGSORTORDER = """ | |
| # \\an, \\pos, \\move, \\org, \\fscx, \\fscy, \\frz, \\fry, \\frx, \\fax, \\fay, \\fn, \\fs, \\fsp, \\b, | |
| # \\i, \\u, \\s, \\bord, \\xbord, \\ybord, \\shad, \\xshad, \\yshad, \\1c, \\2c, \\3c, \\4c, \\alpha, \\1a, \\2a, \\3a, \\4a, | |
| # \\blur, \\be, \\fad, \\fade, clip_rect, iclip_rect, clip_vect, iclip_vect, \\q, \\p, \\k, \\kf, \\K, \\ko, junk, unknown | |
| # """ | |
| AW_CONF: Dict[str, Any] = { | |
| 'button': 0, | |
| 'values': { | |
| 'removeInvisible': True, | |
| 'combineLines': True, | |
| 'mergeConsecutive': True, | |
| # 'mergeConsecutiveExcept': '\\kf, \\k, \\ko', | |
| 'cleanLevel': 4, | |
| 'tagsToKeep': '\\pos', | |
| 'stripComments': True, | |
| 'removeJunk': True, | |
| # 'tagSortOrder': _TAGSORTORDER.replace('\\r\\n', ''), | |
| 'filterClips': True, | |
| 'scale2float': True, | |
| 'fixDrawings': True, | |
| 'purgeContoursDraw': True, | |
| 'purgeContoursClip': True, | |
| 'purgeContoursIgnoreHashMismatch': False, | |
| 'extraDataMode': 'Remove all' | |
| } | |
| } | |
| class AssWipeMT: | |
| input_file: Path | |
| io: Ass | |
| work_dir_trims: Path = Path('_tmp_asswipemt_trims') | |
| work_dir_clean: Path = Path('_tmp_asswipemt_clean') | |
| tmp_files: Set[Path] | |
| tmp_files_out: Set[Path] | |
| def __init__(self) -> None: | |
| self.input_file = Path(sys.argv[1]) | |
| self.io = Ass(str(self.input_file), '', comment_original=False, extended=False) | |
| # Work dir creation | |
| if self.work_dir_trims.exists(): | |
| shutil.rmtree(self.work_dir_trims, True) | |
| if self.work_dir_clean.exists(): | |
| shutil.rmtree(self.work_dir_clean, True) | |
| self.work_dir_trims.mkdir() | |
| self.work_dir_clean.mkdir() | |
| self.split_file() | |
| SubProcessAsync(self.make_aw_cmds(), nb_cpus=12) | |
| self.merge_file() | |
| def split_file(self) -> None: | |
| file_num = 0 | |
| wlines = [ | |
| self.io.lines[i:min(i+100, len(self.io.lines) - 1)] | |
| for i in range(0, len(self.io.lines), 100) | |
| ] | |
| self.tmp_files = set() | |
| for wline in wlines: | |
| path = Path(self.input_file).with_stem(self.input_file.name + f'_tmp_{file_num}') | |
| self.tmp_files.add(path) | |
| with (self.work_dir_trims / path).open('w', encoding="utf-8-sig") as file: | |
| file.writelines( | |
| self.io._output | |
| + ['\n'] | |
| + [compose_ass_line(line) for line in wline if not line.comment] | |
| ) | |
| file_num += 1 | |
| def make_aw_cmds(self) -> List[str]: | |
| cmds: List[List[str]] = [] | |
| self.tmp_files_out = set() | |
| for f in self.tmp_files: | |
| output = self.work_dir_clean / f.with_stem(f.stem + '_out') | |
| self.tmp_files_out.add(output) | |
| cmds.append(self._make_aw_cmd(Path(self.io.meta.video), self.work_dir_trims / f, output)) | |
| return [' '.join(cmd) for cmd in cmds] | |
| def merge_file(self) -> None: | |
| ios = [Ass(str(f), '', comment_original=False, extended=False) for f in sorted(self.tmp_files_out)] | |
| flines = [line for slines in [io.lines for io in ios] for line in slines] | |
| tmp = self.input_file.with_stem(self.input_file.stem + '_tmp') | |
| with tmp.open('w', encoding="utf-8-sig") as file: | |
| file.writelines( | |
| self.io._output | |
| + ['\n'] | |
| + [compose_ass_line(line) for line in flines if not line.comment] | |
| ) | |
| subprocess.run( | |
| ' '.join(self._make_aw_cmd( | |
| Path(self.io.meta.video), tmp, self.input_file.with_stem(self.input_file.stem + '_out'), | |
| ['--selected-lines', ','.join([f'{i-1},{i},{i+1}' for i in range(0, len(flines), 100) if i != 0])] | |
| ) | |
| ), check=True, text=True, encoding='utf-8' | |
| ) | |
| @staticmethod | |
| def _make_aw_cmd(video: Path, input_p: Path, output_p: Path, acli_args: Optional[List[str]] = None) -> List[str]: | |
| if not acli_args: | |
| acli_args = [] | |
| return [ | |
| 'aegisub-cli', | |
| *acli_args, | |
| '--dialog', | |
| '"' + json.dumps(AW_CONF).replace('"', '\\"') + '"', | |
| '--video', '"' + str(video) + '"', | |
| # '--video', '?dummy:23.976024:400000:1920:1080:47:163:254:c', | |
| '--automation', 'l0.ASSWipe.moon', | |
| '"' + str(input_p) + '"', | |
| '"' + str(output_p) + '"', | |
| 'ASSWipe' | |
| ] | |
| if __name__ == '__main__': | |
| AssWipeMT() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment