Last active
January 20, 2026 13:46
-
-
Save jay0lee/8e4b09fbd070d3655362f22948292f5e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Android ClientState Security Patch Updater | |
| ========================================== | |
| Function: | |
| This script syncs the Android Security Patch level from the device inventory | |
| (Google Endpoint Management) into a custom "Client State" attribute. | |
| It writes a single value to a custom Client State for every user of every Android device: | |
| - securityPatchLevel (String): e.g., "2023-10-05T00:00:00Z" | |
| (Defaults to "1970-01-01T00:00:00Z" if the device reports no value) | |
| This allows administrators to create Context-Aware Access (CAA) levels using | |
| the CEL `timestamp()` function to compare dates directly. | |
| ------------------------------------------------------------------------------- | |
| 1. SETUP & REQUIREMENTS | |
| ------------------------------------------------------------------------------- | |
| Install dependencies: | |
| $ pip3 install google-auth google-api-python-client | |
| ------------------------------------------------------------------------------- | |
| 2. AUTHENTICATION (Application Default Credentials) | |
| ------------------------------------------------------------------------------- | |
| This script relies on Application Default Credentials (ADC). | |
| Please refer to Google's documentation for secure setup: | |
| https://cloud.google.com/docs/authentication/provide-credentials-adc | |
| ------------------------------------------------------------------------------- | |
| 3. REQUIRED ADMIN PERMISSIONS (Delegated Admin Role) | |
| ------------------------------------------------------------------------------- | |
| The Service Account must be assigned a Delegated Admin Role within the | |
| Google Admin Console that contains the "Manage Devices and Settings" privilege. | |
| Instructions: | |
| 1. Go to the Google Admin Console (admin.google.com). | |
| 2. Navigate to **Account > Admin roles**. | |
| 3. Create a new Custom Role (e.g., "Device API Editor"). | |
| 4. Check the required privilege: | |
| > Services > Mobile Device Management > Manage Devices and Settings | |
| 5. Save the role. | |
| 6. Click **Assign service accounts** (or "Admins > Assign..."). | |
| 7. Add your Service Account's email address to this role. | |
| Reference: https://support.google.com/a/answer/2405986 | |
| ------------------------------------------------------------------------------- | |
| 4. EXAMPLE CONTEXT-AWARE ACCESS (CAA) LEVEL | |
| ------------------------------------------------------------------------------- | |
| The following examples demonstrate how to use the synced attribute in your | |
| Access Levels. Note that the vendor key uses the static prefix "customer-" | |
| regardless of your actual Customer ID number. | |
| // Android Patch Level no older than 180 days (180d) | |
| request.time - timestamp(device.vendors["customer-android-patch-level"].data["securityPatchLevel"]) < duration("180d") | |
| // Android Patch Level must be 2025-06-05 or newer | |
| timestamp("2025-06-05T00:00:00Z") <= timestamp(device.vendors["customer-android-patch-level"].data["securityPatchLevel"]) | |
| ------------------------------------------------------------------------------- | |
| 5. INVOCATION | |
| ------------------------------------------------------------------------------- | |
| $ python3 update_patch_level.py --customer C0123456 | |
| ------------------------------------------------------------------------------- | |
| """ | |
| import google.auth | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| import logging | |
| import re | |
| import argparse | |
| import time | |
| import random | |
| import socket | |
| import http.client | |
| from collections import defaultdict | |
| import concurrent.futures | |
# Root logger setup: INFO and above, each line prefixed with a time-of-day
# stamp (no date) and the level name.
logging.basicConfig(
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)
# Upper bound for --batch-size: google-api-python-client refuses to add more
# than 1000 calls to a single BatchHttpRequest.
_MAX_BATCH_SIZE = 1000


def _batch_size(value):
    """argparse ``type`` callback: accept an integer in 1.._MAX_BATCH_SIZE."""
    try:
        size = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if not 1 <= size <= _MAX_BATCH_SIZE:
        raise argparse.ArgumentTypeError(
            f"must be between 1 and {_MAX_BATCH_SIZE} (got {size})"
        )
    return size


def parse_arguments(argv=None):
    """Parse the command-line options of the updater.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``, which is what the
            script entry point relies on.

    Returns:
        argparse.Namespace with the attributes ``customer`` (str, required),
        ``client_state`` (str), ``batch_size`` (int, 1..1000) and
        ``debug`` (bool).

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            an option value is invalid (including an out-of-range batch size).
    """
    parser = argparse.ArgumentParser(description='Android ClientState Security Patch Updater')
    parser.add_argument(
        '--customer', type=str, required=True,
        help='The customer ID (e.g., C0123456). The script will ensure "customers/" is prepended.'
    )
    parser.add_argument(
        '--client-state', type=str, default='android-patch-level',
        help='The suffix for the Client State ID. Defaults to "android-patch-level".'
    )
    # A zero batch size would make the chunking range() in main() raise, and a
    # negative one would make it silently process nothing, so reject both here.
    parser.add_argument(
        '--batch-size', type=_batch_size, default=1000,
        help='Number of requests to bundle in a single batch, 1-1000 (Default: 1000)'
    )
    parser.add_argument(
        '--debug', action='store_true',
        help='Enable verbose logging of HTTP requests.'
    )
    return parser.parse_args(argv)
| # --- RETRY LOGIC --- | |
def make_api_call_with_retry(func, max_retries=7):
    """Call ``func()`` and return its result, retrying transient failures.

    Up to ``max_retries`` guarded attempts are made. After a retryable
    failure the function sleeps ``2 ** attempt`` seconds (attempt counted
    from 0) and tries again. Once the guarded attempts are used up, ``func``
    is called one last time without protection, so its exception reaches the
    caller.

    Retryable failures:
      * ``HttpError`` with status 429, 500, 502, 503 or 504 (the sleep gets
        up to one extra second of random jitter);
      * ``socket.timeout`` and ``ConnectionError`` (no jitter).

    Any other ``HttpError`` is re-raised immediately; exceptions of other
    types are not caught at all.
    """
    transient_statuses = (429, 500, 502, 503, 504)
    attempt = 0
    while attempt < max_retries:
        try:
            return func()
        except HttpError as err:
            if err.resp.status not in transient_statuses:
                raise
            delay = (2 ** attempt) + random.uniform(0, 1)
        except (socket.timeout, ConnectionError):
            delay = 2 ** attempt
        time.sleep(delay)
        attempt += 1
    # Final, unguarded attempt: whatever it raises is the caller's to handle.
    return func()
| # --- BATCH EXECUTION --- | |
def execute_batch_with_backoff(service, updates, customer_id, max_retries=7):
    """Send ClientState patches as one batch, re-sending failed items with backoff.

    Args:
        service: API service object; must provide ``new_batch_http_request``
            and ``devices().deviceUsers().clientStates().patch``.
        updates: list of dicts, each with a ``'resource_name'`` (the
            ClientState name to patch) and a ``'body'`` (the request body).
        customer_id: value passed as the ``customer`` parameter of each patch.
        max_retries: number of additional rounds allowed after the first one.

    Each round batches every still-pending item. Items whose individual
    response is an ``HttpError`` with status 429/500/502/503/504 are queued
    for the next round; any other per-item exception is logged as a permanent
    failure and the item is not retried. If executing the batch itself
    raises, the whole round is queued again. Items still failing after the
    last round are dropped with an error log. Nothing is returned.
    """
    retryable_statuses = (429, 500, 502, 503, 504)
    pending = updates
    attempt = 0

    while pending and attempt <= max_retries:
        logging.info(f"Executing batch of {len(pending)} items (Attempt {attempt+1})...")
        to_retry = []

        def on_response(request_id, response, exception):
            # Successful responses need no bookkeeping.
            if not exception:
                return
            # request_id is the item's position in this round's pending list.
            item = pending[int(request_id)]
            transient = (
                isinstance(exception, HttpError)
                and exception.resp.status in retryable_statuses
            )
            if transient:
                # Rate limit or server error: worth another round.
                to_retry.append(item)
                return
            logging.error(f"PERMANENT FAILURE for {item['resource_name']}: {exception}")

        batch = service.new_batch_http_request(callback=on_response)
        for position, item in enumerate(pending):
            patch_request = service.devices().deviceUsers().clientStates().patch(
                name=item['resource_name'],
                customer=customer_id,
                body=item['body'],
                updateMask='keyValuePairs'
            )
            batch.add(patch_request, request_id=str(position))

        try:
            make_api_call_with_retry(batch.execute)
        except Exception as e:
            logging.error(f"Critical Batch Failure: {e}")
            # The batch call itself failed, so every item goes back in the queue.
            to_retry = pending

        if not to_retry:
            pending = []
        elif attempt < max_retries:
            sleep_time = (2 ** attempt) + random.uniform(0, 1)
            logging.info(f"{len(to_retry)} items failed. Retrying in {sleep_time:.2f}s...")
            time.sleep(sleep_time)
            pending = to_retry
        else:
            logging.error(f"Max retries reached. Dropping {len(to_retry)} items.")
        attempt += 1
| # --- THREAD-SAFE WORKERS --- | |
def get_device_patch_data(creds, customer_id):
    """
    Worker: page through all Android devices and record their patch time.

    Builds its own 'cloudidentity' v1 service from ``creds`` and requests
    only the ``name`` and ``securityPatchTime`` fields, 100 devices per page.

    Returns { 'devices/ID': '2023-01-01T00:00:00Z' }; a device without a
    ``securityPatchTime`` is mapped to "1970-01-01T00:00:00Z", and entries
    without a name are ignored.

    Any exception while paging is logged and stops the paging loop; whatever
    was collected up to that point is still returned.
    """
    service = build('cloudidentity', 'v1', credentials=creds, cache_discovery=False)
    patch_by_device = {}
    next_token = None
    logging.info("[Thread: Devices] Fetching all Android devices...")

    more_pages = True
    while more_pages:
        try:
            list_request = service.devices().list(
                filter='type:android', customer=customer_id, pageSize=100,
                fields="nextPageToken,devices(name,securityPatchTime)", pageToken=next_token
            )
            page = make_api_call_with_retry(list_request.execute)
            for entry in page.get('devices', []):
                device_name = entry.get('name')
                if not device_name:
                    continue
                # Unix epoch stands in for "no patch level reported".
                patch_by_device[device_name] = entry.get('securityPatchTime', "1970-01-01T00:00:00Z")
            next_token = page.get('nextPageToken')
            more_pages = bool(next_token)
        except Exception as e:
            logging.error(f"[Thread: Devices] Error: {e}")
            more_pages = False

    logging.info(f"[Thread: Devices] Mapped {len(patch_by_device)} devices.")
    return patch_by_device
def get_device_users_map(creds, customer_id):
    """
    Worker: page through all Android device users and group them by device.

    Builds its own 'cloudidentity' v1 service from ``creds`` and lists device
    users across all devices (parent 'devices/-'), 20 per page.

    Returns a defaultdict(list) shaped like
    { 'devices/ID': ['devices/ID/deviceUsers/UID', ...] }. Users whose name
    is missing or does not match 'devices/<id>/deviceUsers/...' are skipped.

    Any exception while paging is logged and stops the paging loop; whatever
    was collected up to that point is still returned.
    """
    service = build('cloudidentity', 'v1', credentials=creds, cache_discovery=False)
    users_by_device = defaultdict(list)
    next_token = None
    logging.info("[Thread: Users] Fetching all Android DeviceUsers...")

    more_pages = True
    while more_pages:
        try:
            list_request = service.devices().deviceUsers().list(
                parent='devices/-',
                filter='type:android',
                customer=customer_id,
                pageSize=20,  # API Limit
                pageToken=next_token
            )
            page = make_api_call_with_retry(list_request.execute)
            for device_user in page.get('deviceUsers', []):
                resource = device_user.get('name')
                if not resource:
                    continue
                # The leading 'devices/ID' part of the name identifies the parent device.
                parsed = re.match(r'(devices/[^/]+)/deviceUsers/.*', resource)
                if not parsed:
                    continue
                users_by_device[parsed.group(1)].append(resource)
            next_token = page.get('nextPageToken')
            more_pages = bool(next_token)
        except Exception as e:
            logging.error(f"[Thread: Users] Error: {e}")
            more_pages = False

    logging.info(f"[Thread: Users] Mapped users for {len(users_by_device)} devices.")
    return users_by_device
def get_existing_client_states_map(creds, customer_id, target_suffix):
    """
    Worker: page through all Android ClientStates and keep the ones we manage.

    Builds its own 'cloudidentity' v1 service from ``creds`` and lists client
    states across all devices and users (parent 'devices/-/deviceUsers/-').
    Only states whose resource name ends with ``target_suffix`` are kept.

    Returns { 'resource_name': '2023-01-01T00:00:00Z' }, where the value is
    the stored ``securityPatchLevel`` string, or "0" when the state has no
    such string value.

    Any exception while paging is logged and stops the paging loop; whatever
    was collected up to that point is still returned.
    """
    service = build('cloudidentity', 'v1', credentials=creds, cache_discovery=False)
    stored_by_state = {}
    next_token = None
    logging.info("[Thread: States] Fetching all ClientStates...")

    more_pages = True
    while more_pages:
        try:
            list_request = service.devices().deviceUsers().clientStates().list(
                parent='devices/-/deviceUsers/-',
                filter='type:android',
                customer=customer_id,
                pageToken=next_token
            )
            page = make_api_call_with_retry(list_request.execute)
            for state in page.get('clientStates', []):
                state_name = state.get('name')
                if not (state_name and state_name.endswith(target_suffix)):
                    continue
                pairs = state.get('keyValuePairs', {})
                patch_entry = pairs.get('securityPatchLevel', {})
                # "0" can never equal a real timestamp (nor the 1970 default),
                # so a state lacking the value always shows up as a mismatch.
                stored_by_state[state_name] = patch_entry.get('stringValue', "0")
            next_token = page.get('nextPageToken')
            more_pages = bool(next_token)
        except Exception as e:
            logging.error(f"[Thread: States] Error: {e}")
            more_pages = False

    logging.info(f"[Thread: States] Found {len(stored_by_state)} existing target states.")
    return stored_by_state
def _normalize_customer_id(raw_customer):
    """Return the customer ID without a 'customers/' prefix or leading 'C'."""
    customer = raw_customer
    if customer.startswith('customers/'):
        customer = customer[len('customers/'):]
    if customer.startswith('C'):
        customer = customer[1:]
    return customer


def _build_updates(device_patch_map, device_users_map, existing_states_map, target_state_suffix):
    """Return the patch operations needed to bring every ClientState up to date.

    One entry ({'resource_name': ..., 'body': ...}) is produced for each
    device user whose ClientState is missing or stores a value different from
    the device's current patch level. Devices without users are skipped.
    """
    updates = []
    for device_name, patch_val in device_patch_map.items():
        users = device_users_map.get(device_name, [])
        if not users:
            # Device has no users (e.g. newly enrolled or wiped). Cannot set ClientState yet.
            logging.debug(f"Device {device_name} has no users. Skipping.")
            continue
        for user_name in users:
            state_name = f"{user_name}/clientStates/{target_state_suffix}"
            # A missing state yields None, which never equals a patch string,
            # so "create" and "update" are covered by the same comparison.
            if existing_states_map.get(state_name) == patch_val:
                continue
            updates.append({
                'resource_name': state_name,
                'body': {
                    'keyValuePairs': {
                        'securityPatchLevel': {'stringValue': patch_val}
                    }
                }
            })
    return updates


def main():
    """Script entry point: fetch inventory, diff against ClientStates, patch.

    Steps:
      1. Parse arguments and derive the API customer parameter and the
         ClientState name suffix from the customer ID.
      2. Fetch devices, device users and existing ClientStates in parallel
         (three worker threads).
      3. Work out which ClientStates are missing or out of date.
      4. Send the patches in chunks of ``--batch-size``.

    Raises:
        SystemExit: when ``--batch-size`` is not a positive integer.
    """
    args = parse_arguments()

    # A step of 0 would make the chunking range() raise, and a negative step
    # would make it empty (reporting success without doing anything), so
    # refuse both before any API work starts.
    if args.batch_size < 1:
        raise SystemExit(f"--batch-size must be a positive integer (got {args.batch_size}).")

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
        http.client.HTTPConnection.debuglevel = 1
        logging.info("Verbose HTTP logging enabled.")

    # --- ID CLEANING ---
    raw_id_no_c = _normalize_customer_id(args.customer)
    api_customer_param = f'customers/{raw_id_no_c}'
    target_state_suffix = f"{raw_id_no_c}-{args.client_state}"
    logging.info(f"Targeting: {api_customer_param} | ClientState Suffix: {target_state_suffix}")

    # Build credentials ONCE
    creds, _ = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-identity.devices'])

    # --- 1. PARALLEL FETCH ---
    logging.info("Starting parallel data fetch...")
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future_devices = executor.submit(get_device_patch_data, creds, api_customer_param)
        future_users = executor.submit(get_device_users_map, creds, api_customer_param)
        future_states = executor.submit(
            get_existing_client_states_map, creds, api_customer_param, target_state_suffix
        )
        device_patch_map = future_devices.result()
        device_users_map = future_users.result()
        existing_states_map = future_states.result()
    duration = time.time() - start_time
    logging.info(f"Data fetch completed in {duration:.2f} seconds.")

    if not device_patch_map:
        logging.info("No Android devices found. Exiting.")
        return

    # --- 2. CALCULATE UPDATES ---
    logging.info("Calculating required updates...")
    updates_queue = _build_updates(
        device_patch_map, device_users_map, existing_states_map, target_state_suffix
    )
    if not updates_queue:
        logging.info("No updates needed.")
        return

    # --- 3. BATCH EXECUTION ---
    # The service is only built once there is something to send.
    main_service = build('cloudidentity', 'v1', credentials=creds, cache_discovery=False)
    logging.info(f"Processing {len(updates_queue)} operations...")
    for start in range(0, len(updates_queue), args.batch_size):
        chunk = updates_queue[start:start + args.batch_size]
        execute_batch_with_backoff(main_service, chunk, api_customer_param)
    logging.info("Process complete.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment