Last active
January 21, 2026 20:33
-
-
Save rexwhitten/276a064b7eaa6b75802b15e2f12894a3 to your computer and use it in GitHub Desktop.
Inventory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import boto3 | |
| import logging | |
| import os | |
| from datetime import datetime, timezone | |
| from decimal import Decimal | |
| from typing import Any, Dict, List | |
| from boto3.dynamodb.conditions import Key | |
| # Configure structured logging | |
| logger = logging.getLogger() | |
| logger.setLevel(logging.INFO) | |
| # Initialize DynamoDB client | |
| dynamodb = boto3.resource("dynamodb") | |
| table_name = os.environ.get("CCM_INVENTORY_TABLE", "inventory") | |
| inventory_table = dynamodb.Table(table_name) | |
| class DecimalEncoder(json.JSONEncoder): | |
| """Helper class to convert DynamoDB Decimal types to JSON""" | |
| def default(self, obj: Any) -> Any: | |
| if isinstance(obj, Decimal): | |
| return float(obj) | |
| return super(DecimalEncoder, self).default(obj) | |
| def response(status_code: int, body: Any) -> Dict[str, Any]: | |
| """Generate API Gateway response""" | |
| return { | |
| "statusCode": status_code, | |
| "headers": { | |
| "Content-Type": "application/json", | |
| "Access-Control-Allow-Origin": "*", | |
| "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key", | |
| "Access-Control-Allow-Methods": "GET,POST,PUT,DELETE,OPTIONS", | |
| }, | |
| "body": json.dumps(body, cls=DecimalEncoder), | |
| } | |
| def create_item(event_body: Dict[str, Any]) -> Dict[str, Any]: | |
| """Create a new inventory item (Account)""" | |
| try: | |
| # Validate inputs | |
| ma_name = event_body.get("ma_name") | |
| account_id = event_body.get("account_id") | |
| if not ma_name or not account_id: | |
| return response(400, {"error": "Missing required fields: ma_name, account_id"}) | |
| # Construct Keys | |
| pk = f"MA#{ma_name.lower()}" | |
| sk = f"ACCOUNT#{account_id}" | |
| item = { | |
| "pk": pk, | |
| "sk": sk, | |
| "ma_name": ma_name, | |
| "account_id": account_id, | |
| "account_alias": event_body.get("account_alias", ""), | |
| "provider": event_body.get("provider", "AWS"), | |
| "status": "active", | |
| "created_at": datetime.now(timezone.utc).isoformat(), | |
| "updated_at": datetime.now(timezone.utc).isoformat(), | |
| "metadata": event_body.get("metadata", {}), | |
| } | |
| inventory_table.put_item(Item=item) | |
| logger.info(f"Created inventory item: {pk} / {sk}") | |
| return response(201, { | |
| "message": "Inventory item created successfully", | |
| "data": item | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error creating item: {str(e)}") | |
| return response(500, {"error": "Failed to create item", "details": str(e)}) | |
| def get_item(ma_name: str, account_id: str) -> Dict[str, Any]: | |
| """Get a specific inventory item""" | |
| try: | |
| pk = f"MA#{ma_name.lower()}" | |
| sk = f"ACCOUNT#{account_id}" | |
| result = inventory_table.get_item(Key={"pk": pk, "sk": sk}) | |
| item = result.get("Item") | |
| if not item: | |
| return response(404, {"error": "Item not found"}) | |
| return response(200, {"data": item}) | |
| except Exception as e: | |
| logger.error(f"Error getting item: {str(e)}") | |
| return response(500, {"error": "Failed to get item", "details": str(e)}) | |
| def list_items(query_params: Dict[str, str]) -> Dict[str, Any]: | |
| """List items, optionally filtered by ma_name""" | |
| try: | |
| ma_name = query_params.get("ma_name") | |
| if ma_name: | |
| # Query by PK (MA context) | |
| pk = f"MA#{ma_name.lower()}" | |
| query_kwargs = { | |
| "KeyConditionExpression": Key("pk").eq(pk) & Key("sk").begins_with("ACCOUNT#") | |
| } | |
| items = [] | |
| while True: | |
| result = inventory_table.query(**query_kwargs) | |
| items.extend(result.get("Items", [])) | |
| if "LastEvaluatedKey" in result: | |
| query_kwargs["ExclusiveStartKey"] = result["LastEvaluatedKey"] | |
| else: | |
| break | |
| return response(200, {"data": items, "count": len(items)}) | |
| else: | |
| # Scan all accounts? | |
| # For now, maybe just return empty or require ma_name. | |
| # Or scan filter begins_with(sk, "ACCOUNT#") | |
| # Let's implement full scan for now but it's expensive. | |
| scan_kwargs = { | |
| "FilterExpression": Key("sk").begins_with("ACCOUNT#") | |
| } # Note: Key conditions not allowed in FilterExpression for Scan, but begins_with is allowed function. | |
| # Wait, Key(...) constructs are for KeyConditionExpression. | |
| # For FilterExpression we use Attr(...) or string. | |
| # However, 'sk' is a key attribute. Scan can use FilterExpression on keys. | |
| # Actually, simpler to just scan and return everything if that's the intent, | |
| # or require ma_name. | |
| # Let's require ma_name for now to encourage good patterns, unless explicitly asked. | |
| # But the previous code supported full scan. So let's support it. | |
| params = {} | |
| # Simple scan without filter to get everything in table? | |
| # Or client side filter. | |
| # Let's just return everything for simplicity of this refactor unless specified. | |
| result = inventory_table.scan() | |
| items = result.get("Items", []) | |
| # Filter in memory for ACCOUNT# items just to be clean | |
| account_items = [i for i in items if i.get("sk", "").startswith("ACCOUNT#")] | |
| return response(200, {"data": account_items, "count": len(account_items)}) | |
| except Exception as e: | |
| logger.error(f"Error listing items: {str(e)}") | |
| return response(500, {"error": "Failed to list items", "details": str(e)}) | |
| def update_item(ma_name: str, account_id: str, event_body: Dict[str, Any]) -> Dict[str, Any]: | |
| """Update inventory item""" | |
| try: | |
| pk = f"MA#{ma_name.lower()}" | |
| sk = f"ACCOUNT#{account_id}" | |
| timestamp = datetime.now(timezone.utc).isoformat() | |
| update_parts = ["metadata.updated_at = :timestamp"] | |
| expression_values = {":timestamp": timestamp} | |
| expression_names = {} | |
| if "account_alias" in event_body: | |
| update_parts.append("account_alias = :alias") | |
| expression_values[":alias"] = event_body["account_alias"] | |
| if "provider" in event_body: | |
| update_parts.append("provider = :provider") | |
| expression_values[":provider"] = event_body["provider"] | |
| # Add other fields as needed | |
| update_expression = "SET " + ", ".join(update_parts) | |
| try: | |
| result = inventory_table.update_item( | |
| Key={"pk": pk, "sk": sk}, | |
| UpdateExpression=update_expression, | |
| ExpressionAttributeValues=expression_values, | |
| ConditionExpression="attribute_exists(pk) AND attribute_exists(sk)", | |
| ReturnValues="ALL_NEW" | |
| ) | |
| return response(200, {"message": "Item updated", "data": result.get("Attributes")}) | |
| except inventory_table.meta.client.exceptions.ConditionalCheckFailedException: | |
| return response(404, {"error": "Item not found"}) | |
| except Exception as e: | |
| logger.error(f"Error updating item: {str(e)}") | |
| return response(500, {"error": "Failed to update item", "details": str(e)}) | |
| def delete_item(ma_name: str, account_id: str) -> Dict[str, Any]: | |
| """Delete inventory item""" | |
| try: | |
| pk = f"MA#{ma_name.lower()}" | |
| sk = f"ACCOUNT#{account_id}" | |
| inventory_table.delete_item(Key={"pk": pk, "sk": sk}) | |
| return response(200, {"message": "Item deleted successfully"}) | |
| except Exception as e: | |
| logger.error(f"Error deleting item: {str(e)}") | |
| return response(500, {"error": "Failed to delete item", "details": str(e)}) | |
| def inventory_logic(event: Dict[str, Any], context: Any) -> Dict[str, Any]: | |
| """Main Handler""" | |
| try: | |
| http_method = event.get("httpMethod") | |
| if not http_method: | |
| http_method = event.get("requestContext", {}).get("http", {}).get("method", "") | |
| path = event.get("rawPath") or event.get("path") or "/" | |
| query_params = event.get("queryStringParameters") or {} | |
| body = {} | |
| if event.get("body"): | |
| body = json.loads(event.get("body")) | |
| # Extract params from path if RESTful | |
| # Expected: /inventory (POST/GET) | |
| # /inventory/{ma_name}/{account_id} (GET/PUT/DELETE) | |
| path_parts = path.strip("/").split("/") | |
| # 0=inventory | |
| ma_name = None | |
| account_id = None | |
| if len(path_parts) > 1: | |
| ma_name = path_parts[1] | |
| if len(path_parts) > 2: | |
| account_id = path_parts[2] | |
| # Also check parameters for flexibility if not in path | |
| if not ma_name: | |
| ma_name = query_params.get("ma_name") or body.get("ma_name") | |
| if not account_id: | |
| account_id = query_params.get("account_id") or body.get("account_id") | |
| if http_method == "OPTIONS": | |
| return response(200, {"message": "OK"}) | |
| if http_method == "POST": | |
| return create_item(body) | |
| elif http_method == "GET": | |
| if ma_name and account_id: | |
| return get_item(ma_name, account_id) | |
| else: | |
| return list_items(query_params) | |
| elif http_method == "PUT": | |
| if ma_name and account_id: | |
| return update_item(ma_name, account_id, body) | |
| else: | |
| return response(400, {"error": "ma_name and account_id required for update"}) | |
| elif http_method == "DELETE": | |
| if ma_name and account_id: | |
| return delete_item(ma_name, account_id) | |
| else: | |
| return response(400, {"error": "ma_name and account_id required for delete"}) | |
| else: | |
| return response(404, {"error": "Route not found"}) | |
| except Exception as e: | |
| logger.error(f"Unhandled error: {str(e)}") | |
| return response(500, {"error": "Internal server error", "details": str(e)}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| run "test_inventory_crud_function" { | |
| command = apply | |
| module { | |
| source = "./tests/function" | |
| } | |
| variables { | |
| function_name = "cmmxna-pr-inventory_crud" | |
| payload = jsonencode({ | |
| httpMethod = "POST" | |
| body = jsonencode({ | |
| ma_name = "TerraformTestMA" | |
| account_id = "123456789012" | |
| account_alias = "Terraform Test Account" | |
| provider = "AWS" | |
| }) | |
| }) | |
| } | |
| # ASSERTIONS | |
| assert { | |
| condition = output.is_not_implemented || output.status_code == 200 || output.status_code == 201 | |
| error_message = "MNS PAAS Item CRUD failed unexpectedly. Status: ${output.status_code}. Messages ${output.error_messages}." | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import boto3 | |
| import logging | |
| from datetime import datetime, timezone | |
| from decimal import Decimal | |
| from boto3.dynamodb.conditions import Key | |
| import os | |
| # Configure structured logging | |
| logger = logging.getLogger() | |
| logger.setLevel(logging.INFO) | |
| # Initialize DynamoDB client | |
| dynamodb = boto3.resource("dynamodb") | |
| context_table_name = os.environ.get("CCM_INVENTORY_TABLE", "inventory") | |
| context_table = dynamodb.Table(context_table_name) | |
| class DecimalEncoder(json.JSONEncoder): | |
| """Helper class to convert DynamoDB Decimal types to JSON""" | |
| def default(self, obj): | |
| if isinstance(obj, Decimal): | |
| return float(obj) | |
| return super(DecimalEncoder, self).default(obj) | |
| def response(status_code, body): | |
| """Generate API Gateway response""" | |
| return { | |
| "statusCode": status_code, | |
| "headers": { | |
| "Content-Type": "application/json", | |
| "Access-Control-Allow-Origin": "*", | |
| "Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key", | |
| "Access-Control-Allow-Methods": "GET,POST,PUT,DELETE,OPTIONS", | |
| }, | |
| "body": json.dumps(body, cls=DecimalEncoder), | |
| } | |
| def validate_organization_data(data): | |
| """Validate required fields for organization onboarding""" | |
| required_fields = ["organization_name", "ma_name"] | |
| missing_fields = [field for field in required_fields if not data.get(field)] | |
| if missing_fields: | |
| error_message = f"Missing required fields: {', '.join(missing_fields)}" | |
| print(f"Validation error: {error_message}") | |
| return False, error_message | |
| return True, None | |
| def create_organization(event_body): | |
| """Create a new organization record in inventory table""" | |
| try: | |
| logger.info( | |
| "Creating new organization", | |
| extra={"event_body_keys": list(event_body.keys())}, | |
| ) | |
| # Validate input | |
| is_valid, error_msg = validate_organization_data(event_body) | |
| if not is_valid: | |
| logger.warning("Organization validation failed", extra={"error": error_msg}) | |
| return response(400, {"error": error_msg}) | |
| org_name = event_body.get("organization_name") | |
| ma_name = event_body.get("ma_name") | |
| item = { | |
| "pk": f"ORG#{org_name}", | |
| "sk": f"MA#{ma_name.lower()}", | |
| "ma_name": ma_name, | |
| "organization_name": org_name, | |
| "status": "active", | |
| "created_at": datetime.now(timezone.utc).isoformat(), | |
| "updated_at": datetime.now(timezone.utc).isoformat(), | |
| "company_info": { | |
| "industry": event_body.get("industry", ""), | |
| }, | |
| "contacts": { | |
| "ciso": { | |
| "name": event_body.get("contact_name", ""), | |
| "email": event_body.get("contact_email", ""), | |
| "phone": event_body.get("contact_phone", ""), | |
| }, | |
| "remediation": event_body.get("remediation_contact", {}), | |
| }, | |
| "features": { | |
| "scoring_enabled": event_body.get("scoring_enabled", True), | |
| }, | |
| } | |
| context_table.put_item(Item=item) | |
| logger.info( | |
| "Organization created successfully", | |
| extra={"pk": item["pk"], "ma_name": ma_name}, | |
| ) | |
| return response( | |
| 201, | |
| { | |
| "message": "Organization created successfully", | |
| "pk": item["pk"], | |
| "sk": item["sk"], | |
| "data": item, | |
| }, | |
| ) | |
| except Exception as e: | |
| logger.error( | |
| "Failed to create organization", | |
| exc_info=True, | |
| extra={"error_type": type(e).__name__, "error_message": str(e)}, | |
| ) | |
| return response( | |
| 500, {"error": "Failed to create organization", "details": str(e)} | |
| ) | |
| def get_organization(org_name, ma_name=None): | |
| """Retrieve organization records. If ma_name provided, fetch specific MA target.""" | |
| try: | |
| if ma_name: | |
| result = context_table.get_item( | |
| Key={ | |
| "pk": f"ORG#{org_name}", | |
| "sk": f"MA#{ma_name.lower()}" | |
| } | |
| ) | |
| item = result.get("Item") | |
| if not item: | |
| return response(404, {"error": "Organization/MA target not found"}) | |
| return response( | |
| 200, | |
| {"message": "Organization retrieved successfully", "data": item}, | |
| ) | |
| else: | |
| result = context_table.query( | |
| KeyConditionExpression=Key("pk").eq(f"ORG#{org_name}") & Key("sk").begins_with("MA#") | |
| ) | |
| items = result.get("Items", []) | |
| return response( | |
| 200, | |
| {"message": "Organizations retrieved successfully", "data": items}, | |
| ) | |
| except Exception as e: | |
| print(f"Error retrieving organization: {str(e)}") | |
| return response( | |
| 500, {"error": "Failed to retrieve organization", "details": str(e)} | |
| ) | |
| def update_organization(org_name, ma_name, event_body): | |
| """Update organization information""" | |
| if not ma_name: | |
| return response(400, {"error": "ma_name is required for update"}) | |
| try: | |
| sk = f"MA#{ma_name.lower()}" | |
| timestamp = datetime.now(timezone.utc).isoformat() | |
| update_parts = ["updated_at = :timestamp"] | |
| expression_values = {":timestamp": timestamp} | |
| expression_names = {} | |
| if "industry" in event_body: | |
| update_parts.append("company_info.industry = :industry") | |
| expression_values[":industry"] = event_body["industry"] | |
| if "scoring_enabled" in event_body: | |
| update_parts.append("features.scoring_enabled = :scoring_enabled") | |
| expression_values[":scoring_enabled"] = event_body["scoring_enabled"] | |
| if "contact_name" in event_body: | |
| update_parts.append("contacts.ciso.#name = :contact_name") | |
| expression_values[":contact_name"] = event_body["contact_name"] | |
| expression_names["#name"] = "name" | |
| update_expression = "SET " + ", ".join(update_parts) | |
| update_kwargs = { | |
| "Key": {"pk": f"ORG#{org_name}", "sk": sk}, | |
| "UpdateExpression": update_expression, | |
| "ExpressionAttributeValues": expression_values, | |
| "ConditionExpression": "attribute_exists(pk) AND attribute_exists(sk)", | |
| "ReturnValues": "ALL_NEW", | |
| } | |
| if expression_names: | |
| update_kwargs["ExpressionAttributeNames"] = expression_names | |
| try: | |
| updated_item = context_table.update_item(**update_kwargs) | |
| except context_table.meta.client.exceptions.ConditionalCheckFailedException: | |
| return response(404, {"error": "Organization/MA target not found"}) | |
| return response( | |
| 200, | |
| { | |
| "message": "Organization updated successfully", | |
| "data": updated_item["Attributes"], | |
| }, | |
| ) | |
| except Exception as e: | |
| print(f"Error updating organization: {str(e)}") | |
| return response( | |
| 500, {"error": "Failed to update organization", "details": str(e)} | |
| ) | |
| def list_organizations(query_params): | |
| """List all organization records (Scan)""" | |
| try: | |
| limit = int(query_params.get("limit", 50)) | |
| last_key = query_params.get("lastKey") | |
| scan_kwargs = {"Limit": limit} | |
| if last_key: | |
| try: | |
| scan_kwargs["ExclusiveStartKey"] = json.loads(last_key) | |
| except: | |
| pass | |
| result = context_table.scan(**scan_kwargs) | |
| response_data = { | |
| "message": "Organizations retrieved successfully", | |
| "data": result.get("Items", []), | |
| "count": len(result.get("Items", [])), | |
| } | |
| if "LastEvaluatedKey" in result: | |
| response_data["lastKey"] = json.dumps(result["LastEvaluatedKey"]) | |
| return response(200, response_data) | |
| except Exception as e: | |
| print(f"Error listing organizations: {str(e)}") | |
| return response( | |
| 500, {"error": "Failed to list organizations", "details": str(e)} | |
| ) | |
| def delete_organization(org_name, ma_name): | |
| """Delete an organization record""" | |
| if not ma_name: | |
| return response(400, {"error": "ma_name is required for delete"}) | |
| try: | |
| sk = f"MA#{ma_name.lower()}" | |
| try: | |
| context_table.delete_item( | |
| Key={"pk": f"ORG#{org_name}", "sk": sk}, | |
| ConditionExpression="attribute_exists(pk) AND attribute_exists(sk)" | |
| ) | |
| except context_table.meta.client.exceptions.ConditionalCheckFailedException: | |
| return response(404, {"error": "Organization/MA target not found"}) | |
| return response( | |
| 200, | |
| { | |
| "message": "Organization deleted successfully", | |
| "organization_name": org_name, | |
| "ma_name": ma_name | |
| }, | |
| ) | |
| except Exception as e: | |
| print(f"Error deleting organization: {str(e)}") | |
| return response( | |
| 500, {"error": "Failed to delete organization", "details": str(e)} | |
| ) | |
| def onboarding_logic(event, context): | |
| """Main Lambda handler""" | |
| try: | |
| print(f"Received event: {json.dumps(event)}") | |
| http_method = event.get("httpMethod") | |
| if not http_method: | |
| http_method = ( | |
| event.get("requestContext", {}).get("http", {}).get("method", "") | |
| ) | |
| path = event.get("rawPath") or event.get("path") or "/" | |
| query_params = event.get("queryStringParameters") or {} | |
| body = {} | |
| if event.get("body"): | |
| body = json.loads(event.get("body")) | |
| path_parts = path.strip("/").split("/") | |
| org_name = None | |
| ma_name = None | |
| if len(path_parts) > 1: | |
| org_name = path_parts[1] | |
| if len(path_parts) > 2: | |
| ma_name = path_parts[2] | |
| if not ma_name and query_params.get("ma_name"): | |
| ma_name = query_params.get("ma_name") | |
| if not ma_name and body.get("ma_name"): | |
| ma_name = body.get("ma_name") | |
| if http_method == "OPTIONS": | |
| return response(200, {"message": "OK"}) | |
| if http_method == "POST" and (path == "/onboarding" or path == "/"): | |
| return create_organization(body) | |
| elif http_method == "GET" and (path == "/onboarding" or path == "/"): | |
| return list_organizations(query_params) | |
| elif http_method == "GET" and org_name: | |
| return get_organization(org_name, ma_name) | |
| elif http_method == "PUT" and org_name: | |
| return update_organization(org_name, ma_name, body) | |
| elif http_method == "DELETE" and org_name: | |
| return delete_organization(org_name, ma_name) | |
| else: | |
| return response(404, {"error": "Route not found"}) | |
| except Exception as e: | |
| print(f"Unhandled error: {str(e)}") | |
| return response(500, {"error": "Internal server error", "details": str(e)}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| run "test_onboarding_function" { | |
| command = apply | |
| module { | |
| source = "./tests/function" | |
| } | |
| variables { | |
| function_name = "cmmxna-pr-onboarding" | |
| payload = jsonencode({ | |
| httpMethod = "POST" | |
| path = "/onboarding" | |
| body = jsonencode({ | |
| organization_name = "Fiserv" | |
| ma_name = "PayFare" | |
| industry = "Terraform Testing" | |
| contact_name = "Terraform CISO" | |
| contact_email = "tf-ciso@example.com" | |
| scoring_enabled = true | |
| }) | |
| }) | |
| } | |
| # ASSERTIONS | |
| assert { | |
| # Accept success (200/201) or not implemented (501) | |
| condition = output.is_not_implemented || output.status_code == 200 || output.status_code == 201 | |
| error_message = "Onboarding failed unexpectedly. Status: ${output.status_code}. Messages ${output.error_messages}." | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # ============================================================================= | |
| # LEVEL 1: The "Wild West" Acquisition | |
| # Context: Just acquired. No standards. Goal is simply to get visibility. | |
| # ============================================================================= | |
| resource "aws_dynamodb_table_item" "context_l1_loans" { | |
| table_name = aws_dynamodb_table.inventory.name | |
| hash_key = aws_dynamodb_table.inventory.hash_key | |
| range_key = aws_dynamodb_table.inventory.range_key | |
| item = jsonencode({ | |
| "pk": {"S": "ORG#LEVEL1-0000-0001-0000-000000000001"}, | |
| "sk": {"S": "MA#QuickCashLoans"}, | |
| "company_info": {"M": { | |
| "name": {"S": "QuickCash Loans (Legacy)"}, | |
| "industry": {"S": "Consumer Lending"}, | |
| "description": {"S": "Recent acquisition. High risk. No logging standards."} | |
| }}, | |
| "contacts": {"M": { | |
| "ciso": {"S": "interim-ciso@fiserv.com"}, | |
| "remediation": {"S": "mna-team-alpha@fiserv.com"} | |
| }}, | |
| "features": {"M": { | |
| "scoring_enabled": {"BOOL": true}, | |
| "target_maturity_level": {"N": "1"}, # Target: Just get Wiz installed | |
| "drift_alerting_enabled": {"BOOL": false} # Don't alert on noise yet | |
| }} | |
| }) | |
| } | |
| # ============================================================================= | |
| # LEVEL 2: The "Local Compliance" Shop | |
| # Context: Basic hygiene. Logging exists but is local. Scripts are manual. | |
| # ============================================================================= | |
| resource "aws_dynamodb_table_item" "context_l2_ledger" { | |
| table_name = aws_dynamodb_table.inventory.name | |
| hash_key = aws_dynamodb_table.inventory.hash_key | |
| range_key = aws_dynamodb_table.inventory.range_key | |
| item = jsonencode({ | |
| "pk": {"S": "ORG#LEVEL2-0000-0002-0000-000000000002"}, | |
| "sk": {"S": "MA#LedgerBlockInc"}, | |
| "company_info": {"M": { | |
| "name": {"S": "Ledger Block Inc"}, | |
| "industry": {"S": "Crypto / Blockchain"}, | |
| "description": {"S": "Remediation phase. Local logging enabled. Manual hardening."} | |
| }}, | |
| "contacts": {"M": { | |
| "ciso": {"S": "security@ledgerblock.io"}, | |
| "remediation": {"S": "devops@ledgerblock.io"} | |
| }}, | |
| "features": {"M": { | |
| "scoring_enabled": {"BOOL": true}, | |
| "target_maturity_level": {"N": "2"}, # Target: Local Logging & Scripts | |
| "drift_alerting_enabled": {"BOOL": true} | |
| }} | |
| }) | |
| } | |
| # ============================================================================= | |
| # LEVEL 3: The "Enterprise Alignment" Firm | |
| # Context: Day 0 Integration. Policies are switching to Enterprise management. | |
| # ============================================================================= | |
| resource "aws_dynamodb_table_item" "context_l3_wealth" { | |
| table_name = aws_dynamodb_table.inventory.name | |
| hash_key = aws_dynamodb_table.inventory.hash_key | |
| range_key = aws_dynamodb_table.inventory.range_key | |
| item = jsonencode({ | |
| "pk": {"S": "ORG#LEVEL3-0000-0003-0000-000000000003"}, | |
| "sk": {"S": "MA#WealthSafeAdvisors"}, | |
| "company_info": {"M": { | |
| "name": {"S": "WealthSafe Advisors"}, | |
| "industry": {"S": "Wealth Management"}, | |
| "description": {"S": "Pre-Merger Close. Enterprise Policy & FinOps integration."} | |
| }}, | |
| "contacts": {"M": { | |
| "ciso": {"S": "risk-officer@wealthsafe.com"}, | |
| "remediation": {"S": "cloud-arch@fiserv-corp.com"} | |
| }}, | |
| "features": {"M": { | |
| "scoring_enabled": {"BOOL": true}, | |
| "target_maturity_level": {"N": "3"}, # Target: Enterprise Policy | |
| "drift_alerting_enabled": {"BOOL": true} | |
| }} | |
| }) | |
| } | |
| # ============================================================================= | |
| # LEVEL 4: The "Network Integrated" Processor | |
| # Context: Post-Merger (Day 1). Transit Gateway connected. High traffic flow. | |
| # ============================================================================= | |
| resource "aws_dynamodb_table_item" "context_l4_payments" { | |
| table_name = aws_dynamodb_table.inventory.name | |
| hash_key = aws_dynamodb_table.inventory.hash_key | |
| range_key = aws_dynamodb_table.inventory.range_key | |
| item = jsonencode({ | |
| "pk": {"S": "ORG#LEVEL4-0000-0004-0000-000000000004"}, | |
| "sk": {"S": "MA#GlobalPayRails"}, | |
| "company_info": {"M": { | |
| "name": {"S": "Global Pay Rails"}, | |
| "industry": {"S": "Payment Processing"}, | |
| "description": {"S": "Connected to Transit Gateway. Production Traffic Live."} | |
| }}, | |
| "contacts": {"M": { | |
| "ciso": {"S": "ciso@globalpay.com"}, | |
| "remediation": {"S": "noc@fiserv-commercial.com"} | |
| }}, | |
| "features": {"M": { | |
| "scoring_enabled": {"BOOL": true}, | |
| "target_maturity_level": {"N": "4"}, # Target: Network Integration | |
| "drift_alerting_enabled": {"BOOL": true} | |
| }} | |
| }) | |
| } | |
| # ============================================================================= | |
| # LEVEL 5: The "Fortress" (End State) | |
| # Context: Service Restrictions Active. Immutable Infrastructure. | |
| # ============================================================================= | |
| resource "aws_dynamodb_table_item" "context_l5_surety" { | |
| table_name = aws_dynamodb_table.inventory.name | |
| hash_key = aws_dynamodb_table.inventory.hash_key | |
| range_key = aws_dynamodb_table.inventory.range_key | |
| item = jsonencode({ | |
| "pk": {"S": "ORG#LEVEL5-0000-0005-0000-000000000005"}, | |
| "sk": {"S": "MA#SuretyInsuranceGroup"}, | |
| "company_info": {"M": { | |
| "name": {"S": "Surety Insurance Group"}, | |
| "industry": {"S": "InsurTech"}, | |
| "description": {"S": "End State. Service Control Policies blocking unauthorized services."} | |
| }}, | |
| "contacts": {"M": { | |
| "ciso": {"S": "security@surety.com"}, | |
| "remediation": {"S": "sre-team@surety.com"} | |
| }}, | |
| "features": {"M": { | |
| "scoring_enabled": {"BOOL": true}, | |
| "target_maturity_level": {"N": "5"}, # Target: Service Restriction / Parity | |
| "drift_alerting_enabled": {"BOOL": true} | |
| }} | |
| }) | |
| } | |
| # ============================================================================= | |
| # INVENTORY ITEM EXAMPLE | |
| # Context: An AWS Account associated with a merged entity | |
| # ============================================================================= | |
| resource "aws_dynamodb_table_item" "inventory_l1_account" { | |
| table_name = aws_dynamodb_table.inventory.name | |
| hash_key = aws_dynamodb_table.inventory.hash_key | |
| range_key = aws_dynamodb_table.inventory.range_key | |
| item = jsonencode({ | |
| "pk": {"S": "MA#payfare"}, | |
| "sk": {"S": "ACCOUNT#123456789012"}, | |
| "ma_name": {"S": "PayFare"}, | |
| "account_id": {"S": "123456789012"}, | |
| "account_alias": {"S": "payfare-app1-prod"}, | |
| "provider": {"S": "AWS"}, | |
| "status": {"S": "active"}, | |
| "metadata": {"M": { | |
| "environment": {"S": "production"}, | |
| "region": {"S": "us-east-1"} | |
| }} | |
| }) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pytest | |
| from unittest.mock import MagicMock, patch | |
| import json | |
| import os | |
| import sys | |
| # Helper to ensure we start with a fresh module for top-level code execution | |
| def clean_imports(): | |
| if "lambdas.inventory_crud" in sys.modules: | |
| del sys.modules["lambdas.inventory_crud"] | |
| @patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"}) | |
| @patch("boto3.resource") | |
| def test_create_item(mock_boto_resource): | |
| clean_imports() | |
| mock_table = MagicMock() | |
| mock_dynamodb = MagicMock() | |
| mock_boto_resource.return_value = mock_dynamodb | |
| mock_dynamodb.Table.return_value = mock_table | |
| from lambdas.inventory_crud import inventory_logic | |
| event = { | |
| "httpMethod": "POST", | |
| "body": json.dumps( | |
| { | |
| "ma_name": "PayFare", | |
| "account_id": "123456789012", | |
| "account_alias": "payfare-app1-prod", | |
| "provider": "AWS" | |
| } | |
| ), | |
| } | |
| response = inventory_logic(event, {}) | |
| assert response["statusCode"] == 201 | |
| body = json.loads(response["body"]) | |
| assert body["data"]["pk"] == "MA#payfare" | |
| assert body["data"]["sk"] == "ACCOUNT#123456789012" | |
| assert body["data"]["account_alias"] == "payfare-app1-prod" | |
| mock_table.put_item.assert_called_once() | |
| @patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"}) | |
| @patch("boto3.resource") | |
| def test_get_items_by_ma(mock_boto_resource): | |
| clean_imports() | |
| mock_table = MagicMock() | |
| mock_dynamodb = MagicMock() | |
| mock_boto_resource.return_value = mock_dynamodb | |
| mock_dynamodb.Table.return_value = mock_table | |
| # Mock query response | |
| mock_table.query.return_value = { | |
| "Items": [{"pk": "MA#payfare", "sk": "ACCOUNT#123", "account_id": "123"}] | |
| } | |
| from lambdas.inventory_crud import inventory_logic | |
| event = {"httpMethod": "GET", "queryStringParameters": {"ma_name": "PayFare"}} | |
| response = inventory_logic(event, {}) | |
| assert response["statusCode"] == 200 | |
| body = json.loads(response["body"]) | |
| assert len(body["data"]) == 1 | |
| assert body["data"][0]["pk"] == "MA#payfare" | |
| mock_table.query.assert_called_once() | |
| @patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"}) | |
| @patch("boto3.resource") | |
| def test_get_specific_item(mock_boto_resource): | |
| clean_imports() | |
| mock_table = MagicMock() | |
| mock_dynamodb = MagicMock() | |
| mock_boto_resource.return_value = mock_dynamodb | |
| mock_dynamodb.Table.return_value = mock_table | |
| # Mock get response | |
| mock_table.get_item.return_value = { | |
| "Item": {"pk": "MA#payfare", "sk": "ACCOUNT#123", "account_alias": "Test"} | |
| } | |
| from lambdas.inventory_crud import inventory_logic | |
| event = {"httpMethod": "GET", "path": "/inventory/PayFare/123"} | |
| response = inventory_logic(event, {}) | |
| assert response["statusCode"] == 200 | |
| body = json.loads(response["body"]) | |
| assert body["data"]["account_alias"] == "Test" | |
| @patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"}) | |
| @patch("boto3.resource") | |
| def test_delete_item(mock_boto_resource): | |
| clean_imports() | |
| mock_table = MagicMock() | |
| mock_dynamodb = MagicMock() | |
| mock_boto_resource.return_value = mock_dynamodb | |
| mock_dynamodb.Table.return_value = mock_table | |
| from lambdas.inventory_crud import inventory_logic | |
| event = { | |
| "httpMethod": "DELETE", | |
| "path": "/inventory/PayFare/123", | |
| "queryStringParameters": {} # Explicitly ensure QS is not None if code doesn't handle None | |
| } | |
| response = inventory_logic(event, {}) | |
| assert response["statusCode"] == 200 | |
| mock_table.delete_item.assert_called_once() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pytest | |
| from unittest.mock import MagicMock, patch | |
| import json | |
| import os | |
| import sys | |
| # Helper to ensure we start with a fresh module for top-level code execution | |
| def clean_imports(): | |
| if "lambdas.onboarding" in sys.modules: | |
| del sys.modules["lambdas.onboarding"] | |
| @patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"}) | |
| @patch("boto3.resource") | |
| def test_onboarding_logic_list_records(mock_boto_resource): | |
| clean_imports() | |
| # Setup mock DynamoDB | |
| mock_table = MagicMock() | |
| mock_dynamodb = MagicMock() | |
| mock_boto_resource.return_value = mock_dynamodb | |
| mock_dynamodb.Table.return_value = mock_table | |
| # Mock scan response | |
| mock_table.scan.return_value = {"Items": []} | |
| # Import the module after mocking | |
| from lambdas.onboarding import onboarding_logic | |
| event = { | |
| "httpMethod": "GET", | |
| "path": "/onboarding", | |
| } | |
| # Call the handler | |
| response = onboarding_logic(event, {}) | |
| # Assertions | |
| assert response["statusCode"] == 200 | |
| body = json.loads(response["body"]) | |
| assert body["message"] == "Organizations retrieved successfully" | |
| assert body["data"] == [] | |
| @patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"}) | |
| @patch("boto3.resource") | |
| def test_onboarding_logic_create_organization(mock_boto_resource): | |
| clean_imports() | |
| mock_table = MagicMock() | |
| mock_dynamodb = MagicMock() | |
| mock_boto_resource.return_value = mock_dynamodb | |
| mock_dynamodb.Table.return_value = mock_table | |
| from lambdas.onboarding import onboarding_logic | |
| event = { | |
| "httpMethod": "POST", | |
| "body": json.dumps( | |
| { | |
| "organization_name": "Fiserv", | |
| "ma_name": "PayFare", | |
| "industry": "Technology", | |
| "contact_name": "John Doe", | |
| "contact_email": "john@test.com", | |
| "scoring_enabled": True, | |
| } | |
| ), | |
| } | |
| response = onboarding_logic(event, {}) | |
| assert response["statusCode"] == 201 | |
| body = json.loads(response["body"]) | |
| assert body["pk"] == "ORG#Fiserv" | |
| assert body["sk"] == "MA#payfare" | |
| assert body["data"]["ma_name"] == "PayFare" | |
| mock_table.put_item.assert_called_once() | |
| @patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"}) | |
| @patch("boto3.resource") | |
| def test_onboarding_logic_get_organization(mock_boto_resource): | |
| clean_imports() | |
| mock_table = MagicMock() | |
| mock_dynamodb = MagicMock() | |
| mock_boto_resource.return_value = mock_dynamodb | |
| mock_dynamodb.Table.return_value = mock_table | |
| # Mock query response | |
| test_org_records = [{ | |
| "pk": "ORG#Fiserv", | |
| "sk": "MA#payfare", | |
| "ma_name": "PayFare", | |
| "organization_name": "Fiserv" | |
| }] | |
| mock_table.query.return_value = {"Items": test_org_records} | |
| from lambdas.onboarding import onboarding_logic | |
| event = { | |
| "httpMethod": "GET", | |
| "path": "/onboarding/Fiserv", | |
| } | |
| response = onboarding_logic(event, {}) | |
| assert response["statusCode"] == 200 | |
| body = json.loads(response["body"]) | |
| assert len(body["data"]) == 1 | |
| assert body["data"][0]["ma_name"] == "PayFare" | |
| @patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"}) | |
| @patch("boto3.resource") | |
| def test_onboarding_logic_get_specific_ma(mock_boto_resource): | |
| clean_imports() | |
| mock_table = MagicMock() | |
| mock_dynamodb = MagicMock() | |
| mock_boto_resource.return_value = mock_dynamodb | |
| mock_dynamodb.Table.return_value = mock_table | |
| # Mock get_item response | |
| test_record = { | |
| "pk": "ORG#Fiserv", | |
| "sk": "MA#payfare", | |
| "ma_name": "PayFare", | |
| "organization_name": "Fiserv" | |
| } | |
| mock_table.get_item.return_value = {"Item": test_record} | |
| from lambdas.onboarding import onboarding_logic | |
| # Test path with MA name | |
| event = { | |
| "httpMethod": "GET", | |
| "path": "/onboarding/Fiserv/PayFare", | |
| } | |
| response = onboarding_logic(event, {}) | |
| assert response["statusCode"] == 200 | |
| body = json.loads(response["body"]) | |
| assert body["data"]["ma_name"] == "PayFare" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment