Skip to content

Instantly share code, notes, and snippets.

@leontrolski
Created July 23, 2025 13:38
Show Gist options
  • Select an option

  • Save leontrolski/5dd975ccd659a3973f8e02f983a711f6 to your computer and use it in GitHub Desktop.

Select an option

Save leontrolski/5dd975ccd659a3973f8e02f983a711f6 to your computer and use it in GitHub Desktop.
"""Basically the same as pydifact, but with some changes:
- Doesn't magically turn `[value] -> value`, each segment is a `list[list[str]]`.
- Public singular `segment_to_raw()` function.
- Easier to work out how the `Characters` configuration actually gets passed around.
- Simplified and de-OO-ed code. Doesn't handle segment names any differently to other values.
- Simpler style should make a rewrite-it-in-Rust (+PyO3) very easy if required.
"""
from __future__ import annotations
from dataclasses import dataclass
from functools import cache
from typing import Iterable, Iterator, TypeVar
# fmt: off
T = TypeVar("T")
class ParseError(RuntimeError): ...
class Escaped(str): ...
class Special(str): ...
Token = Special | str
Value = list[str]
NameAndSegment = tuple[str, list[Value]]
UNA = "UNA"
# fmt: on
@dataclass(frozen=True)
class Characters:
    """The service characters that control parsing and serialisation.

    The six single-character fields are declared in the order in which they
    are written after the ``UNA`` prefix, so ``Characters(*six_chars)``
    builds an instance from a header and ``raw`` reproduces those six
    characters.
    """

    component_separator: str  # inner separator
    data_separator: str  # outer separator
    decimal_point: str  # not used in parsing
    escape_character: str  # escape character
    reserved_character: str  # not used in parsing
    segment_terminator: str  # segment (AKA line) separator
    whitespace: frozenset[str] = frozenset((" ", "\r", "\n"))

    # NOTE: the derived values below are deliberately *not* memoised with
    # `functools.cache`. Caching on a method keys a global cache on `self`
    # (keeping instances alive) and hands every caller the same mutable set,
    # so one caller mutating the result would corrupt all later parsing.
    # Each value is a handful of characters, so recomputing is cheap.

    @property
    def raw(self) -> str:
        """The six service characters concatenated in header order."""
        return "".join(
            (
                self.component_separator,
                self.data_separator,
                self.decimal_point,
                self.escape_character,
                self.reserved_character,
                self.segment_terminator,
            )
        )

    @property
    def special(self) -> set[str]:
        """A fresh set of the characters that act as structural tokens."""
        return {self.component_separator, self.data_separator, self.segment_terminator}

    @property
    def escape(self) -> set[str]:
        """A fresh set of the characters that must be escaped inside data."""
        return self.special | {self.escape_character}

    @staticmethod
    def default() -> Characters:
        """Return the default service characters ``:+.? '``."""
        return Characters(
            component_separator=":",
            data_separator="+",
            decimal_point=".",
            escape_character="?",
            reserved_character=" ",
            segment_terminator="'",
        )
def raw_to_segments(raw: str) -> list[NameAndSegment]:
    """Parse a raw message into a list of ``(name, values)`` pairs.

    The service characters are read from the header of ``raw`` and used to
    tokenize the rest. The first value of every segment must hold exactly
    one component, which becomes the segment name; otherwise the unpacking
    below fails with ``ValueError``.
    """
    chars, message = split_raw(raw)
    tokens = _tokenize(chars, message)
    named: list[NameAndSegment] = []
    for [name], *segment in _tokens_to_segments(chars, tokens):
        named.append((name, segment))
    return named
def segment_to_raw(
    name: str,
    segment: list[Value],
    chars: Characters = Characters.default(),
) -> str:
    """Serialise one segment, including its trailing segment terminator.

    The name is treated as a one-component value placed in front of
    ``segment``. Every component is escaped, components are joined with the
    component separator and values with the data separator.
    """
    rendered_values = []
    for components in [[name]] + segment:
        escaped = [_escape(chars, component) for component in components]
        rendered_values.append(chars.component_separator.join(escaped))
    body = chars.data_separator.join(rendered_values)
    return body + chars.segment_terminator
def segments_to_raw(
    segments: Iterable[NameAndSegment],
    chars: Characters = Characters.default(),
    segment_separator: str = "\n",
) -> str:
    """Serialise a whole message: the ``UNA`` header, then every segment.

    Each line (the header included) is followed by ``segment_separator``.
    """
    lines = [UNA + chars.raw]
    lines.extend(segment_to_raw(name, segment, chars) for name, segment in segments)
    return "".join(line + segment_separator for line in lines)
def split_raw(raw: str) -> tuple[Characters, Iterator[str]]:
    """Split a raw message into its service characters and its body.

    Returns the `Characters` built from the six characters that follow the
    ``UNA`` prefix, plus an iterator over the remaining characters with any
    leading ``\\r``/``\\n`` removed.

    Raises:
        ParseError: if ``raw`` does not start with ``UNA`` or is too short to
            contain all six service characters.
    """
    prefix_len = len(UNA)
    if raw[:prefix_len] != UNA:
        raise ParseError(f"Raw must start with {UNA}")
    header_end = prefix_len + 6
    service_chars = raw[prefix_len:header_end]
    # Without this check a truncated header would surface as a TypeError
    # from the `Characters` constructor instead of a ParseError.
    if len(service_chars) != 6:
        raise ParseError(f"{UNA} must be followed by 6 service characters")
    chars = Characters(*service_chars)
    return chars, iter(raw[header_end:].lstrip("\r\n"))
def pop_empty_from_end(v: list[T]) -> list[T]:
    """Strip trailing falsy items from ``v`` in place and return ``v``."""
    keep = len(v)
    # Walk back from the end to find the last truthy item.
    while keep > 0 and not v[keep - 1]:
        keep -= 1
    del v[keep:]
    return v
def _escape(chars: Characters, s: str) -> str:
    """Return ``s`` with the escape character put before every character in ``chars.escape``."""
    needs_escape = chars.escape
    prefix = chars.escape_character
    pieces = []
    for ch in s:
        if ch in needs_escape:
            pieces.append(prefix)
        pieces.append(ch)
    return "".join(pieces)
def _split(tokens: list[Token], token_type: str | None) -> list[list[Token]]:
    """Split ``tokens`` into groups at every `Special` token equal to ``token_type``.

    The separator tokens are dropped. The result always holds at least one
    (possibly empty) group, and consecutive separators yield empty groups.
    """
    groups: list[list[Token]] = []
    current: list[Token] = []
    for tok in tokens:
        if tok == token_type and isinstance(tok, Special):
            # Close the running group and start a new one.
            groups.append(current)
            current = []
            continue
        current.append(tok)
    groups.append(current)
    return groups
def _group_tokens_by_segment(chars: Characters, tokens: Iterator[Token]) -> Iterator[list[Token]]:
    """Yield the tokens of each segment, one list per segment.

    A segment ends at a `Special` token equal to ``chars.segment_terminator``;
    the terminator itself is not included in the yielded list.

    Raises:
        ParseError: if tokens remain after the last segment terminator, i.e.
            the final segment was never terminated.
    """
    segment: list[Token] = []
    for t in tokens:
        if isinstance(t, Special) and t == chars.segment_terminator:
            yield segment
            segment = []
        else:
            segment.append(t)
    # Leftover tokens mean the input stopped mid-segment (for example right
    # after a separator). Report it rather than silently dropping the data,
    # matching the tokenizer's handling of a truncated word.
    if segment:
        raise ParseError("Unexpected end of EDI message")
def _single_token_to_str(tokens: list[list[Token]]) -> Value:
    """Collapse component token groups into a `Value`.

    Each group contributes its first token, or ``""`` when the group is
    empty; trailing empty components are then removed.
    """
    components = [next(iter(group), "") for group in tokens]
    return pop_empty_from_end(components)
def _tokens_to_segments(chars: Characters, tokens: Iterator[Token]) -> Iterator[list[Value]]:
    """Turn a token stream into segments, each a list of `Value`s.

    Tokens are grouped per segment, split on the data separator into values
    and then on the component separator into components.
    """
    for segment_tokens in _group_tokens_by_segment(chars, tokens):
        values: list[Value] = []
        for value_tokens in _split(segment_tokens, chars.data_separator):
            component_groups = _split(value_tokens, chars.component_separator)
            values.append(_single_token_to_str(component_groups))
        yield values
def _tokenize(chars: Characters, message: Iterator[str]) -> Iterator[Token]:
    """Lex a character stream into `Special` tokens and plain data words.

    An unescaped character from ``chars.special`` is yielded as a `Special`
    token. Any run of other characters (including escaped special
    characters) is yielded as one ``str`` word. Unescaped whitespace that
    directly follows a segment terminator is skipped.

    Raises:
        ParseError: if the input ends inside a word or immediately after an
            escape character.
    """
    # Loop-invariant lookups, taken once instead of once per character.
    special = chars.special
    escape_character = chars.escape_character
    terminator = chars.segment_terminator
    whitespace = chars.whitespace

    def next_c() -> str | Escaped | None:
        """Return the next character, `Escaped` if it was escaped, or None at the end."""
        c = next(message, None)
        if c != escape_character or c is None:
            return c
        escaped = next(message, None)
        if escaped is None:
            # A bare `next(message)` here would raise StopIteration inside a
            # generator, which Python converts into an opaque RuntimeError.
            raise ParseError("Unexpected end of EDI message after escape character")
        return Escaped(escaped)

    def is_special(c: str | Escaped | None) -> bool:
        """Whether ``c`` is an unescaped structural character."""
        return not isinstance(c, Escaped) and c in special

    c = next_c()
    while c is not None:
        if is_special(c):
            token = Special(c)
            c = next_c()
            if token == terminator:
                # Skip layout whitespace between segments. Escaped whitespace
                # is data and must be kept.
                while c in whitespace and not isinstance(c, Escaped):
                    c = next_c()
            yield token
        else:
            parts: list[str] = []
            while not is_special(c):
                if c is None:
                    raise ParseError("Unexpected end of EDI message")
                parts.append(c)
                c = next_c()
            yield "".join(parts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment