Skip to content

Instantly share code, notes, and snippets.

@jay0lee
Last active January 20, 2026 13:46
Show Gist options
  • Select an option

  • Save jay0lee/8e4b09fbd070d3655362f22948292f5e to your computer and use it in GitHub Desktop.

Select an option

Save jay0lee/8e4b09fbd070d3655362f22948292f5e to your computer and use it in GitHub Desktop.
"""
Android ClientState Security Patch Updater
==========================================
Function:
This script syncs the Android Security Patch level from the device inventory
(Google Endpoint Management) into a custom "Client State" attribute.
It writes a single value to a custom Client State on every device:
- securityPatchLevel (String): e.g., "2023-10-05T00:00:00Z"
(Defaults to "1970-01-01T00:00:00Z" if the device reports no value)
This allows administrators to create Context-Aware Access (CAA) levels using
the CEL `timestamp()` function to compare dates directly.
-------------------------------------------------------------------------------
1. SETUP & REQUIREMENTS
-------------------------------------------------------------------------------
Install dependencies:
$ pip3 install google-auth google-api-python-client
-------------------------------------------------------------------------------
2. AUTHENTICATION (Application Default Credentials)
-------------------------------------------------------------------------------
This script relies on Application Default Credentials (ADC).
Please refer to Google's documentation for secure setup:
https://cloud.google.com/docs/authentication/provide-credentials-adc
-------------------------------------------------------------------------------
3. REQUIRED ADMIN PERMISSIONS (Delegated Admin Role)
-------------------------------------------------------------------------------
The Service Account must be assigned a Delegated Admin Role within the
Google Admin Console that contains the "Manage Devices and Settings" privilege.
Instructions:
1. Go to the Google Admin Console (admin.google.com).
2. Navigate to **Account > Admin roles**.
3. Create a new Custom Role (e.g., "Device API Editor").
4. Check the required privilege:
> Services > Mobile Device Management > Manage Devices and Settings
5. Save the role.
6. Click **Assign service accounts** (or "Admins > Assign...").
7. Add your Service Account's email address to this role.
Reference: https://support.google.com/a/answer/2405986
-------------------------------------------------------------------------------
4. EXAMPLE CONTEXT-AWARE ACCESS (CAA) LEVEL
-------------------------------------------------------------------------------
The following examples demonstrate how to use the synced attribute in your
Access Levels. Note that the vendor key uses the static prefix "customer-"
regardless of your actual Customer ID number.
// Android Patch Level no older than 180 days (180d)
request.time - timestamp(device.vendors["customer-android-patch-level"].data["securityPatchLevel"]) < duration("180d")
// Android Patch Level must be 2025-06-05 or newer
timestamp("2025-06-05T00:00:00Z") <= timestamp(device.vendors["customer-android-patch-level"].data["securityPatchLevel"])
-------------------------------------------------------------------------------
5. INVOCATION
-------------------------------------------------------------------------------
$ python3 update_patch_level.py --customer C0123456
-------------------------------------------------------------------------------
"""
import google.auth
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import logging
import re
import argparse
import time
import random
import socket
import http.client
from collections import defaultdict
import concurrent.futures
# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%H:%M:%S')
def parse_arguments():
parser = argparse.ArgumentParser(description='Android ClientState Security Patch Updater')
parser.add_argument(
'--customer', type=str, required=True,
help='The customer ID (e.g., C0123456). The script will ensure "customers/" is prepended.'
)
parser.add_argument(
'--client-state', type=str, default='android-patch-level',
help='The suffix for the Client State ID. Defaults to "android-patch-level".'
)
parser.add_argument(
'--batch-size', type=int, default=1000,
help='Number of requests to bundle in a single batch (Default: 1000)'
)
parser.add_argument(
'--debug', action='store_true',
help='Enable verbose logging of HTTP requests.'
)
return parser.parse_args()
# --- RETRY LOGIC ---
def make_api_call_with_retry(func, max_retries=7):
for n in range(max_retries):
try:
return func()
except HttpError as e:
if e.resp.status in [429, 500, 502, 503, 504]:
sleep_time = (2 ** n) + random.uniform(0, 1)
time.sleep(sleep_time)
else: raise e
except (socket.timeout, ConnectionError):
time.sleep(2 ** n)
return func()
# --- BATCH EXECUTION ---
def execute_batch_with_backoff(service, updates, customer_id, max_retries=7):
current_batch_items = updates
for n in range(max_retries + 1):
if not current_batch_items: break
logging.info(f"Executing batch of {len(current_batch_items)} items (Attempt {n+1})...")
retry_items = []
def batch_callback(request_id, response, exception):
idx = int(request_id)
original_item = current_batch_items[idx]
if exception:
# Retry on rate limits or server errors
if isinstance(exception, HttpError) and exception.resp.status in [429, 500, 502, 503, 504]:
retry_items.append(original_item)
else:
logging.error(f"PERMANENT FAILURE for {original_item['resource_name']}: {exception}")
batch = service.new_batch_http_request(callback=batch_callback)
for i, item in enumerate(current_batch_items):
batch.add(
service.devices().deviceUsers().clientStates().patch(
name=item['resource_name'],
customer=customer_id,
body=item['body'],
updateMask='keyValuePairs'
),
request_id=str(i)
)
try:
make_api_call_with_retry(lambda: batch.execute())
except Exception as e:
logging.error(f"Critical Batch Failure: {e}")
retry_items = current_batch_items
if retry_items:
if n < max_retries:
sleep_time = (2 ** n) + random.uniform(0, 1)
logging.info(f"{len(retry_items)} items failed. Retrying in {sleep_time:.2f}s...")
time.sleep(sleep_time)
current_batch_items = retry_items
else:
logging.error(f"Max retries reached. Dropping {len(retry_items)} items.")
else:
current_batch_items = []
# --- THREAD-SAFE WORKERS ---
def get_device_patch_data(creds, customer_id):
"""
Worker: Fetch all devices and their patch levels.
Returns { 'devices/ID': '2023-01-01T00:00:00Z' }
"""
service = build('cloudidentity', 'v1', credentials=creds, cache_discovery=False)
device_map = {}
page_token = None
logging.info(f"[Thread: Devices] Fetching all Android devices...")
while True:
try:
request = service.devices().list(
filter='type:android', customer=customer_id, pageSize=100,
fields="nextPageToken,devices(name,securityPatchTime)", pageToken=page_token
)
response = make_api_call_with_retry(lambda: request.execute())
devices = response.get('devices', [])
for device in devices:
name = device.get('name')
# UPDATED: Default to full Unix Epoch Timestamp if unset
patch_time_str = device.get('securityPatchTime', "1970-01-01T00:00:00Z")
if name:
device_map[name] = patch_time_str
page_token = response.get('nextPageToken')
if not page_token: break
except Exception as e:
logging.error(f"[Thread: Devices] Error: {e}")
break
logging.info(f"[Thread: Devices] Mapped {len(device_map)} devices.")
return device_map
def get_device_users_map(creds, customer_id):
"""
Worker: Fetch all device users.
Returns { 'devices/ID': ['devices/ID/deviceUsers/UID', ...] }
"""
service = build('cloudidentity', 'v1', credentials=creds, cache_discovery=False)
device_to_users = defaultdict(list)
page_token = None
logging.info(f"[Thread: Users] Fetching all Android DeviceUsers...")
while True:
try:
request = service.devices().deviceUsers().list(
parent='devices/-',
filter='type:android',
customer=customer_id,
pageSize=20, # API Limit
pageToken=page_token
)
response = make_api_call_with_retry(lambda: request.execute())
users = response.get('deviceUsers', [])
for user in users:
user_name = user.get('name')
if user_name:
# extract 'devices/ID' from 'devices/ID/deviceUsers/UID'
match = re.match(r'(devices/[^/]+)/deviceUsers/.*', user_name)
if match:
parent_device = match.group(1)
device_to_users[parent_device].append(user_name)
page_token = response.get('nextPageToken')
if not page_token: break
except Exception as e:
logging.error(f"[Thread: Users] Error: {e}")
break
logging.info(f"[Thread: Users] Mapped users for {len(device_to_users)} devices.")
return device_to_users
def get_existing_client_states_map(creds, customer_id, target_suffix):
"""
Worker: Fetch all existing ClientStates matching our suffix.
Returns { 'resource_name': '2023-01-01T00:00:00Z' }
"""
service = build('cloudidentity', 'v1', credentials=creds, cache_discovery=False)
states_map = {}
page_token = None
logging.info(f"[Thread: States] Fetching all ClientStates...")
while True:
try:
request = service.devices().deviceUsers().clientStates().list(
parent='devices/-/deviceUsers/-',
filter='type:android',
customer=customer_id,
pageToken=page_token
)
response = make_api_call_with_retry(lambda: request.execute())
batch = response.get('clientStates', [])
for state in batch:
name = state.get('name')
if name and name.endswith(target_suffix):
kv_pairs = state.get('keyValuePairs', {})
# Default to '0' here just to ensure we catch mismatches against the 1970 date
stored_val = kv_pairs.get('securityPatchLevel', {}).get('stringValue', "0")
states_map[name] = stored_val
page_token = response.get('nextPageToken')
if not page_token: break
except Exception as e:
logging.error(f"[Thread: States] Error: {e}")
break
logging.info(f"[Thread: States] Found {len(states_map)} existing target states.")
return states_map
def main():
args = parse_arguments()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
http.client.HTTPConnection.debuglevel = 1
logging.info("Verbose HTTP logging enabled.")
# --- ID CLEANING ---
input_id = args.customer
if input_id.startswith('customers/'):
input_id = input_id[len('customers/'):]
if input_id.startswith('C'):
raw_id_no_c = input_id[1:]
else:
raw_id_no_c = input_id
api_customer_param = f'customers/{raw_id_no_c}'
target_state_suffix = f"{raw_id_no_c}-{args.client_state}"
logging.info(f"Targeting: {api_customer_param} | ClientState Suffix: {target_state_suffix}")
# Build credentials ONCE
creds, _ = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-identity.devices'])
# --- 1. PARALLEL FETCH ---
logging.info("Starting parallel data fetch...")
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_devices = executor.submit(get_device_patch_data, creds, api_customer_param)
future_users = executor.submit(get_device_users_map, creds, api_customer_param)
future_states = executor.submit(get_existing_client_states_map, creds, api_customer_param, target_state_suffix)
device_patch_map = future_devices.result()
device_users_map = future_users.result()
existing_states_map = future_states.result()
duration = time.time() - start_time
logging.info(f"Data fetch completed in {duration:.2f} seconds.")
if not device_patch_map:
logging.info("No Android devices found. Exiting.")
return
# --- 2. CALCULATE UPDATES ---
updates_queue = []
logging.info("Calculating required updates...")
for device_name, patch_val in device_patch_map.items():
users = device_users_map.get(device_name, [])
if not users:
# Device has no users (e.g. newly enrolled or wiped). Cannot set ClientState yet.
logging.debug(f"Device {device_name} has no users. Skipping.")
continue
for user_name in users:
expected_state_name = f"{user_name}/clientStates/{target_state_suffix}"
def create_update_entry(resource):
return {
'resource_name': resource,
'body': {
'keyValuePairs': {
'securityPatchLevel': {'stringValue': patch_val}
}
}
}
if expected_state_name in existing_states_map:
current_val = existing_states_map[expected_state_name]
if current_val != patch_val:
updates_queue.append(create_update_entry(expected_state_name))
else:
# Create if state doesn't exist
updates_queue.append(create_update_entry(expected_state_name))
# --- 3. BATCH EXECUTION ---
main_service = build('cloudidentity', 'v1', credentials=creds, cache_discovery=False)
if not updates_queue:
logging.info("No updates needed.")
return
logging.info(f"Processing {len(updates_queue)} operations...")
for i in range(0, len(updates_queue), args.batch_size):
chunk = updates_queue[i : i + args.batch_size]
execute_batch_with_backoff(main_service, chunk, api_customer_param)
logging.info("Process complete.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment