Skip to content

Instantly share code, notes, and snippets.

@ivoanjo
Created March 5, 2026 15:25
Show Gist options
  • Select an option

  • Save ivoanjo/b98d4288371c51af10958bea30c47884 to your computer and use it in GitHub Desktop.

Select an option

Save ivoanjo/b98d4288371c51af10958bea30c47884 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Pure Python port of example_ctx.c + otel_process_ctx.h/c.
Publishes an OpenTelemetry process context via a memfd-backed private mapping
that can be read by otel_process_ctx_dump.sh. No external dependencies.
"""
import ctypes
import os
import struct
import time
# ── Protobuf encoder ──────────────────────────────────────────────────────────
#
# Mirrors the compact hand-rolled encoder in otel_process_ctx.c exactly.
# Only supports 1- or 2-byte varints (values 0-16383, UINT14_MAX), matching
# the C implementation's limitation.
def _pb_varint(v):
"""Encode a protobuf varint (limited to 0..16383)."""
if v < 128:
return bytes([v])
if v > 16383:
raise ValueError(f"Value {v} exceeds UINT14_MAX (16383) limit")
return bytes([(v & 0x7F) | 0x80, v >> 7])
def _pb_tag(field):
"""LEN wire-type (2) tag for the given field number."""
return bytes([(field << 3) | 2])
def _pb_string(s):
"""Varint-prefixed UTF-8 string bytes (no tag)."""
b = s.encode()
return _pb_varint(len(b)) + b
def _str_size(s):
"""Total size of (tag + varint-length + data) for a string field.
Matches C's protobuf_string_size = protobuf_record_size(strlen(str)).
"""
n = len(s.encode())
return 1 + (1 if n < 128 else 2) + n
def _rec_size(content_len):
"""Size of tag(1) + varint(content_len) + content_len bytes.
Matches C's protobuf_record_size.
"""
return 1 + (1 if content_len < 128 else 2) + content_len
def _write_kv_string(field, key, value):
"""Encode field_tag + KeyValue{key, AnyValue{string_value=value}}.
Matches C's write_attribute.
"""
kv_size = _str_size(key) + _rec_size(_str_size(value))
out = _pb_tag(field) + _pb_varint(kv_size)
out += _pb_tag(1) + _pb_string(key) # KeyValue.key
out += _pb_tag(2) + _pb_varint(_str_size(value)) # KeyValue.value = AnyValue
out += _pb_tag(1) + _pb_string(value) # AnyValue.string_value
return out
def _arr_content_size(strings):
"""Size of ArrayValue content (sum of AnyValue entries).
Matches C's protobuf_otel_array_value_content_size.
"""
return sum(_rec_size(_str_size(s)) for s in strings)
def _write_kv_array(field, key, strings):
"""Encode field_tag + KeyValue{key, AnyValue{array_value=[strings]}}.
Matches C's write_array_attribute.
"""
arr_size = _arr_content_size(strings)
any_size = _rec_size(arr_size) # AnyValue content size
kv_size = _str_size(key) + _rec_size(any_size) # KeyValue content size
out = _pb_tag(field) + _pb_varint(kv_size)
out += _pb_tag(1) + _pb_string(key) # KeyValue.key
out += _pb_tag(2) + _pb_varint(any_size) # KeyValue.value = AnyValue
out += _pb_tag(5) + _pb_varint(arr_size) # AnyValue.array_value = ArrayValue
for s in strings: # ArrayValue.values (repeated AnyValue)
out += _pb_tag(1) + _pb_varint(_str_size(s)) # ArrayValue.values[i] tag + AnyValue size
out += _pb_tag(1) + _pb_string(s) # AnyValue.string_value
return out
def encode_process_context(
service_name, service_instance_id, deployment_environment_name, service_version,
telemetry_sdk_name, telemetry_sdk_language, telemetry_sdk_version,
resource_attributes=None, extra_attributes=None, thread_ctx_config=None,
):
"""Encode a ProcessContext protobuf payload.
Matches C's otel_process_ctx_encode_protobuf_payload.
Layout:
ProcessContext.resource (field 1) = Resource{attributes = [KeyValue...]}
ProcessContext.extra_attributes (field 2) = repeated KeyValue
"""
# Standard fields + optional extra resource attributes → Resource.attributes
resource = b''
for key, val in [
("deployment.environment.name", deployment_environment_name),
("service.instance.id", service_instance_id),
("service.name", service_name),
("service.version", service_version),
("telemetry.sdk.language", telemetry_sdk_language),
("telemetry.sdk.version", telemetry_sdk_version),
("telemetry.sdk.name", telemetry_sdk_name),
]:
resource += _write_kv_string(1, key, val)
for key, val in (resource_attributes or []):
resource += _write_kv_string(1, key, val)
# ProcessContext.resource (field 1)
payload = _pb_tag(1) + _pb_varint(len(resource)) + resource
# ProcessContext.extra_attributes (field 2)
for key, val in (extra_attributes or []):
payload += _write_kv_string(2, key, val)
# Thread context config encoded as extra_attributes (field 2)
if thread_ctx_config:
sv = thread_ctx_config.get("schema_version")
km = thread_ctx_config.get("attribute_key_map")
if sv:
payload += _write_kv_string(2, "threadlocal.schema_version", sv)
if km:
payload += _write_kv_array(2, "threadlocal.attribute_key_map", km)
return payload
# ── Linux syscall wrappers ─────────────────────────────────────────────────────
_libc = ctypes.CDLL("libc.so.6", use_errno=True)
_libc.memfd_create.argtypes = [ctypes.c_char_p, ctypes.c_uint]
_libc.memfd_create.restype = ctypes.c_int
_libc.ftruncate.argtypes = [ctypes.c_int, ctypes.c_long]
_libc.ftruncate.restype = ctypes.c_int
# Use c_size_t (unsigned) so MAP_FAILED=0xffff...ffff comes back as a large int,
# not None (which c_void_p returns for NULL).
_libc.mmap.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int,
ctypes.c_int, ctypes.c_int, ctypes.c_long]
_libc.mmap.restype = ctypes.c_size_t
_libc.madvise.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int]
_libc.madvise.restype = ctypes.c_int
_libc.munmap.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
_libc.munmap.restype = ctypes.c_int
_libc.close.argtypes = [ctypes.c_int]
_libc.close.restype = ctypes.c_int
_libc.prctl.argtypes = [ctypes.c_int, ctypes.c_ulong, ctypes.c_ulong,
ctypes.c_ulong, ctypes.c_ulong]
_libc.prctl.restype = ctypes.c_int
MFD_CLOEXEC = 0x0001
MFD_ALLOW_SEALING = 0x0002
MFD_NOEXEC_SEAL = 0x0008
PROT_READ = 0x1
PROT_WRITE = 0x2
MAP_PRIVATE = 0x02
MADV_DONTFORK = 10
MAP_FAILED = (1 << 64) - 1 # (void *)-1 on 64-bit
PR_SET_VMA = 0x53564D41
PR_SET_VMA_ANON_NAME = 0
# sizeof(otel_process_ctx_mapping) – packed struct:
# char[8] signature offset 0
# uint32 version offset 8
# uint32 payload_size offset 12
# uint64 published_at_ns offset 16
# uint64 payload_ptr offset 24
MAPPING_SIZE = 32
# Published state (module-level globals keep objects alive)
_mapping_addr = None
_payload_buffer = None # ctypes buffer; must stay alive while published
def publish_otel_ctx(payload_bytes):
"""Create a memfd-backed mapping and write the otel_process_ctx_mapping struct.
The mapping appears in /proc/<pid>/maps as '/memfd:OTEL_CTX (deleted)'
which otel_process_ctx_dump.sh finds with grep.
Matches the publish path in otel_process_ctx_publish().
"""
global _mapping_addr, _payload_buffer
# Allocate payload in a stable ctypes buffer.
# CPython never moves objects in memory, so addressof() stays valid.
_payload_buffer = ctypes.create_string_buffer(payload_bytes)
payload_addr = ctypes.addressof(_payload_buffer)
payload_size = len(payload_bytes)
# Create memfd named "OTEL_CTX"
fd = _libc.memfd_create(b"OTEL_CTX", MFD_CLOEXEC | MFD_ALLOW_SEALING | MFD_NOEXEC_SEAL)
if fd < 0:
# MFD_NOEXEC_SEAL requires kernel ≥ 6.3; retry without it
fd = _libc.memfd_create(b"OTEL_CTX", MFD_CLOEXEC | MFD_ALLOW_SEALING)
if fd < 0:
raise OSError(f"memfd_create failed (errno={ctypes.get_errno()})")
if _libc.ftruncate(fd, MAPPING_SIZE) != 0:
_libc.close(fd)
raise OSError("ftruncate failed")
addr = _libc.mmap(0, MAPPING_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0)
_libc.close(fd) # fd can be closed; mapping keeps the memfd inode alive
if addr == MAP_FAILED or addr == 0:
raise OSError(f"mmap failed (errno={ctypes.get_errno()})")
_mapping_addr = addr
_libc.madvise(addr, MAPPING_SIZE, MADV_DONTFORK)
# Write struct with published_at_ns=0 first (signals "update in progress"),
# then set it last – matching the ordering in the C implementation.
hdr = struct.pack('<8sIIQQ',
b"OTEL_CTX", # signature
2, # version
payload_size,
0, # published_at_ns placeholder
payload_addr) # pointer into _payload_buffer
ctypes.memmove(addr, hdr, MAPPING_SIZE)
# "Fence": set published_at_ns last so readers see a consistent mapping.
published_at_ns = time.time_ns()
ctypes.memmove(addr + 16, struct.pack('<Q', published_at_ns), 8)
# Name the mapping (optional; may fail on kernels without CONFIG_ANON_VMA_NAME).
# Still called unconditionally so eBPF hooks on prctl can detect publication,
# even when the feature is unsupported.
name_buf = ctypes.create_string_buffer(b"OTEL_CTX")
_libc.prctl(PR_SET_VMA, PR_SET_VMA_ANON_NAME,
addr, MAPPING_SIZE, ctypes.addressof(name_buf))
def drop_otel_ctx():
"""Unmap the mapping and release the payload buffer."""
global _mapping_addr, _payload_buffer
if _mapping_addr is not None:
_libc.munmap(_mapping_addr, MAPPING_SIZE)
_mapping_addr = None
_payload_buffer = None
# ── Example program (mirrors example_ctx.c main()) ───────────────────────────
RESOURCE_ATTRIBUTES = [
("resource.key1", "resource.value1"),
("resource.key2", "resource.value2"),
]
EXTRA_ATTRIBUTES = [
("example_extra_attribute_foo", "example_extra_attribute_foo_value"),
]
THREAD_CTX_CONFIG = {
"schema_version": "tlsdesc_v1_dev",
"attribute_key_map": ["http_route", "http_method", "user_id"],
}
def main():
payload = encode_process_context(
service_name = "my-service",
service_instance_id = "123d8444-2c7e-46e3-89f6-6217880f7123",
deployment_environment_name = "prod",
service_version = "4.5.6",
telemetry_sdk_name = "example_ctx_python.py",
telemetry_sdk_language = "python",
telemetry_sdk_version = "1.2.3",
resource_attributes = RESOURCE_ATTRIBUTES,
extra_attributes = EXTRA_ATTRIBUTES,
thread_ctx_config = THREAD_CTX_CONFIG,
)
publish_otel_ctx(payload)
pid = os.getpid()
print(f"Published OTEL_CTX context (pid={pid})")
print(f"TIP: You can now run: sudo ./otel_process_ctx_dump.sh {pid}")
print("Press Ctrl+C to exit...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
drop_otel_ctx()
print("Context dropped.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment