|
#!/usr/bin/env python3 |
|
""" |
|
Fetch IP delegations from all 5 RIRs and extract CIDRs for a specific country. |
|
Handles non-power-of-2 allocations by expanding into multiple CIDRs. |
|
|
|
Usage: |
|
python get_country_cidrs.py IR # IPv4 to stdout |
|
python get_country_cidrs.py IR -o IR.txt # IPv4 to file |
|
python get_country_cidrs.py US -6 -o US_v6.txt # IPv6 only |
|
python get_country_cidrs.py DE -4 -6 # Both IPv4 and IPv6 |
|
python get_country_cidrs.py CN -q # Quiet mode |
|
""" |
|
|
|
import sys |
|
import argparse |
|
import urllib.request |
|
from ipaddress import summarize_address_range, ip_address |
|
|
|
RIRS = [ |
|
"https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-latest", |
|
"https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest", |
|
"https://ftp.apnic.net/pub/stats/apnic/delegated-apnic-latest", |
|
"https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-latest", |
|
"https://ftp.afrinic.net/pub/stats/afrinic/delegated-afrinic-latest", |
|
] |
|
|
|
|
|
def fetch_delegations(log=print): |
|
"""Fetch and combine all RIR delegation files.""" |
|
lines = [] |
|
for url in RIRS: |
|
log(f"Fetching {url.split('/')[2]}...") |
|
try: |
|
with urllib.request.urlopen(url, timeout=30) as resp: |
|
lines.extend(resp.read().decode('utf-8', errors='ignore').splitlines()) |
|
except Exception as e: |
|
log(f" Warning: {e}") |
|
return lines |
|
|
|
|
|
def parse_delegations(lines, country_code, include_v4=True, include_v6=False): |
|
"""Parse delegation lines and yield CIDRs for the given country.""" |
|
country_code = country_code.upper() |
|
|
|
for line in lines: |
|
if line.startswith('#') or '|' not in line: |
|
continue |
|
|
|
parts = line.split('|') |
|
if len(parts) < 5: |
|
continue |
|
|
|
# Format: registry|CC|type|start|value|date|status |
|
cc = parts[1] |
|
ip_type = parts[2] |
|
start = parts[3] |
|
value = parts[4] |
|
|
|
if cc != country_code: |
|
continue |
|
|
|
if ip_type == 'ipv4' and include_v4: |
|
try: |
|
count = int(value) |
|
start_ip = ip_address(start) |
|
end_ip = start_ip + count - 1 |
|
# summarize_address_range handles non-power-of-2 correctly |
|
for cidr in summarize_address_range(start_ip, end_ip): |
|
yield str(cidr) |
|
except (ValueError, TypeError): |
|
continue |
|
|
|
elif ip_type == 'ipv6' and include_v6: |
|
try: |
|
prefix_len = int(value) |
|
yield f"{start}/{prefix_len}" |
|
except (ValueError, TypeError): |
|
continue |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser( |
|
description='Fetch IP delegations from all 5 RIRs and extract CIDRs for a specific country.', |
|
epilog='Example: %(prog)s IR -o IR.txt' |
|
) |
|
parser.add_argument('country', metavar='CC', help='ISO 3166-1 alpha-2 country code (e.g., IR, US, DE)') |
|
parser.add_argument('-4', '--ipv4', action='store_true', help='Include IPv4 (default if neither specified)') |
|
parser.add_argument('-6', '--ipv6', action='store_true', help='Include IPv6') |
|
parser.add_argument('-o', '--output', metavar='FILE', help='Output file (default: stdout)') |
|
parser.add_argument('-q', '--quiet', action='store_true', help='Suppress progress messages') |
|
|
|
args = parser.parse_args() |
|
|
|
# Default to IPv4 if neither specified |
|
include_v4 = args.ipv4 or not args.ipv6 |
|
include_v6 = args.ipv6 |
|
|
|
log = (lambda *a: None) if args.quiet else (lambda *a: print(*a, file=sys.stderr)) |
|
|
|
lines = fetch_delegations(log) |
|
log(f"Parsing for {args.country.upper()}...") |
|
|
|
cidrs = list(parse_delegations(lines, args.country, include_v4, include_v6)) |
|
log(f"Found {len(cidrs)} CIDRs") |
|
|
|
output = open(args.output, 'w') if args.output else sys.stdout |
|
try: |
|
for cidr in cidrs: |
|
output.write(cidr + '\n') |
|
finally: |
|
if args.output: |
|
output.close() |
|
|
|
|
|
if __name__ == '__main__': |
|
main() |