@mzhang77
Created January 19, 2026 01:57
import pymysql
import random
import time
import binascii
from datetime import datetime, timedelta
# --- Database Configuration ---
DB_CONFIG = {
    'host': '127.0.0.1',
    'port': 4000,
    'user': 'root',
    'password': '',
    'database': 'test',
    'autocommit': True
}
# --- Settings ---
TOTAL_ROWS = 500_000 # Sufficient to show performance gap
BATCH_SIZE = 2000
# --- Hardcoded Values (Sanitized) ---
# We use specific values to simulate "hot" records that match the WHERE clause
TARGET_USER_UUID = binascii.unhexlify('11111111222233334444555566667777')
TARGET_DEVICE_CODES = ['device_code_A_001', 'device_code_B_002']
TARGET_GROUP_UUID = binascii.unhexlify('AAAAAAAABBBBCCCCDDDDEEEEFFFF0000')
TARGET_SOURCE_ID = binascii.unhexlify('1234567890AB' * 2)  # 12 bytes, matches binary(12)
TARGET_SOURCE_HASH = binascii.unhexlify('ABCDEF1234567890' * 8)  # 64 bytes, matches binary(64)
TARGET_REF_ID = 88888888
def get_connection():
    return pymysql.connect(**DB_CONFIG)

def random_hex_bytes(length):
    return random.randbytes(length)

def random_string(length):
    # Simple alphanumeric string
    chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
    return ''.join(random.choices(chars, k=length))
def generate_batch(start_time, batch_size):
    batch_data = []
    for _ in range(batch_size):
        # Random timestamp within the last 30 days
        delta = random.randint(0, 30 * 24 * 3600)
        ts = start_time - timedelta(seconds=delta)
        ts_str = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
        event_uuid = random_hex_bytes(16)
        # --- Injection Logic ---
        # 70% of rows match one of the OR conditions (20% user, 20% device,
        # 10% group, 10% source, 10% ref) to create high scan volume
        dice = random.random()
        user_uuid = random_hex_bytes(16)
        device_code = random_string(32)
        group_uuid = random_hex_bytes(16)
        source_id = random_hex_bytes(12)
        source_hash = random_hex_bytes(64)
        ref_id = random.randint(1, 1000000)
        if dice < 0.20:
            user_uuid = TARGET_USER_UUID
        elif dice < 0.40:
            device_code = random.choice(TARGET_DEVICE_CODES)
        elif dice < 0.50:
            group_uuid = TARGET_GROUP_UUID
        elif dice < 0.60:
            source_id = TARGET_SOURCE_ID
            source_hash = TARGET_SOURCE_HASH
        elif dice < 0.70:
            ref_id = TARGET_REF_ID
        # Padding fields
        session_uuid = random_hex_bytes(16)
        batch_data.append((
            event_uuid, ts_str, user_uuid, source_hash, device_code,
            group_uuid, source_id, ref_id, session_uuid
        ))
    return batch_data
def main():
    conn = get_connection()
    cursor = conn.cursor()
    print("Preparing sanitized reproduction environment...")
    try:
        # 1. Setup Table
        cursor.execute("DROP TABLE IF EXISTS audit_log_events")
        # Use the sanitized schema provided above
        create_sql = """
        CREATE TABLE `audit_log_events` (
          `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
          `event_uuid` varbinary(16) NOT NULL,
          `created_at` timestamp(6) NOT NULL,
          `user_uuid` binary(16) DEFAULT NULL,
          `source_hash` binary(64) DEFAULT NULL,
          `device_code` varchar(32) DEFAULT NULL,
          `group_uuid` binary(16) DEFAULT NULL,
          `source_id` binary(12) DEFAULT NULL,
          `external_ref_id` bigint(20) unsigned DEFAULT NULL,
          `trust_score` smallint(5) unsigned DEFAULT NULL,
          `session_uuid` binary(16) NOT NULL,
          `ip_address` varchar(16) DEFAULT NULL,
          `user_agent` varchar(1024) DEFAULT NULL,
          `ref_id_1` bigint(20) unsigned DEFAULT NULL,
          `meta_id_1` varchar(16) DEFAULT NULL,
          `meta_id_2` varchar(16) DEFAULT NULL,
          PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,
          UNIQUE KEY `uk_time_event` (`created_at`,`event_uuid`),
          KEY `idx_created_at` (`created_at`),
          KEY `idx_user_ts` (`user_uuid`,`created_at`,`event_uuid`),
          KEY `idx_device_ts` (`device_code`,`created_at`,`event_uuid`),
          KEY `idx_group_ts` (`group_uuid`,`created_at`,`event_uuid`),
          KEY `idx_source_ts` (`source_id`,`source_hash`,`created_at`,`event_uuid`),
          KEY `idx_ref_ts` (`external_ref_id`,`created_at`,`event_uuid`)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
        """
        cursor.execute(create_sql)
        print("Table `audit_log_events` created.")
        # 2. Insert Data
        print(f"Inserting {TOTAL_ROWS} rows...")
        start_time = time.time()
        now = datetime.now()
        insert_sql = """
        INSERT INTO audit_log_events
            (event_uuid, created_at, user_uuid, source_hash, device_code,
             group_uuid, source_id, external_ref_id, session_uuid)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        for i in range(0, TOTAL_ROWS, BATCH_SIZE):
            batch = generate_batch(now, BATCH_SIZE)
            cursor.executemany(insert_sql, batch)
            if (i + BATCH_SIZE) % 50000 == 0:
                print(f" Inserted {i + BATCH_SIZE} rows...")
        print(f"Data insertion complete. Time: {time.time() - start_time:.2f}s")
        # 3. Analyze Table
        print("Analyzing table statistics...")
        cursor.execute("ANALYZE TABLE audit_log_events")
        print("Done.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    main()
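
The script only loads and analyzes the data; the query that exposes the performance gap is not part of this gist. For context, below is a minimal sketch of the kind of multi-predicate OR lookup the injected "hot" values appear designed to stress. The exact WHERE clause, projection, ordering, and LIMIT are assumptions for illustration, not taken from the original report.

# Hypothetical verification query (assumption, not part of the original gist).
# Runs EXPLAIN ANALYZE on a multi-OR lookup that hits the injected target values.
import binascii
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=4000, user='root',
                       password='', database='test')
try:
    with conn.cursor() as cursor:
        query = """
            SELECT id, event_uuid, created_at
            FROM audit_log_events
            WHERE created_at >= NOW() - INTERVAL 30 DAY
              AND (user_uuid = %s
                   OR device_code IN (%s, %s)
                   OR group_uuid = %s
                   OR (source_id = %s AND source_hash = %s)
                   OR external_ref_id = %s)
            ORDER BY created_at DESC
            LIMIT 100
        """
        params = (
            binascii.unhexlify('11111111222233334444555566667777'),
            'device_code_A_001', 'device_code_B_002',
            binascii.unhexlify('AAAAAAAABBBBCCCCDDDDEEEEFFFF0000'),
            binascii.unhexlify('1234567890AB' * 2),
            binascii.unhexlify('ABCDEF1234567890' * 8),
            88888888,
        )
        cursor.execute("EXPLAIN ANALYZE " + query, params)
        for row in cursor.fetchall():
            print(row)
finally:
    conn.close()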