Created
March 5, 2026 15:25
-
-
Save ivoanjo/b98d4288371c51af10958bea30c47884 to your computer and use it in GitHub Desktop.
Claude port of https://github.com/open-telemetry/sig-profiling/tree/main/process-context/c-and-cpp to pure python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Pure Python port of example_ctx.c + otel_process_ctx.h/c. | |
| Publishes an OpenTelemetry process context via a memfd-backed private mapping | |
| that can be read by otel_process_ctx_dump.sh. No external dependencies. | |
| """ | |
| import ctypes | |
| import os | |
| import struct | |
| import time | |
| # ── Protobuf encoder ────────────────────────────────────────────────────────── | |
| # | |
| # Mirrors the compact hand-rolled encoder in otel_process_ctx.c exactly. | |
| # Only supports 1- or 2-byte varints (values 0-16383, UINT14_MAX), matching | |
| # the C implementation's limitation. | |
| def _pb_varint(v): | |
| """Encode a protobuf varint (limited to 0..16383).""" | |
| if v < 128: | |
| return bytes([v]) | |
| if v > 16383: | |
| raise ValueError(f"Value {v} exceeds UINT14_MAX (16383) limit") | |
| return bytes([(v & 0x7F) | 0x80, v >> 7]) | |
| def _pb_tag(field): | |
| """LEN wire-type (2) tag for the given field number.""" | |
| return bytes([(field << 3) | 2]) | |
| def _pb_string(s): | |
| """Varint-prefixed UTF-8 string bytes (no tag).""" | |
| b = s.encode() | |
| return _pb_varint(len(b)) + b | |
| def _str_size(s): | |
| """Total size of (tag + varint-length + data) for a string field. | |
| Matches C's protobuf_string_size = protobuf_record_size(strlen(str)). | |
| """ | |
| n = len(s.encode()) | |
| return 1 + (1 if n < 128 else 2) + n | |
| def _rec_size(content_len): | |
| """Size of tag(1) + varint(content_len) + content_len bytes. | |
| Matches C's protobuf_record_size. | |
| """ | |
| return 1 + (1 if content_len < 128 else 2) + content_len | |
| def _write_kv_string(field, key, value): | |
| """Encode field_tag + KeyValue{key, AnyValue{string_value=value}}. | |
| Matches C's write_attribute. | |
| """ | |
| kv_size = _str_size(key) + _rec_size(_str_size(value)) | |
| out = _pb_tag(field) + _pb_varint(kv_size) | |
| out += _pb_tag(1) + _pb_string(key) # KeyValue.key | |
| out += _pb_tag(2) + _pb_varint(_str_size(value)) # KeyValue.value = AnyValue | |
| out += _pb_tag(1) + _pb_string(value) # AnyValue.string_value | |
| return out | |
| def _arr_content_size(strings): | |
| """Size of ArrayValue content (sum of AnyValue entries). | |
| Matches C's protobuf_otel_array_value_content_size. | |
| """ | |
| return sum(_rec_size(_str_size(s)) for s in strings) | |
| def _write_kv_array(field, key, strings): | |
| """Encode field_tag + KeyValue{key, AnyValue{array_value=[strings]}}. | |
| Matches C's write_array_attribute. | |
| """ | |
| arr_size = _arr_content_size(strings) | |
| any_size = _rec_size(arr_size) # AnyValue content size | |
| kv_size = _str_size(key) + _rec_size(any_size) # KeyValue content size | |
| out = _pb_tag(field) + _pb_varint(kv_size) | |
| out += _pb_tag(1) + _pb_string(key) # KeyValue.key | |
| out += _pb_tag(2) + _pb_varint(any_size) # KeyValue.value = AnyValue | |
| out += _pb_tag(5) + _pb_varint(arr_size) # AnyValue.array_value = ArrayValue | |
| for s in strings: # ArrayValue.values (repeated AnyValue) | |
| out += _pb_tag(1) + _pb_varint(_str_size(s)) # ArrayValue.values[i] tag + AnyValue size | |
| out += _pb_tag(1) + _pb_string(s) # AnyValue.string_value | |
| return out | |
| def encode_process_context( | |
| service_name, service_instance_id, deployment_environment_name, service_version, | |
| telemetry_sdk_name, telemetry_sdk_language, telemetry_sdk_version, | |
| resource_attributes=None, extra_attributes=None, thread_ctx_config=None, | |
| ): | |
| """Encode a ProcessContext protobuf payload. | |
| Matches C's otel_process_ctx_encode_protobuf_payload. | |
| Layout: | |
| ProcessContext.resource (field 1) = Resource{attributes = [KeyValue...]} | |
| ProcessContext.extra_attributes (field 2) = repeated KeyValue | |
| """ | |
| # Standard fields + optional extra resource attributes → Resource.attributes | |
| resource = b'' | |
| for key, val in [ | |
| ("deployment.environment.name", deployment_environment_name), | |
| ("service.instance.id", service_instance_id), | |
| ("service.name", service_name), | |
| ("service.version", service_version), | |
| ("telemetry.sdk.language", telemetry_sdk_language), | |
| ("telemetry.sdk.version", telemetry_sdk_version), | |
| ("telemetry.sdk.name", telemetry_sdk_name), | |
| ]: | |
| resource += _write_kv_string(1, key, val) | |
| for key, val in (resource_attributes or []): | |
| resource += _write_kv_string(1, key, val) | |
| # ProcessContext.resource (field 1) | |
| payload = _pb_tag(1) + _pb_varint(len(resource)) + resource | |
| # ProcessContext.extra_attributes (field 2) | |
| for key, val in (extra_attributes or []): | |
| payload += _write_kv_string(2, key, val) | |
| # Thread context config encoded as extra_attributes (field 2) | |
| if thread_ctx_config: | |
| sv = thread_ctx_config.get("schema_version") | |
| km = thread_ctx_config.get("attribute_key_map") | |
| if sv: | |
| payload += _write_kv_string(2, "threadlocal.schema_version", sv) | |
| if km: | |
| payload += _write_kv_array(2, "threadlocal.attribute_key_map", km) | |
| return payload | |
| # ── Linux syscall wrappers ───────────────────────────────────────────────────── | |
| _libc = ctypes.CDLL("libc.so.6", use_errno=True) | |
| _libc.memfd_create.argtypes = [ctypes.c_char_p, ctypes.c_uint] | |
| _libc.memfd_create.restype = ctypes.c_int | |
| _libc.ftruncate.argtypes = [ctypes.c_int, ctypes.c_long] | |
| _libc.ftruncate.restype = ctypes.c_int | |
| # Use c_size_t (unsigned) so MAP_FAILED=0xffff...ffff comes back as a large int, | |
| # not None (which c_void_p returns for NULL). | |
| _libc.mmap.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int, | |
| ctypes.c_int, ctypes.c_int, ctypes.c_long] | |
| _libc.mmap.restype = ctypes.c_size_t | |
| _libc.madvise.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int] | |
| _libc.madvise.restype = ctypes.c_int | |
| _libc.munmap.argtypes = [ctypes.c_void_p, ctypes.c_size_t] | |
| _libc.munmap.restype = ctypes.c_int | |
| _libc.close.argtypes = [ctypes.c_int] | |
| _libc.close.restype = ctypes.c_int | |
| _libc.prctl.argtypes = [ctypes.c_int, ctypes.c_ulong, ctypes.c_ulong, | |
| ctypes.c_ulong, ctypes.c_ulong] | |
| _libc.prctl.restype = ctypes.c_int | |
| MFD_CLOEXEC = 0x0001 | |
| MFD_ALLOW_SEALING = 0x0002 | |
| MFD_NOEXEC_SEAL = 0x0008 | |
| PROT_READ = 0x1 | |
| PROT_WRITE = 0x2 | |
| MAP_PRIVATE = 0x02 | |
| MADV_DONTFORK = 10 | |
| MAP_FAILED = (1 << 64) - 1 # (void *)-1 on 64-bit | |
| PR_SET_VMA = 0x53564D41 | |
| PR_SET_VMA_ANON_NAME = 0 | |
| # sizeof(otel_process_ctx_mapping) – packed struct: | |
| # char[8] signature offset 0 | |
| # uint32 version offset 8 | |
| # uint32 payload_size offset 12 | |
| # uint64 published_at_ns offset 16 | |
| # uint64 payload_ptr offset 24 | |
| MAPPING_SIZE = 32 | |
| # Published state (module-level globals keep objects alive) | |
| _mapping_addr = None | |
| _payload_buffer = None # ctypes buffer; must stay alive while published | |
| def publish_otel_ctx(payload_bytes): | |
| """Create a memfd-backed mapping and write the otel_process_ctx_mapping struct. | |
| The mapping appears in /proc/<pid>/maps as '/memfd:OTEL_CTX (deleted)' | |
| which otel_process_ctx_dump.sh finds with grep. | |
| Matches the publish path in otel_process_ctx_publish(). | |
| """ | |
| global _mapping_addr, _payload_buffer | |
| # Allocate payload in a stable ctypes buffer. | |
| # CPython never moves objects in memory, so addressof() stays valid. | |
| _payload_buffer = ctypes.create_string_buffer(payload_bytes) | |
| payload_addr = ctypes.addressof(_payload_buffer) | |
| payload_size = len(payload_bytes) | |
| # Create memfd named "OTEL_CTX" | |
| fd = _libc.memfd_create(b"OTEL_CTX", MFD_CLOEXEC | MFD_ALLOW_SEALING | MFD_NOEXEC_SEAL) | |
| if fd < 0: | |
| # MFD_NOEXEC_SEAL requires kernel ≥ 6.3; retry without it | |
| fd = _libc.memfd_create(b"OTEL_CTX", MFD_CLOEXEC | MFD_ALLOW_SEALING) | |
| if fd < 0: | |
| raise OSError(f"memfd_create failed (errno={ctypes.get_errno()})") | |
| if _libc.ftruncate(fd, MAPPING_SIZE) != 0: | |
| _libc.close(fd) | |
| raise OSError("ftruncate failed") | |
| addr = _libc.mmap(0, MAPPING_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0) | |
| _libc.close(fd) # fd can be closed; mapping keeps the memfd inode alive | |
| if addr == MAP_FAILED or addr == 0: | |
| raise OSError(f"mmap failed (errno={ctypes.get_errno()})") | |
| _mapping_addr = addr | |
| _libc.madvise(addr, MAPPING_SIZE, MADV_DONTFORK) | |
| # Write struct with published_at_ns=0 first (signals "update in progress"), | |
| # then set it last – matching the ordering in the C implementation. | |
| hdr = struct.pack('<8sIIQQ', | |
| b"OTEL_CTX", # signature | |
| 2, # version | |
| payload_size, | |
| 0, # published_at_ns placeholder | |
| payload_addr) # pointer into _payload_buffer | |
| ctypes.memmove(addr, hdr, MAPPING_SIZE) | |
| # "Fence": set published_at_ns last so readers see a consistent mapping. | |
| published_at_ns = time.time_ns() | |
| ctypes.memmove(addr + 16, struct.pack('<Q', published_at_ns), 8) | |
| # Name the mapping (optional; may fail on kernels without CONFIG_ANON_VMA_NAME). | |
| # Still called unconditionally so eBPF hooks on prctl can detect publication, | |
| # even when the feature is unsupported. | |
| name_buf = ctypes.create_string_buffer(b"OTEL_CTX") | |
| _libc.prctl(PR_SET_VMA, PR_SET_VMA_ANON_NAME, | |
| addr, MAPPING_SIZE, ctypes.addressof(name_buf)) | |
| def drop_otel_ctx(): | |
| """Unmap the mapping and release the payload buffer.""" | |
| global _mapping_addr, _payload_buffer | |
| if _mapping_addr is not None: | |
| _libc.munmap(_mapping_addr, MAPPING_SIZE) | |
| _mapping_addr = None | |
| _payload_buffer = None | |
| # ── Example program (mirrors example_ctx.c main()) ─────────────────────────── | |
| RESOURCE_ATTRIBUTES = [ | |
| ("resource.key1", "resource.value1"), | |
| ("resource.key2", "resource.value2"), | |
| ] | |
| EXTRA_ATTRIBUTES = [ | |
| ("example_extra_attribute_foo", "example_extra_attribute_foo_value"), | |
| ] | |
| THREAD_CTX_CONFIG = { | |
| "schema_version": "tlsdesc_v1_dev", | |
| "attribute_key_map": ["http_route", "http_method", "user_id"], | |
| } | |
| def main(): | |
| payload = encode_process_context( | |
| service_name = "my-service", | |
| service_instance_id = "123d8444-2c7e-46e3-89f6-6217880f7123", | |
| deployment_environment_name = "prod", | |
| service_version = "4.5.6", | |
| telemetry_sdk_name = "example_ctx_python.py", | |
| telemetry_sdk_language = "python", | |
| telemetry_sdk_version = "1.2.3", | |
| resource_attributes = RESOURCE_ATTRIBUTES, | |
| extra_attributes = EXTRA_ATTRIBUTES, | |
| thread_ctx_config = THREAD_CTX_CONFIG, | |
| ) | |
| publish_otel_ctx(payload) | |
| pid = os.getpid() | |
| print(f"Published OTEL_CTX context (pid={pid})") | |
| print(f"TIP: You can now run: sudo ./otel_process_ctx_dump.sh {pid}") | |
| print("Press Ctrl+C to exit...") | |
| try: | |
| while True: | |
| time.sleep(1) | |
| except KeyboardInterrupt: | |
| pass | |
| drop_otel_ctx() | |
| print("Context dropped.") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment