Last active
September 25, 2025 21:18
-
-
Save nebukadhezer/136608889ccd8357288997050ca7c67b to your computer and use it in GitHub Desktop.
Oiio Multilayer to Multipart with bit depth conversion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import fnmatch | |
| import logging | |
| import os | |
| import re | |
| from collections import OrderedDict | |
| import OpenImageIO as oiio | |
| from OpenImageIO import ImageBuf, ImageBufAlgo, ImageInput, ImageOutput, ImageSpec | |
| LOGGER = logging.getLogger("oiio_helper") | |
| logging.basicConfig(level=logging.INFO) | |
| CHANNEL_RENAME_MAP = { | |
| "ViewLayer.": "", | |
| "ViewLayer_": "", | |
| "FinalImageMovieRenderQueue_": "", | |
| "position": "WorldPos", | |
| } | |
| FLOAT_LAYER_KEEPERS = ["crypto", "depth", "position", "actorhitproxy", "worldpos"] | |
| CRYPTO_METADATA_NAMES = ["crypto", "actorhitproxy"] | |
| RE_FRAMES = re.compile(r"(?:.*)([.|_][0-9]{2,10}[.|_])(?:.*)") | |
| OCIO_FALLBACK = "" | |
| def construct_out_path(path: str) -> str: | |
| """ | |
| method to add _mp for multipart before the frame padding | |
| so no outpath has to be specified | |
| :param path: | |
| :return: | |
| """ | |
| out_path = ".".join(path.split(".")[:-1]) | |
| out_path = out_path + "_mp." + path.split(".")[-1] | |
| frames = re.search(RE_FRAMES, path) | |
| if frames is not None: | |
| out_path = path.split(frames.groups()[0])[0] | |
| out_path = ( | |
| out_path + "_mp" + frames.groups()[0] + path.split(frames.groups()[0])[-1] | |
| ) | |
| LOGGER.info(out_path) | |
| return out_path | |
| def is_multipart(path: str) -> [bool, None]: | |
| """ | |
| check if this is a multipart image | |
| seeks for the "second" subimage, returns false if there is none | |
| :param path: | |
| :return: bool | |
| """ | |
| inp = ImageInput.open(path) | |
| if inp: | |
| check = inp.seek_subimage(1, 0) | |
| inp.close() | |
| return check | |
| # in case sth goes wrong | |
| LOGGER.error(f"Cannot properly seek subimages for {path}") | |
| inp.close() | |
| return None | |
| def extract_layers(path: str) -> OrderedDict: | |
| """ | |
| create an ordered dict of sanitized layer names and the corresponding channels from the | |
| exr | |
| the key is the new name of the layer/subimage and | |
| the value is the list of channels that make up that layer | |
| ('rgba', ['ViewLayer_Combined.red', | |
| 'ViewLayer_Combined.green', | |
| 'ViewLayer_Combined.blue', | |
| 'ViewLayer_Combined.alpha']) | |
| :param path: | |
| :return: | |
| """ | |
| img = ImageInput.open(path) | |
| channels = img.spec().channelnames | |
| layers = OrderedDict() | |
| for i in channels: | |
| layername = ".".join(i.split(".")[:-1]) | |
| for k, v in CHANNEL_RENAME_MAP.items(): | |
| layername = layername.replace(k, v) | |
| if layername in ("Combined", ""): | |
| layername = "rgba" | |
| # if layername.lower().__contains__("depth"): | |
| # layername = "Depth" | |
| if layername not in layers: | |
| layers[layername] = [i] | |
| else: | |
| layers[layername].append(i) | |
| img.close() | |
| LOGGER.debug(layers) | |
| return layers | |
| def fix_metadata_names(metadata: dict) -> dict: | |
| """ | |
| checks for keys named like in CRYPTO_METADATA_NAMES and | |
| then applies the CHANNEL_RENAME_MAP | |
| :param metadata: dict | |
| :return: dict | |
| """ | |
| for i in range(len(metadata)): | |
| for c in CRYPTO_METADATA_NAMES: | |
| if fnmatch.filter([metadata[i].name.lower()], c + "*name"): | |
| value = metadata[i].value | |
| for k, v in CHANNEL_RENAME_MAP.items(): | |
| value = value.replace(k, v) | |
| metadata.attribute(metadata[i].name, value) | |
| return metadata | |
| def create_lutted_jpg_from_buf( | |
| path, | |
| buf=None, | |
| display="Rec.1886 Rec.709 - Display", | |
| view="ACES 1.0 - SDR Video", | |
| fromspace="lin_rec709_srgb", | |
| subsampling="4:2:0", | |
| quality=93, | |
| ): | |
| if not os.getenv("OCIO", ""): | |
| os.environ["OCIO"] = OCIO_FALLBACK | |
| if not buf: | |
| # find index of the 'rgba' subimage | |
| img = ImageInput.open(path) | |
| if not img: | |
| raise Exception(f"Couldn't open {path} - {oiio.geterror()}") | |
| sub_index = 0 | |
| while True: | |
| ok = img.seek_subimage(sub_index, 0) | |
| if not ok: | |
| LOGGER.warning(f"Couldn't find rgba subimage in {path}") | |
| sub_index = 0 | |
| break | |
| # Todo need to fix this | |
| if img.spec().channel_name(0).startswith("rgba"): | |
| break | |
| sub_index += 1 | |
| buf = ImageBuf(path, sub_index, 0) | |
| jpg_path = path.replace("exr", "jpg") | |
| if not os.path.isdir(os.path.dirname(jpg_path)): | |
| try: | |
| os.makedirs(os.path.dirname(jpg_path), exist_ok=True) | |
| except: | |
| pass | |
| buf = ImageBufAlgo.ociodisplay(buf, display, view, fromspace) | |
| buf.spec().attribute("jpeg:subsampling", subsampling) | |
| buf.spec().attribute("Compression", f"jpeg:{quality}") | |
| if buf.has_error: | |
| LOGGER.error(f"Error writing {jpg_path}: {buf.geterror()}") | |
| else: | |
| buf.write(jpg_path, oiio.UINT8) | |
| LOGGER.info( | |
| f"created lutted jpg {jpg_path}, with {display}, {view} from {fromspace}" | |
| ) | |
| img.close() | |
| if img.has_error: | |
| LOGGER.error(f"Error closing {path}: {img.geterror()}") | |
| return jpg_path | |
| def convert_to_multipart( | |
| path: str, | |
| out_path: str = None, | |
| write: bool = True, | |
| replace: bool = False, | |
| create_jpg: bool = False, | |
| ) -> str: | |
| """ | |
| convert a given exr to a multipart exr | |
| """ | |
| if not path.lower().endswith(".exr"): | |
| # no exr bail out | |
| return "" | |
| if is_multipart(path): | |
| # check if it is already a mp | |
| if create_jpg: | |
| # still do the jpg conversion | |
| jpg = create_lutted_jpg_from_buf(path) | |
| LOGGER.info(f"{path} is a multipart image, skipping") | |
| return "" | |
| buf_spec = OrderedDict() | |
| layers = extract_layers(path) | |
| img = ImageBuf(path) | |
| for layer_name, channels in layers.items(): | |
| new_channels = [] | |
| for ch in channels: | |
| ext = ch.split(".")[-1] | |
| new_channels.append(layer_name + "." + ext) | |
| LOGGER.debug(new_channels) | |
| buf = oiio.ImageBuf() | |
| ImageBufAlgo.channels(buf, img, tuple(channels), tuple(new_channels)) | |
| bit_depth = oiio.HALF | |
| # check if we want to keeps this si as float | |
| for float_layer in FLOAT_LAYER_KEEPERS: | |
| if layer_name.lower().__contains__(float_layer): | |
| bit_depth = oiio.FLOAT | |
| break | |
| buf.set_write_format(bit_depth) | |
| # create the spec | |
| spec = oiio.ImageSpec( | |
| img.spec().width, img.spec().height, len(new_channels), bit_depth | |
| ) | |
| spec.channelnames = tuple(new_channels) | |
| spec.attribute("oiio:subimagename", layer_name) | |
| # Todo check roi and add it | |
| # fix metadata to make cryptos work | |
| if layer_name.lower().__contains__("rgb") or layer_name.lower() == "": | |
| extra_attribs = fix_metadata_names(img.spec().extra_attribs) | |
| spec.extra_attribs = extra_attribs | |
| buf_spec[buf] = spec | |
| if write: | |
| if not out_path: | |
| out_path = construct_out_path(path) | |
| if os.path.isfile(out_path): | |
| try: | |
| os.unlink(out_path) | |
| except: | |
| pass | |
| out = ImageOutput.create(out_path) | |
| count = 0 | |
| for b, s in buf_spec.items(): | |
| if count > 0: | |
| p = out.open(out_path, s, "AppendSubimage") | |
| else: | |
| p = out.open(out_path, tuple(buf_spec.values())) | |
| LOGGER.debug(p) | |
| count += 1 | |
| b.write(out) | |
| out.close() | |
| if replace: | |
| try: | |
| os.unlink(path) | |
| os.rename(out_path, path) | |
| except Exception as e: | |
| LOGGER.exception(e) | |
| else: | |
| LOGGER.info(f"replaced {path} with {out_path}") | |
| out_path = path | |
| if create_jpg: | |
| jpg = create_lutted_jpg_from_buf(out_path) | |
| return out_path |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment