Created
July 23, 2025 13:38
-
-
Save leontrolski/5dd975ccd659a3973f8e02f983a711f6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Basically the same as pydifact, but with some changes: | |
| - Doesn't magically turn `[value] -> value`, each segment is a `list[list[str]]`. | |
| - Public singular `segment_to_raw()` function. | |
| - Easier to work out how the `Characters` configuration actually gets passed around. | |
| - Simplified and de-OO-ed code. Doesn't handle segment names any differently to other values. | |
| - Simpler style should make a rewrite-it-in-Rust (+PyO3) very easy if required. | |
| """ | |
from __future__ import annotations

from dataclasses import dataclass
from functools import cache, cached_property
from typing import Iterable, Iterator, TypeVar
| # fmt: off | |
| T = TypeVar("T") | |
| class ParseError(RuntimeError): ... | |
| class Escaped(str): ... | |
| class Special(str): ... | |
| Token = Special | str | |
| Value = list[str] | |
| NameAndSegment = tuple[str, list[Value]] | |
| UNA = "UNA" | |
| # fmt: on | |
@dataclass(frozen=True)
class Characters:
    """The configuration characters that follow the ``UNA`` prefix.

    The six single-character fields are declared in the same order in which
    they appear in the raw header, so ``Characters(*header_chars)`` builds an
    instance directly from those six characters.

    The derived values (`raw`, `special`, `escape`) are computed once per
    instance and stored on the instance itself, so no module-level cache
    keeps instances alive and instances do not need to be hashable.
    The sets returned by `special` and `escape` are shared between calls;
    treat them as read-only.
    """

    component_separator: str  # inner separator
    data_separator: str  # outer separator
    decimal_point: str  # not used in parsing
    escape_character: str  # escape character
    reserved_character: str  # not used in parsing
    segment_terminator: str  # segment (AKA line) separator
    # Characters the tokenizer skips directly after a segment terminator.
    whitespace: frozenset[str] = frozenset((" ", "\r", "\n"))

    @cached_property
    def raw(self) -> str:
        """Return the six configuration characters joined in header order."""
        # Spelled out field by field rather than read from `__dict__`, so the
        # result cannot be affected by anything else stored on the instance.
        return "".join(
            (
                self.component_separator,
                self.data_separator,
                self.decimal_point,
                self.escape_character,
                self.reserved_character,
                self.segment_terminator,
            )
        )

    @cached_property
    def special(self) -> set[str]:
        """Return the characters that split a message into tokens."""
        return {self.component_separator, self.data_separator, self.segment_terminator}

    @cached_property
    def escape(self) -> set[str]:
        """Return every character that must be escaped when serializing."""
        return self.special | {self.escape_character}

    @staticmethod
    def default() -> Characters:
        """Return the default configuration ``:+.? '``."""
        return Characters(*":+.? '")
def raw_to_segments(raw: str) -> list[NameAndSegment]:
    """Parse a raw message into ``(name, segment)`` pairs.

    The configuration characters are read from the header via `split_raw`,
    the remainder is tokenized, and every resulting segment is split into
    its first value (the name) and the remaining values.

    Raises:
        ParseError: propagated from `split_raw` / the tokenizer.
        ValueError: if the first value of a segment does not consist of
            exactly one component (the destructuring below requires it).
    """
    chars, message = split_raw(raw)
    tokens = _tokenize(chars, message)

    result: list[NameAndSegment] = []
    for values in _tokens_to_segments(chars, tokens):
        # The first value must be a one-element list holding the name.
        [name], *segment = values
        result.append((name, segment))
    return result
def segment_to_raw(
    name: str,
    segment: list[Value],
    chars: Characters = Characters.default(),
) -> str:
    """Serialize one segment to its raw form, including the terminator.

    The name is treated like any other value: it is placed first as a
    one-component value. Components are escaped, joined with the component
    separator, and the resulting elements are joined with the data separator.
    """
    elements: list[str] = []
    for values in [[name]] + segment:
        escaped_components = [_escape(chars, v) for v in values]
        elements.append(chars.component_separator.join(escaped_components))
    return chars.data_separator.join(elements) + chars.segment_terminator
def segments_to_raw(
    segments: Iterable[NameAndSegment],
    chars: Characters = Characters.default(),
    segment_separator: str = "\n",
) -> str:
    """Serialize segments to a raw message that starts with the ``UNA`` header.

    The header line and every serialized segment are each followed by
    `segment_separator` (so the output also ends with it).
    """
    lines = [UNA + chars.raw]
    lines.extend(segment_to_raw(name, segment, chars) for name, segment in segments)
    return "".join(line + segment_separator for line in lines)
def split_raw(raw: str) -> tuple[Characters, Iterator[str]]:
    """Split a raw message into its configuration and a character iterator.

    Args:
        raw: The full message text. It must start with ``UNA`` followed by
            the six configuration characters.

    Returns:
        The `Characters` built from the header, and an iterator over the
        characters after the header with leading ``\\r`` / ``\\n`` removed.

    Raises:
        ParseError: if `raw` does not start with ``UNA`` or is too short to
            contain all six configuration characters.
    """
    header_start = len(UNA)
    header_end = header_start + 6

    if raw[:header_start] != UNA:
        raise ParseError(f"Raw must start with {UNA}")

    service_chars = raw[header_start:header_end]
    # A truncated header would otherwise surface as a TypeError from the
    # `Characters` constructor (missing positional arguments).
    if len(service_chars) != 6:
        raise ParseError(
            f"{UNA} must be followed by 6 characters, got {len(service_chars)}"
        )

    chars = Characters(*service_chars)
    return chars, iter(raw[header_end:].lstrip("\r\n"))
def pop_empty_from_end(v: list[T]) -> list[T]:
    """Remove trailing falsy items from `v` in place and return `v` itself."""
    while True:
        # Stop as soon as the list is exhausted or its last item is truthy.
        if not v or v[-1]:
            return v
        v.pop()
| def _escape(chars: Characters, s: str) -> str: | |
| return "".join(chars.escape_character + c if c in chars.escape else c for c in s) | |
def _split(tokens: list[Token], token_type: str | None) -> list[list[Token]]:
    """Split `tokens` into groups at every `Special` token equal to `token_type`.

    The separator tokens themselves are dropped. The result always contains
    at least one (possibly empty) group. Plain `str` tokens never split, even
    when their text equals `token_type`.
    """
    current: list[Token] = []
    groups: list[list[Token]] = [current]
    for token in tokens:
        is_separator = isinstance(token, Special) and token == token_type
        if not is_separator:
            current.append(token)
            continue
        # Start a fresh group after each separator.
        current = []
        groups.append(current)
    return groups
def _group_tokens_by_segment(chars: Characters, tokens: Iterator[Token]) -> Iterator[list[Token]]:
    """Yield the tokens of each segment, one list per segment terminator.

    The terminator token itself is not included. Tokens that follow the last
    terminator are never yielded.
    """
    current: list[Token] = []
    for token in tokens:
        ends_segment = isinstance(token, Special) and token == chars.segment_terminator
        if not ends_segment:
            current.append(token)
            continue
        yield current
        current = []
def _single_token_to_str(tokens: list[list[Token]]) -> Value:
    """Collapse each token group to its first token (or ``""`` when the group
    is empty), then drop trailing empty entries."""
    components: list[str] = []
    for group in tokens:
        components.append(group[0] if group else "")
    return pop_empty_from_end(components)
def _tokens_to_segments(chars: Characters, tokens: Iterator[Token]) -> Iterator[list[Value]]:
    """Yield each segment as a list of values.

    Every segment's tokens are split on the data separator into elements,
    and each element is split on the component separator into its values.
    """
    for segment_tokens in _group_tokens_by_segment(chars, tokens):
        values: list[Value] = []
        for element_tokens in _split(segment_tokens, chars.data_separator):
            components = _split(element_tokens, chars.component_separator)
            values.append(_single_token_to_str(components))
        yield values
def _tokenize(chars: Characters, message: Iterator[str]) -> Iterator[Token]:
    """Turn a stream of characters into `Special` tokens and plain words.

    Unescaped separator/terminator characters are yielded as `Special`
    tokens. Every run of other characters (including escaped ones, with the
    escape character removed) is yielded as one `str` word. Unescaped
    whitespace directly after a segment terminator is skipped.

    Args:
        chars: The configuration characters in effect.
        message: Iterator over the message characters, header excluded.

    Raises:
        ParseError: if the input ends in the middle of a word (i.e. without
            a final segment terminator) or right after an escape character.
    """

    def next_c() -> str | Escaped | None:
        """Return the next character, resolving escapes; None at the end."""
        c: str | Escaped | None = next(message, None)
        if c == chars.escape_character:
            escaped = next(message, None)
            if escaped is None:
                # Without this check the StopIteration from `next()` would
                # escape into the generator and surface as a bare RuntimeError.
                raise ParseError("Unexpected end of EDI message after escape character")
            c = Escaped(escaped)
        return c

    c = next_c()

    def is_special() -> bool:
        return not isinstance(c, Escaped) and c in chars.special

    def is_skippable_whitespace() -> bool:
        # An escaped whitespace character is data and must not be dropped.
        return not isinstance(c, Escaped) and c in chars.whitespace

    while c is not None:
        if is_special():
            token = Special(c)
            c = next_c()
            if token == chars.segment_terminator:
                while is_skippable_whitespace():
                    c = next_c()
            yield token
        else:
            parts: list[str] = []
            while not is_special():
                if c is None:
                    raise ParseError("Unexpected end of EDI message")
                parts.append(c)
                c = next_c()
            # join() always produces a plain `str`, even for `Escaped` parts.
            yield "".join(parts)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment