Created
January 12, 2026 21:32
-
-
Save keithrbennett/4f29f770dc9051bd86a2f0e93764d529 to your computer and use it in GitHub Desktop.
Dnsruby socket-cleanup probe script, for discussion in https://github.com/alexdalitz/dnsruby/pull/219
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env ruby | |
| # frozen_string_literal: true | |
| # | |
| # Purpose | |
| # ------- | |
| # Demonstrate whether tcp-pipelined sockets are fully cleaned up in Dnsruby | |
| # when the remote end closes the connection early (EOF before a response). | |
| # | |
| # Why this matters | |
| # ---------------- | |
| # SelectThread keeps several global (class-level) hashes keyed by Socket | |
| # objects. If a socket is closed but its object remains referenced in these | |
| # hashes, it cannot be garbage-collected and will accumulate over time. | |
| # | |
| # This script: | |
| # 1) Starts a tiny TCP server that accepts a connection, reads one | |
| # length-prefixed DNS query, and then closes without replying, | |
| # simulating an early EOF. | |
| # 2) Sends repeated TCP queries with tcp_pipelining enabled. | |
| # 3) After each query, dumps SelectThread internal state and ObjectSpace | |
| # socket counts, highlighting any closed sockets still referenced. | |
| # | |
| # Run: | |
| # ruby socket_cleanup_probe.rb | |
| # | |
| # Notes: | |
| # - This script uses local Dnsruby code via ./lib. | |
| # - Output is verbose by design to make object lifetimes visible. | |
| require 'socket' | |
| require 'timeout' | |
| require 'objspace' | |
| require 'yaml' | |
| $LOAD_PATH.unshift(File.expand_path('lib', __dir__)) | |
| require 'dnsruby' | |
# Emit one line of diagnostic output on standard output.
#
# @param msg [#to_s] text to print; a trailing newline is appended.
# @return [nil]
def log(msg)
  $stdout.puts(msg)
end
# Ask +sock+ whether it is closed without letting the query itself blow up.
#
# Anything whose #closed? raises a StandardError (including objects that do
# not implement it at all) is pessimistically reported as closed.
#
# @param sock [Object] usually an IO / Socket instance.
# @return [Object] whatever sock.closed? returned, or true if it raised.
def safe_closed?(sock)
  answer = true
  begin
    answer = sock.closed?
  rescue StandardError
    # keep the pessimistic default
  end
  answer
end
# Fetch the OS file descriptor number of +sock+, or nil when it has none.
#
# A closed IO raises on #fileno; that, and any other StandardError (e.g. an
# object without #fileno), is mapped to nil.
#
# @param sock [Object] usually an IO / Socket instance.
# @return [Integer, nil]
def safe_fileno(sock)
  begin
    sock.fileno
  rescue StandardError
    nil
  end
end
# Read Dnsruby::SelectThread's class-level bookkeeping via reflection.
#
# The returned values are the live objects held in the class variables (not
# copies). Keys are produced in the fixed order listed below; a missing class
# variable raises NameError at the first absent name in that order.
#
# @return [Hash{Symbol=>Object}] :sockets, :socket_hash, :socket_is_persistent,
#   :socket_remaining_queries, :tcp_buffers, :query_hash, :timeouts
def select_thread_state
  owner = Dnsruby::SelectThread
  names = %i[
    sockets
    socket_hash
    socket_is_persistent
    socket_remaining_queries
    tcp_buffers
    query_hash
    timeouts
  ]
  names.each_with_object({}) do |name, state|
    state[name] = owner.class_variable_get(:"@@#{name}")
  end
end
# Count socket objects currently present in the Ruby heap via ObjectSpace.
#
# TCPServer is a subclass of TCPSocket, so ObjectSpace.each_object(TCPSocket)
# also yields listening sockets -- including this script's own probe server.
# Those are tallied separately under :tcpserver_total so they do not inflate
# the client-side :tcpsocket_total / :tcpsocket_closed figures.
#
# NOTE: sockets returned by TCPServer#accept are ordinary TCPSocket instances,
# so server-side connection sockets that have not yet been garbage-collected
# are still included in :tcpsocket_total / :tcpsocket_closed.
#
# @return [Hash{Symbol=>Integer}] :tcpsocket_total, :tcpsocket_closed,
#   :udpsocket_total, :tcpserver_total
def objectspace_socket_counts
  counts = {
    tcpsocket_total: 0,
    tcpsocket_closed: 0,
    udpsocket_total: 0,
    tcpserver_total: 0
  }
  ObjectSpace.each_object(TCPSocket) do |s|
    if s.is_a?(TCPServer)
      counts[:tcpserver_total] += 1
    else
      counts[:tcpsocket_total] += 1
      counts[:tcpsocket_closed] += 1 if safe_closed?(s)
    end
  end
  ObjectSpace.each_object(UDPSocket) do |_s|
    counts[:udpsocket_total] += 1
  end
  counts
end
# Take one snapshot of Dnsruby::SelectThread bookkeeping plus ObjectSpace
# socket counts, in a plain Hash suitable for YAML dumping.
#
# @param tag [String] label stored under :tag.
# @param previous_ids [Array<Integer>] object_ids of the @@socket_hash keys
#   from the previous snapshot; used to compute the :deltas counts.
# @return [Hash] with keys :tag, :ids (sorted object_ids of the @@socket_hash
#   keys), :closed_in_socket_hash, :hash_only_count, :deltas ({added:,
#   removed:}), :select_thread_counts, :objectspace and :closed_socket_details.
def dump_state(tag, previous_ids)
  state = select_thread_state
  socket_hash_keys = state[:socket_hash].keys
  socket_set = state[:sockets].to_a

  # Evaluate closed? exactly once per socket. SelectThread runs in its own
  # thread and may close a socket at any moment; checking in two separate
  # passes could make the reported count disagree with the detail list.
  closed_sockets = socket_hash_keys.select { |s| safe_closed?(s) }

  current_ids = socket_hash_keys.map(&:object_id).sort
  added = current_ids - previous_ids
  removed = previous_ids - current_ids
  # Keys present in @@socket_hash but absent from @@sockets.
  hash_only = socket_hash_keys - socket_set
  os_counts = objectspace_socket_counts

  closed_socket_details = closed_sockets.map do |sock|
    {
      object_id: sock.object_id,
      fileno: safe_fileno(sock),
      persistent: state[:socket_is_persistent][sock],
      remaining: state[:socket_remaining_queries][sock]
    }
  end

  {
    tag: tag,
    ids: current_ids,
    closed_in_socket_hash: closed_sockets.size,
    hash_only_count: hash_only.size,
    deltas: { added: added.size, removed: removed.size },
    select_thread_counts: {
      sockets: state[:sockets].size,
      socket_hash: state[:socket_hash].size,
      query_hash: state[:query_hash].size,
      timeouts: state[:timeouts].size,
      tcp_buffers: state[:tcp_buffers].size,
      socket_is_persistent: state[:socket_is_persistent].size,
      socket_remaining_queries: state[:socket_remaining_queries].size
    },
    objectspace: os_counts,
    closed_socket_details: closed_socket_details
  }
end
# -----------------------------------------------------------------------------
# 1) Start a tiny TCP server that reads a query and then closes immediately.
# -----------------------------------------------------------------------------
# Bind to an ephemeral port by default (set PROBE_PORT to pin a specific one)
# so the probe does not die with EADDRINUSE when a fixed port is already taken.
server = TCPServer.new('127.0.0.1', Integer(ENV.fetch('PROBE_PORT', '0')))
PORT = server.addr[1]
server_abort = false
server_thread = Thread.new do
  Thread.current.name = 'probe-server' if Thread.current.respond_to?(:name=)
  until server_abort
    begin
      client = server.accept
    rescue IOError
      # Closing the server from the main thread interrupts accept with IOError.
      break
    end
    Thread.new(client) do |sock|
      begin
        # DNS over TCP: 2-byte length prefix followed by the query bytes.
        # We consume the query and then close without responding.
        len_bytes = sock.read(2)
        if len_bytes && len_bytes.bytesize == 2
          msg_len = len_bytes.unpack1('n')
          sock.read(msg_len) if msg_len.positive?
        end
      rescue StandardError
        # Intentionally ignore errors; we are simulating a flaky server.
      ensure
        sock.close unless sock.closed?
      end
    end
    # Drop this thread's reference to the accepted socket. Otherwise the most
    # recent server-side socket stays reachable after it is closed and is
    # counted as a closed TCPSocket in the ObjectSpace statistics.
    client = nil
  end
end
log("Started probe server on 127.0.0.1:#{PORT} (reads query, then closes without reply)")
# -----------------------------------------------------------------------------
# 2) Configure a resolver with TCP pipelining enabled.
# -----------------------------------------------------------------------------
# Points at the local probe server; short timeouts keep each iteration fast.
# tcp_pipelining_max_queries: 1 presumably caps each pipelined connection at a
# single query (TODO confirm against Dnsruby's pipelining semantics).
resolver = Dnsruby::Resolver.new(
  nameserver: '127.0.0.1',
  port: PORT,
  use_tcp: true,
  tcp_pipelining: true,
  tcp_pipelining_max_queries: 1
).tap do |r|
  r.src_address = '127.0.0.1'
  r.packet_timeout = 0.5
  r.query_timeout = 0.5
end
queue = Queue.new

# Instantiate the SelectThread singleton before taking the baseline snapshot.
Dnsruby::SelectThread.instance

log('Initial state:')
initial_snapshot = dump_state('initial', [])
puts(YAML.dump(initial_snapshot))

# Baselines for the per-iteration comparisons that follow.
previous_ids = initial_snapshot[:ids]
max_closed_in_hash = initial_snapshot[:closed_in_socket_hash]
max_hash_only = initial_snapshot[:hash_only_count]
# -----------------------------------------------------------------------------
# 3) Send repeated queries; after each, inspect internal state.
# -----------------------------------------------------------------------------
ITERATIONS = 10
event_counts = Hash.new(0)
1.upto(ITERATIONS) do |iteration|
  log("--- iteration #{iteration} / #{ITERATIONS} ---")
  query = Dnsruby::Message.new('example.com', Dnsruby::Types::A)
  resolver.send_async(query, queue)

  # Wait for the error response that SelectThread should emit after EOF.
  begin
    Timeout.timeout(2) do
      reply = queue.pop
      kind =
        begin
          reply[2].class.name
        rescue StandardError
          'unknown'
        end
      event_counts[kind] += 1
      log("queue.pop -> #{kind}: #{reply[2].inspect}")
    end
  rescue Timeout::Error
    log('queue.pop timed out (no response)')
    event_counts['timeout'] += 1
  end

  # Give the select thread time to run its cleanup paths, then request a full
  # GC so unreferenced socket objects are less likely to linger in ObjectSpace.
  sleep(0.2)
  GC.start

  snapshot = dump_state("after iteration #{iteration}", previous_ids)
  previous_ids = snapshot[:ids]
  max_closed_in_hash = snapshot[:closed_in_socket_hash] if snapshot[:closed_in_socket_hash] > max_closed_in_hash
  max_hash_only = snapshot[:hash_only_count] if snapshot[:hash_only_count] > max_hash_only
  snapshot[:event_counts] = event_counts.dup
  puts(YAML.dump(snapshot))
end
# -----------------------------------------------------------------------------
# 4) Final cleanup and summary.
# -----------------------------------------------------------------------------
# Shutdown is best-effort: a failure must not prevent the final report, but it
# is logged instead of being silently discarded.
begin
  resolver.close
rescue StandardError => err
  log("resolver.close failed: #{err.class}: #{err.message}")
end
server_abort = true
begin
  server.close
rescue StandardError => err
  log("server.close failed: #{err.class}: #{err.message}")
end
server_thread.join(1)

log('Final state after resolver.close and server shutdown:')
final_snapshot = dump_state('final', previous_ids)
final_snapshot[:event_counts] = event_counts.dup
puts(YAML.dump(final_snapshot))
log("Event summary: #{event_counts.map { |k, v| "#{k}=#{v}" }.sort.join(', ')}")

# Fold the post-shutdown snapshot into the running maxima. If resolver.close
# closes sockets but leaves them in @@socket_hash, only this snapshot shows it,
# and the verdict below (notably the "none closed" wording) must account for it.
max_closed_in_hash = [max_closed_in_hash, final_snapshot[:closed_in_socket_hash]].max
max_hash_only = [max_hash_only, final_snapshot[:hash_only_count]].max

# Final verdict: a quick, human-readable conclusion.
if max_closed_in_hash > 0
  log("Conclusion: LEAK LIKELY - observed #{max_closed_in_hash} closed socket(s) still referenced in @@socket_hash.")
elsif max_hash_only > 0
  log("Conclusion: INCONCLUSIVE - sockets in @@socket_hash not present in @@sockets (max #{max_hash_only}).")
elsif final_snapshot[:ids].empty?
  log('Conclusion: CLEAN - no sockets left in @@socket_hash.')
else
  log("Conclusion: CLEAN-ish - @@socket_hash has #{final_snapshot[:ids].size} live socket(s), none closed.")
end
log('Done.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment