Skip to content

Instantly share code, notes, and snippets.

@Ichunjo
Last active January 1, 2022 20:39
Show Gist options
  • Select an option

  • Save Ichunjo/0de58e574672cbe18d63a2c90dce0c97 to your computer and use it in GitHub Desktop.

Select an option

Save Ichunjo/0de58e574672cbe18d63a2c90dce0c97 to your computer and use it in GitHub Desktop.
AssWipeMT
import asyncio
import json
import os
import shlex
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

from pyonfx import Ass, Convert, Line
def compose_ass_line(line: Line) -> str:
    """Serialize a pyonfx ``Line`` into one ASS event row.

    Args:
        line: The subtitle line to write out. Its ``text`` attribute is used
            when present, otherwise ``raw_text``.

    Returns:
        ``"Comment: ..."`` or ``"Dialogue: ..."`` followed by the
        comma-separated event fields and a trailing newline. Negative start
        and end times are clamped to 0 before conversion.
    """
    # The prefix has to be chosen on its own: written inline,
    # ``a if cond else b + rest`` parses as ``a if cond else (b + rest)`` and
    # a comment line would come out as the bare string "Comment: ".
    prefix = "Comment: " if line.comment else "Dialogue: "
    text = line.text if hasattr(line, 'text') else line.raw_text
    fields = [
        str(line.layer),
        # NOTE(review): Convert.time is expected to turn integer milliseconds
        # into an ASS timestamp — confirm against the pyonfx version in use.
        str(Convert.time(max(0, int(line.start_time)))),
        str(Convert.time(max(0, int(line.end_time)))),
        line.style.name,
        line.actor,
        str(line.margin_l),
        str(line.margin_r),
        str(line.margin_v),
        line.effect,
        text,
    ]
    return prefix + ','.join(fields) + '\n'
class SubProcessAsync:
    """Run a batch of shell commands concurrently, a bounded number at a time.

    Constructing an instance is blocking: the constructor returns only after
    every command has finished. Standard output of the commands is discarded.

    Attributes:
        index: Not used by this class; kept for backward compatibility.
        sem: Semaphore limiting how many commands run simultaneously.
        fails: Commands whose final exit status was non-zero.
        max_retries: How many times a command exiting with status 2 is re-run
            before it is given up on and recorded in ``fails``.
    """
    index = 0
    sem: asyncio.Semaphore
    fails: List[str]
    max_retries: int = 10

    def __init__(self, cmds: List[str], /, *, nb_cpus: Optional[int] = os.cpu_count()) -> None:
        """Execute ``cmds`` and wait for all of them.

        Args:
            cmds: Shell command lines, each run through the system shell.
            nb_cpus: Maximum number of commands running at once. Defaults to
                the CPU count detected when the module was imported.

        Raises:
            ValueError: If ``nb_cpus`` is ``None``, zero or negative.
        """
        if nb_cpus is None or nb_cpus < 1:
            raise ValueError(f'nb_cpus must be a positive integer, got {nb_cpus!r}')
        self._nb_cpus = nb_cpus
        self.fails = []
        # asyncio.run creates a fresh loop for every batch and closes it
        # afterwards, so the class can be instantiated more than once per
        # process (a loop obtained via get_event_loop() stays closed).
        asyncio.run(self._processing(cmds))

    async def _processing(self, all_cmds: List[str]) -> None:
        """Schedule every command and wait until all of them are done."""
        # Created inside the coroutine so the semaphore is tied to the loop
        # that actually runs the tasks.
        self.sem = asyncio.Semaphore(self._nb_cpus)
        await asyncio.gather(*(self._safe_processing(cmd) for cmd in all_cmds))

    async def _safe_processing(self, cmd: str) -> None:
        """Run one command under the concurrency limit and record a failure."""
        async with self.sem:
            returncode = await self._run_cmd(cmd, max_retries=self.max_retries)
        if returncode:
            self.fails.append(cmd)

    @staticmethod
    async def _run_cmd(cmd: str, max_retries: int = 10) -> Optional[int]:
        """Run ``cmd`` in a shell and return its final exit status.

        NOTE(review): exit status 2 is treated as retryable, presumably a
        transient failure of the spawned tool — confirm. The retry count is
        capped so a command that always exits with 2 cannot loop forever.
        """
        retries = 0
        while True:
            proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.DEVNULL)
            await proc.communicate()
            if proc.returncode != 2 or retries >= max_retries:
                return proc.returncode
            retries += 1
# _TAGSORTORDER = """
# \\an, \\pos, \\move, \\org, \\fscx, \\fscy, \\frz, \\fry, \\frx, \\fax, \\fay, \\fn, \\fs, \\fsp, \\b,
# \\i, \\u, \\s, \\bord, \\xbord, \\ybord, \\shad, \\xshad, \\yshad, \\1c, \\2c, \\3c, \\4c, \\alpha, \\1a, \\2a, \\3a, \\4a,
# \\blur, \\be, \\fad, \\fade, clip_rect, iclip_rect, clip_vect, iclip_vect, \\q, \\p, \\k, \\kf, \\K, \\ko, junk, unknown
# """
# Dialog answer for the ASSWipe automation: serialized to JSON and handed to
# aegisub-cli through its ``--dialog`` option.
AW_CONF: Dict[str, Any] = dict(
    button=0,
    values=dict(
        removeInvisible=True,
        combineLines=True,
        mergeConsecutive=True,
        # mergeConsecutiveExcept='\\kf, \\k, \\ko',
        cleanLevel=4,
        tagsToKeep='\\pos',
        stripComments=True,
        removeJunk=True,
        # tagSortOrder=_TAGSORTORDER.replace('\\r\\n', ''),
        filterClips=True,
        scale2float=True,
        fixDrawings=True,
        purgeContoursDraw=True,
        purgeContoursClip=True,
        purgeContoursIgnoreHashMismatch=False,
        extraDataMode='Remove all',
    ),
)
class AssWipeMT:
    """Run the ASSWipe Aegisub automation over an ASS file in parallel chunks.

    Instantiating the class does all the work:

    1. the file named by ``sys.argv[1]`` is split into chunks of
       ``lines_per_chunk`` lines, written to ``work_dir_trims``;
    2. one ``aegisub-cli`` process per chunk writes a cleaned copy to
       ``work_dir_clean``;
    3. the cleaned chunks are concatenated into ``<stem>_tmp<suffix>`` next to
       the input, and a last ``aegisub-cli`` pass restricted to the lines
       around every chunk boundary writes ``<stem>_out<suffix>``.

    Attributes:
        input_file: Path of the subtitle file being processed.
        io: pyonfx view of ``input_file``.
        work_dir_trims: Directory receiving the uncleaned chunks.
        work_dir_clean: Directory receiving the cleaned chunks.
        tmp_files: Chunk file names, relative to ``work_dir_trims``.
        tmp_files_out: Paths of the cleaned chunk files.
        lines_per_chunk: Number of subtitle lines per chunk.
    """
    input_file: Path
    io: Ass
    work_dir_trims: Path = Path('_tmp_asswipemt_trims')
    work_dir_clean: Path = Path('_tmp_asswipemt_clean')
    tmp_files: Set[Path]
    tmp_files_out: Set[Path]
    lines_per_chunk: int = 100

    def __init__(self) -> None:
        """Split, clean in parallel and merge the file given on the command line."""
        self.input_file = Path(sys.argv[1])
        self.io = Ass(str(self.input_file), '', comment_original=False, extended=False)

        # Start from empty work directories; leftovers from a previous run
        # are removed on a best-effort basis.
        for work_dir in (self.work_dir_trims, self.work_dir_clean):
            if work_dir.exists():
                shutil.rmtree(work_dir, True)
        self.work_dir_trims.mkdir()
        self.work_dir_clean.mkdir()

        self.split_file()
        SubProcessAsync(self.make_aw_cmds(), nb_cpus=12)
        self.merge_file()

    def split_file(self) -> None:
        """Write the input's non-comment lines to numbered chunk files.

        Every chunk file starts with ``self.io._output`` (presumably the
        header part of the script — verify against pyonfx) so that it is a
        complete ASS file on its own.
        """
        lines = self.io.lines
        step = self.lines_per_chunk
        # Plain slicing already stops at the end of the list; capping the end
        # at ``len - 1`` would silently drop the last subtitle line.
        chunks = [lines[start:start + step] for start in range(0, len(lines), step)]

        # Zero-pad the counter so that sorting the names in merge_file()
        # restores the original chunk order beyond ten chunks.
        width = len(str(len(chunks)))
        # Only the file name is used: a directory part in the input path must
        # not leak into (or out of) the work directory.
        base = Path(self.input_file.name)

        self.tmp_files = set()
        for file_num, chunk in enumerate(chunks):
            path = base.with_stem(f'{base.name}_tmp_{file_num:0{width}d}')
            self.tmp_files.add(path)
            with (self.work_dir_trims / path).open('w', encoding="utf-8-sig") as file:
                file.writelines(
                    self.io._output
                    + ['\n']
                    + [compose_ass_line(line) for line in chunk if not line.comment]
                )

    def make_aw_cmds(self) -> List[str]:
        """Build one shell command per chunk and remember the output paths.

        Returns:
            Shell command lines, one per entry of ``tmp_files``.
        """
        video = Path(self.io.meta.video)
        cmds: List[str] = []
        self.tmp_files_out = set()
        for f in self.tmp_files:
            output = self.work_dir_clean / f.with_stem(f.stem + '_out')
            self.tmp_files_out.add(output)
            cmds.append(self._to_shell(self._make_aw_cmd(video, self.work_dir_trims / f, output)))
        return cmds

    def merge_file(self) -> None:
        """Concatenate the cleaned chunks and clean the chunk boundaries.

        Raises:
            FileNotFoundError: If a cleaned chunk file is missing.
            subprocess.CalledProcessError: If the final ``aegisub-cli`` run fails.
        """
        outputs = sorted(self.tmp_files_out)
        missing = [str(f) for f in outputs if not f.exists()]
        if missing:
            raise FileNotFoundError('aegisub-cli produced no output for: ' + ', '.join(missing))

        kept_per_chunk = [
            [
                line
                for line in Ass(str(f), '', comment_original=False, extended=False).lines
                if not line.comment
            ]
            for f in outputs
        ]
        flines = [line for chunk in kept_per_chunk for line in chunk]

        tmp = self.input_file.with_stem(self.input_file.stem + '_tmp')
        with tmp.open('w', encoding="utf-8-sig") as file:
            file.writelines(
                self.io._output
                + ['\n']
                + [compose_ass_line(line) for line in flines]
            )

        # Cleaning can merge or drop lines, so boundaries are derived from the
        # real size of every cleaned chunk rather than from multiples of the
        # chunk size.
        selected = self._boundary_lines([len(chunk) for chunk in kept_per_chunk])
        # With no boundary the option is left out entirely; an empty value
        # would swallow the next argument. The per-chunk runs also omit it and
        # rely on the tool's default selection.
        acli_args = ['--selected-lines', ','.join(map(str, selected))] if selected else None

        # Passed as an argument list: no shell and no quoting involved.
        subprocess.run(
            self._make_aw_cmd(
                Path(self.io.meta.video), tmp,
                self.input_file.with_stem(self.input_file.stem + '_out'),
                acli_args,
            ),
            check=True, text=True, encoding='utf-8'
        )

    @staticmethod
    def _boundary_lines(chunk_sizes: List[int]) -> List[int]:
        """Return the sorted line indices adjacent to each chunk boundary.

        For every boundary at offset ``b`` (index of the first line of the
        following chunk) the indices ``b - 1``, ``b`` and ``b + 1`` are
        selected, limited to valid indices and without duplicates.
        NOTE(review): indices are 0-based positions in the merged line list —
        confirm that this is what ``--selected-lines`` expects.
        """
        total = sum(chunk_sizes)
        picked: Set[int] = set()
        offset = 0
        for size in chunk_sizes[:-1]:
            offset += size
            picked.update(i for i in (offset - 1, offset, offset + 1) if 0 <= i < total)
        return sorted(picked)

    @staticmethod
    def _to_shell(argv: List[str]) -> str:
        """Quote ``argv`` into a single command line for the platform's shell."""
        if os.name == 'nt':
            return subprocess.list2cmdline(argv)
        return shlex.join(argv)

    @staticmethod
    def _make_aw_cmd(video: Path, input_p: Path, output_p: Path, acli_args: Optional[List[str]] = None) -> List[str]:
        """Build the ``aegisub-cli`` argument vector for one ASSWipe run.

        Args:
            video: Video file passed to ``--video``.
            input_p: Subtitle file to read.
            output_p: Subtitle file to write.
            acli_args: Extra options inserted right after the program name.

        Returns:
            The raw, unquoted arguments; use ``_to_shell`` when a shell
            command line is needed.
        """
        return [
            'aegisub-cli',
            *(acli_args or []),
            '--dialog', json.dumps(AW_CONF),
            '--video', str(video),
            # Alternative without a real video:
            # '--video', '?dummy:23.976024:400000:1920:1080:47:163:254:c',
            '--automation', 'l0.ASSWipe.moon',
            str(input_p),
            str(output_p),
            'ASSWipe',
        ]
if __name__ == '__main__':
    # AssWipeMT reads the subtitle path from sys.argv[1]; exit with a usage
    # message instead of an IndexError traceback when it is missing.
    if len(sys.argv) < 2:
        sys.exit(f'usage: {sys.argv[0]} <subtitle file>')
    AssWipeMT()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment