Skip to content

Instantly share code, notes, and snippets.

@rexwhitten
Last active January 21, 2026 20:33
Show Gist options
  • Select an option

  • Save rexwhitten/276a064b7eaa6b75802b15e2f12894a3 to your computer and use it in GitHub Desktop.

Select an option

Save rexwhitten/276a064b7eaa6b75802b15e2f12894a3 to your computer and use it in GitHub Desktop.
Inventory
import json
import boto3
import logging
import os
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Dict, List
from boto3.dynamodb.conditions import Key
# Configure structured logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Initialize DynamoDB client
dynamodb = boto3.resource("dynamodb")
table_name = os.environ.get("CCM_INVENTORY_TABLE", "inventory")
inventory_table = dynamodb.Table(table_name)
class DecimalEncoder(json.JSONEncoder):
"""Helper class to convert DynamoDB Decimal types to JSON"""
def default(self, obj: Any) -> Any:
if isinstance(obj, Decimal):
return float(obj)
return super(DecimalEncoder, self).default(obj)
def response(status_code: int, body: Any) -> Dict[str, Any]:
"""Generate API Gateway response"""
return {
"statusCode": status_code,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key",
"Access-Control-Allow-Methods": "GET,POST,PUT,DELETE,OPTIONS",
},
"body": json.dumps(body, cls=DecimalEncoder),
}
def create_item(event_body: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new inventory item (Account)"""
try:
# Validate inputs
ma_name = event_body.get("ma_name")
account_id = event_body.get("account_id")
if not ma_name or not account_id:
return response(400, {"error": "Missing required fields: ma_name, account_id"})
# Construct Keys
pk = f"MA#{ma_name.lower()}"
sk = f"ACCOUNT#{account_id}"
item = {
"pk": pk,
"sk": sk,
"ma_name": ma_name,
"account_id": account_id,
"account_alias": event_body.get("account_alias", ""),
"provider": event_body.get("provider", "AWS"),
"status": "active",
"created_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
"metadata": event_body.get("metadata", {}),
}
inventory_table.put_item(Item=item)
logger.info(f"Created inventory item: {pk} / {sk}")
return response(201, {
"message": "Inventory item created successfully",
"data": item
})
except Exception as e:
logger.error(f"Error creating item: {str(e)}")
return response(500, {"error": "Failed to create item", "details": str(e)})
def get_item(ma_name: str, account_id: str) -> Dict[str, Any]:
"""Get a specific inventory item"""
try:
pk = f"MA#{ma_name.lower()}"
sk = f"ACCOUNT#{account_id}"
result = inventory_table.get_item(Key={"pk": pk, "sk": sk})
item = result.get("Item")
if not item:
return response(404, {"error": "Item not found"})
return response(200, {"data": item})
except Exception as e:
logger.error(f"Error getting item: {str(e)}")
return response(500, {"error": "Failed to get item", "details": str(e)})
def list_items(query_params: Dict[str, str]) -> Dict[str, Any]:
"""List items, optionally filtered by ma_name"""
try:
ma_name = query_params.get("ma_name")
if ma_name:
# Query by PK (MA context)
pk = f"MA#{ma_name.lower()}"
query_kwargs = {
"KeyConditionExpression": Key("pk").eq(pk) & Key("sk").begins_with("ACCOUNT#")
}
items = []
while True:
result = inventory_table.query(**query_kwargs)
items.extend(result.get("Items", []))
if "LastEvaluatedKey" in result:
query_kwargs["ExclusiveStartKey"] = result["LastEvaluatedKey"]
else:
break
return response(200, {"data": items, "count": len(items)})
else:
# Scan all accounts?
# For now, maybe just return empty or require ma_name.
# Or scan filter begins_with(sk, "ACCOUNT#")
# Let's implement full scan for now but it's expensive.
scan_kwargs = {
"FilterExpression": Key("sk").begins_with("ACCOUNT#")
} # Note: Key conditions not allowed in FilterExpression for Scan, but begins_with is allowed function.
# Wait, Key(...) constructs are for KeyConditionExpression.
# For FilterExpression we use Attr(...) or string.
# However, 'sk' is a key attribute. Scan can use FilterExpression on keys.
# Actually, simpler to just scan and return everything if that's the intent,
# or require ma_name.
# Let's require ma_name for now to encourage good patterns, unless explicitly asked.
# But the previous code supported full scan. So let's support it.
params = {}
# Simple scan without filter to get everything in table?
# Or client side filter.
# Let's just return everything for simplicity of this refactor unless specified.
result = inventory_table.scan()
items = result.get("Items", [])
# Filter in memory for ACCOUNT# items just to be clean
account_items = [i for i in items if i.get("sk", "").startswith("ACCOUNT#")]
return response(200, {"data": account_items, "count": len(account_items)})
except Exception as e:
logger.error(f"Error listing items: {str(e)}")
return response(500, {"error": "Failed to list items", "details": str(e)})
def update_item(ma_name: str, account_id: str, event_body: Dict[str, Any]) -> Dict[str, Any]:
"""Update inventory item"""
try:
pk = f"MA#{ma_name.lower()}"
sk = f"ACCOUNT#{account_id}"
timestamp = datetime.now(timezone.utc).isoformat()
update_parts = ["metadata.updated_at = :timestamp"]
expression_values = {":timestamp": timestamp}
expression_names = {}
if "account_alias" in event_body:
update_parts.append("account_alias = :alias")
expression_values[":alias"] = event_body["account_alias"]
if "provider" in event_body:
update_parts.append("provider = :provider")
expression_values[":provider"] = event_body["provider"]
# Add other fields as needed
update_expression = "SET " + ", ".join(update_parts)
try:
result = inventory_table.update_item(
Key={"pk": pk, "sk": sk},
UpdateExpression=update_expression,
ExpressionAttributeValues=expression_values,
ConditionExpression="attribute_exists(pk) AND attribute_exists(sk)",
ReturnValues="ALL_NEW"
)
return response(200, {"message": "Item updated", "data": result.get("Attributes")})
except inventory_table.meta.client.exceptions.ConditionalCheckFailedException:
return response(404, {"error": "Item not found"})
except Exception as e:
logger.error(f"Error updating item: {str(e)}")
return response(500, {"error": "Failed to update item", "details": str(e)})
def delete_item(ma_name: str, account_id: str) -> Dict[str, Any]:
"""Delete inventory item"""
try:
pk = f"MA#{ma_name.lower()}"
sk = f"ACCOUNT#{account_id}"
inventory_table.delete_item(Key={"pk": pk, "sk": sk})
return response(200, {"message": "Item deleted successfully"})
except Exception as e:
logger.error(f"Error deleting item: {str(e)}")
return response(500, {"error": "Failed to delete item", "details": str(e)})
def inventory_logic(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""Main Handler"""
try:
http_method = event.get("httpMethod")
if not http_method:
http_method = event.get("requestContext", {}).get("http", {}).get("method", "")
path = event.get("rawPath") or event.get("path") or "/"
query_params = event.get("queryStringParameters") or {}
body = {}
if event.get("body"):
body = json.loads(event.get("body"))
# Extract params from path if RESTful
# Expected: /inventory (POST/GET)
# /inventory/{ma_name}/{account_id} (GET/PUT/DELETE)
path_parts = path.strip("/").split("/")
# 0=inventory
ma_name = None
account_id = None
if len(path_parts) > 1:
ma_name = path_parts[1]
if len(path_parts) > 2:
account_id = path_parts[2]
# Also check parameters for flexibility if not in path
if not ma_name:
ma_name = query_params.get("ma_name") or body.get("ma_name")
if not account_id:
account_id = query_params.get("account_id") or body.get("account_id")
if http_method == "OPTIONS":
return response(200, {"message": "OK"})
if http_method == "POST":
return create_item(body)
elif http_method == "GET":
if ma_name and account_id:
return get_item(ma_name, account_id)
else:
return list_items(query_params)
elif http_method == "PUT":
if ma_name and account_id:
return update_item(ma_name, account_id, body)
else:
return response(400, {"error": "ma_name and account_id required for update"})
elif http_method == "DELETE":
if ma_name and account_id:
return delete_item(ma_name, account_id)
else:
return response(400, {"error": "ma_name and account_id required for delete"})
else:
return response(404, {"error": "Route not found"})
except Exception as e:
logger.error(f"Unhandled error: {str(e)}")
return response(500, {"error": "Internal server error", "details": str(e)})
run "test_inventory_crud_function" {
command = apply
module {
source = "./tests/function"
}
variables {
function_name = "cmmxna-pr-inventory_crud"
payload = jsonencode({
httpMethod = "POST"
body = jsonencode({
ma_name = "TerraformTestMA"
account_id = "123456789012"
account_alias = "Terraform Test Account"
provider = "AWS"
})
})
}
# ASSERTIONS
assert {
condition = output.is_not_implemented || output.status_code == 200 || output.status_code == 201
error_message = "MNS PAAS Item CRUD failed unexpectedly. Status: ${output.status_code}. Messages ${output.error_messages}."
}
}
import json
import boto3
import logging
from datetime import datetime, timezone
from decimal import Decimal
from boto3.dynamodb.conditions import Key
import os
# Configure structured logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Initialize DynamoDB client
dynamodb = boto3.resource("dynamodb")
context_table_name = os.environ.get("CCM_INVENTORY_TABLE", "inventory")
context_table = dynamodb.Table(context_table_name)
class DecimalEncoder(json.JSONEncoder):
"""Helper class to convert DynamoDB Decimal types to JSON"""
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
return super(DecimalEncoder, self).default(obj)
def response(status_code, body):
"""Generate API Gateway response"""
return {
"statusCode": status_code,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Content-Type,X-Amz-Date,Authorization,X-Api-Key",
"Access-Control-Allow-Methods": "GET,POST,PUT,DELETE,OPTIONS",
},
"body": json.dumps(body, cls=DecimalEncoder),
}
def validate_organization_data(data):
"""Validate required fields for organization onboarding"""
required_fields = ["organization_name", "ma_name"]
missing_fields = [field for field in required_fields if not data.get(field)]
if missing_fields:
error_message = f"Missing required fields: {', '.join(missing_fields)}"
print(f"Validation error: {error_message}")
return False, error_message
return True, None
def create_organization(event_body):
"""Create a new organization record in inventory table"""
try:
logger.info(
"Creating new organization",
extra={"event_body_keys": list(event_body.keys())},
)
# Validate input
is_valid, error_msg = validate_organization_data(event_body)
if not is_valid:
logger.warning("Organization validation failed", extra={"error": error_msg})
return response(400, {"error": error_msg})
org_name = event_body.get("organization_name")
ma_name = event_body.get("ma_name")
item = {
"pk": f"ORG#{org_name}",
"sk": f"MA#{ma_name.lower()}",
"ma_name": ma_name,
"organization_name": org_name,
"status": "active",
"created_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
"company_info": {
"industry": event_body.get("industry", ""),
},
"contacts": {
"ciso": {
"name": event_body.get("contact_name", ""),
"email": event_body.get("contact_email", ""),
"phone": event_body.get("contact_phone", ""),
},
"remediation": event_body.get("remediation_contact", {}),
},
"features": {
"scoring_enabled": event_body.get("scoring_enabled", True),
},
}
context_table.put_item(Item=item)
logger.info(
"Organization created successfully",
extra={"pk": item["pk"], "ma_name": ma_name},
)
return response(
201,
{
"message": "Organization created successfully",
"pk": item["pk"],
"sk": item["sk"],
"data": item,
},
)
except Exception as e:
logger.error(
"Failed to create organization",
exc_info=True,
extra={"error_type": type(e).__name__, "error_message": str(e)},
)
return response(
500, {"error": "Failed to create organization", "details": str(e)}
)
def get_organization(org_name, ma_name=None):
"""Retrieve organization records. If ma_name provided, fetch specific MA target."""
try:
if ma_name:
result = context_table.get_item(
Key={
"pk": f"ORG#{org_name}",
"sk": f"MA#{ma_name.lower()}"
}
)
item = result.get("Item")
if not item:
return response(404, {"error": "Organization/MA target not found"})
return response(
200,
{"message": "Organization retrieved successfully", "data": item},
)
else:
result = context_table.query(
KeyConditionExpression=Key("pk").eq(f"ORG#{org_name}") & Key("sk").begins_with("MA#")
)
items = result.get("Items", [])
return response(
200,
{"message": "Organizations retrieved successfully", "data": items},
)
except Exception as e:
print(f"Error retrieving organization: {str(e)}")
return response(
500, {"error": "Failed to retrieve organization", "details": str(e)}
)
def update_organization(org_name, ma_name, event_body):
"""Update organization information"""
if not ma_name:
return response(400, {"error": "ma_name is required for update"})
try:
sk = f"MA#{ma_name.lower()}"
timestamp = datetime.now(timezone.utc).isoformat()
update_parts = ["updated_at = :timestamp"]
expression_values = {":timestamp": timestamp}
expression_names = {}
if "industry" in event_body:
update_parts.append("company_info.industry = :industry")
expression_values[":industry"] = event_body["industry"]
if "scoring_enabled" in event_body:
update_parts.append("features.scoring_enabled = :scoring_enabled")
expression_values[":scoring_enabled"] = event_body["scoring_enabled"]
if "contact_name" in event_body:
update_parts.append("contacts.ciso.#name = :contact_name")
expression_values[":contact_name"] = event_body["contact_name"]
expression_names["#name"] = "name"
update_expression = "SET " + ", ".join(update_parts)
update_kwargs = {
"Key": {"pk": f"ORG#{org_name}", "sk": sk},
"UpdateExpression": update_expression,
"ExpressionAttributeValues": expression_values,
"ConditionExpression": "attribute_exists(pk) AND attribute_exists(sk)",
"ReturnValues": "ALL_NEW",
}
if expression_names:
update_kwargs["ExpressionAttributeNames"] = expression_names
try:
updated_item = context_table.update_item(**update_kwargs)
except context_table.meta.client.exceptions.ConditionalCheckFailedException:
return response(404, {"error": "Organization/MA target not found"})
return response(
200,
{
"message": "Organization updated successfully",
"data": updated_item["Attributes"],
},
)
except Exception as e:
print(f"Error updating organization: {str(e)}")
return response(
500, {"error": "Failed to update organization", "details": str(e)}
)
def list_organizations(query_params):
"""List all organization records (Scan)"""
try:
limit = int(query_params.get("limit", 50))
last_key = query_params.get("lastKey")
scan_kwargs = {"Limit": limit}
if last_key:
try:
scan_kwargs["ExclusiveStartKey"] = json.loads(last_key)
except:
pass
result = context_table.scan(**scan_kwargs)
response_data = {
"message": "Organizations retrieved successfully",
"data": result.get("Items", []),
"count": len(result.get("Items", [])),
}
if "LastEvaluatedKey" in result:
response_data["lastKey"] = json.dumps(result["LastEvaluatedKey"])
return response(200, response_data)
except Exception as e:
print(f"Error listing organizations: {str(e)}")
return response(
500, {"error": "Failed to list organizations", "details": str(e)}
)
def delete_organization(org_name, ma_name):
"""Delete an organization record"""
if not ma_name:
return response(400, {"error": "ma_name is required for delete"})
try:
sk = f"MA#{ma_name.lower()}"
try:
context_table.delete_item(
Key={"pk": f"ORG#{org_name}", "sk": sk},
ConditionExpression="attribute_exists(pk) AND attribute_exists(sk)"
)
except context_table.meta.client.exceptions.ConditionalCheckFailedException:
return response(404, {"error": "Organization/MA target not found"})
return response(
200,
{
"message": "Organization deleted successfully",
"organization_name": org_name,
"ma_name": ma_name
},
)
except Exception as e:
print(f"Error deleting organization: {str(e)}")
return response(
500, {"error": "Failed to delete organization", "details": str(e)}
)
def onboarding_logic(event, context):
"""Main Lambda handler"""
try:
print(f"Received event: {json.dumps(event)}")
http_method = event.get("httpMethod")
if not http_method:
http_method = (
event.get("requestContext", {}).get("http", {}).get("method", "")
)
path = event.get("rawPath") or event.get("path") or "/"
query_params = event.get("queryStringParameters") or {}
body = {}
if event.get("body"):
body = json.loads(event.get("body"))
path_parts = path.strip("/").split("/")
org_name = None
ma_name = None
if len(path_parts) > 1:
org_name = path_parts[1]
if len(path_parts) > 2:
ma_name = path_parts[2]
if not ma_name and query_params.get("ma_name"):
ma_name = query_params.get("ma_name")
if not ma_name and body.get("ma_name"):
ma_name = body.get("ma_name")
if http_method == "OPTIONS":
return response(200, {"message": "OK"})
if http_method == "POST" and (path == "/onboarding" or path == "/"):
return create_organization(body)
elif http_method == "GET" and (path == "/onboarding" or path == "/"):
return list_organizations(query_params)
elif http_method == "GET" and org_name:
return get_organization(org_name, ma_name)
elif http_method == "PUT" and org_name:
return update_organization(org_name, ma_name, body)
elif http_method == "DELETE" and org_name:
return delete_organization(org_name, ma_name)
else:
return response(404, {"error": "Route not found"})
except Exception as e:
print(f"Unhandled error: {str(e)}")
return response(500, {"error": "Internal server error", "details": str(e)})
run "test_onboarding_function" {
command = apply
module {
source = "./tests/function"
}
variables {
function_name = "cmmxna-pr-onboarding"
payload = jsonencode({
httpMethod = "POST"
path = "/onboarding"
body = jsonencode({
organization_name = "Fiserv"
ma_name = "PayFare"
industry = "Terraform Testing"
contact_name = "Terraform CISO"
contact_email = "tf-ciso@example.com"
scoring_enabled = true
})
})
}
# ASSERTIONS
assert {
# Accept success (200/201) or not implemented (501)
condition = output.is_not_implemented || output.status_code == 200 || output.status_code == 201
error_message = "Onboarding failed unexpectedly. Status: ${output.status_code}. Messages ${output.error_messages}."
}
}
# =============================================================================
# LEVEL 1: The "Wild West" Acquisition
# Context: Just acquired. No standards. Goal is simply to get visibility.
# =============================================================================
resource "aws_dynamodb_table_item" "context_l1_loans" {
table_name = aws_dynamodb_table.inventory.name
hash_key = aws_dynamodb_table.inventory.hash_key
range_key = aws_dynamodb_table.inventory.range_key
item = jsonencode({
"pk": {"S": "ORG#LEVEL1-0000-0001-0000-000000000001"},
"sk": {"S": "MA#QuickCashLoans"},
"company_info": {"M": {
"name": {"S": "QuickCash Loans (Legacy)"},
"industry": {"S": "Consumer Lending"},
"description": {"S": "Recent acquisition. High risk. No logging standards."}
}},
"contacts": {"M": {
"ciso": {"S": "interim-ciso@fiserv.com"},
"remediation": {"S": "mna-team-alpha@fiserv.com"}
}},
"features": {"M": {
"scoring_enabled": {"BOOL": true},
"target_maturity_level": {"N": "1"}, # Target: Just get Wiz installed
"drift_alerting_enabled": {"BOOL": false} # Don't alert on noise yet
}}
})
}
# =============================================================================
# LEVEL 2: The "Local Compliance" Shop
# Context: Basic hygiene. Logging exists but is local. Scripts are manual.
# =============================================================================
resource "aws_dynamodb_table_item" "context_l2_ledger" {
table_name = aws_dynamodb_table.inventory.name
hash_key = aws_dynamodb_table.inventory.hash_key
range_key = aws_dynamodb_table.inventory.range_key
item = jsonencode({
"pk": {"S": "ORG#LEVEL2-0000-0002-0000-000000000002"},
"sk": {"S": "MA#LedgerBlockInc"},
"company_info": {"M": {
"name": {"S": "Ledger Block Inc"},
"industry": {"S": "Crypto / Blockchain"},
"description": {"S": "Remediation phase. Local logging enabled. Manual hardening."}
}},
"contacts": {"M": {
"ciso": {"S": "security@ledgerblock.io"},
"remediation": {"S": "devops@ledgerblock.io"}
}},
"features": {"M": {
"scoring_enabled": {"BOOL": true},
"target_maturity_level": {"N": "2"}, # Target: Local Logging & Scripts
"drift_alerting_enabled": {"BOOL": true}
}}
})
}
# =============================================================================
# LEVEL 3: The "Enterprise Alignment" Firm
# Context: Day 0 Integration. Policies are switching to Enterprise management.
# =============================================================================
resource "aws_dynamodb_table_item" "context_l3_wealth" {
table_name = aws_dynamodb_table.inventory.name
hash_key = aws_dynamodb_table.inventory.hash_key
range_key = aws_dynamodb_table.inventory.range_key
item = jsonencode({
"pk": {"S": "ORG#LEVEL3-0000-0003-0000-000000000003"},
"sk": {"S": "MA#WealthSafeAdvisors"},
"company_info": {"M": {
"name": {"S": "WealthSafe Advisors"},
"industry": {"S": "Wealth Management"},
"description": {"S": "Pre-Merger Close. Enterprise Policy & FinOps integration."}
}},
"contacts": {"M": {
"ciso": {"S": "risk-officer@wealthsafe.com"},
"remediation": {"S": "cloud-arch@fiserv-corp.com"}
}},
"features": {"M": {
"scoring_enabled": {"BOOL": true},
"target_maturity_level": {"N": "3"}, # Target: Enterprise Policy
"drift_alerting_enabled": {"BOOL": true}
}}
})
}
# =============================================================================
# LEVEL 4: The "Network Integrated" Processor
# Context: Post-Merger (Day 1). Transit Gateway connected. High traffic flow.
# =============================================================================
resource "aws_dynamodb_table_item" "context_l4_payments" {
table_name = aws_dynamodb_table.inventory.name
hash_key = aws_dynamodb_table.inventory.hash_key
range_key = aws_dynamodb_table.inventory.range_key
item = jsonencode({
"pk": {"S": "ORG#LEVEL4-0000-0004-0000-000000000004"},
"sk": {"S": "MA#GlobalPayRails"},
"company_info": {"M": {
"name": {"S": "Global Pay Rails"},
"industry": {"S": "Payment Processing"},
"description": {"S": "Connected to Transit Gateway. Production Traffic Live."}
}},
"contacts": {"M": {
"ciso": {"S": "ciso@globalpay.com"},
"remediation": {"S": "noc@fiserv-commercial.com"}
}},
"features": {"M": {
"scoring_enabled": {"BOOL": true},
"target_maturity_level": {"N": "4"}, # Target: Network Integration
"drift_alerting_enabled": {"BOOL": true}
}}
})
}
# =============================================================================
# LEVEL 5: The "Fortress" (End State)
# Context: Service Restrictions Active. Immutable Infrastructure.
# =============================================================================
resource "aws_dynamodb_table_item" "context_l5_surety" {
table_name = aws_dynamodb_table.inventory.name
hash_key = aws_dynamodb_table.inventory.hash_key
range_key = aws_dynamodb_table.inventory.range_key
item = jsonencode({
"pk": {"S": "ORG#LEVEL5-0000-0005-0000-000000000005"},
"sk": {"S": "MA#SuretyInsuranceGroup"},
"company_info": {"M": {
"name": {"S": "Surety Insurance Group"},
"industry": {"S": "InsurTech"},
"description": {"S": "End State. Service Control Policies blocking unauthorized services."}
}},
"contacts": {"M": {
"ciso": {"S": "security@surety.com"},
"remediation": {"S": "sre-team@surety.com"}
}},
"features": {"M": {
"scoring_enabled": {"BOOL": true},
"target_maturity_level": {"N": "5"}, # Target: Service Restriction / Parity
"drift_alerting_enabled": {"BOOL": true}
}}
})
}
# =============================================================================
# INVENTORY ITEM EXAMPLE
# Context: An AWS Account associated with a merged entity
# =============================================================================
resource "aws_dynamodb_table_item" "inventory_l1_account" {
table_name = aws_dynamodb_table.inventory.name
hash_key = aws_dynamodb_table.inventory.hash_key
range_key = aws_dynamodb_table.inventory.range_key
item = jsonencode({
"pk": {"S": "MA#payfare"},
"sk": {"S": "ACCOUNT#123456789012"},
"ma_name": {"S": "PayFare"},
"account_id": {"S": "123456789012"},
"account_alias": {"S": "payfare-app1-prod"},
"provider": {"S": "AWS"},
"status": {"S": "active"},
"metadata": {"M": {
"environment": {"S": "production"},
"region": {"S": "us-east-1"}
}}
})
}
import pytest
from unittest.mock import MagicMock, patch
import json
import os
import sys
# Helper to ensure we start with a fresh module for top-level code execution
def clean_imports():
if "lambdas.inventory_crud" in sys.modules:
del sys.modules["lambdas.inventory_crud"]
@patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"})
@patch("boto3.resource")
def test_create_item(mock_boto_resource):
clean_imports()
mock_table = MagicMock()
mock_dynamodb = MagicMock()
mock_boto_resource.return_value = mock_dynamodb
mock_dynamodb.Table.return_value = mock_table
from lambdas.inventory_crud import inventory_logic
event = {
"httpMethod": "POST",
"body": json.dumps(
{
"ma_name": "PayFare",
"account_id": "123456789012",
"account_alias": "payfare-app1-prod",
"provider": "AWS"
}
),
}
response = inventory_logic(event, {})
assert response["statusCode"] == 201
body = json.loads(response["body"])
assert body["data"]["pk"] == "MA#payfare"
assert body["data"]["sk"] == "ACCOUNT#123456789012"
assert body["data"]["account_alias"] == "payfare-app1-prod"
mock_table.put_item.assert_called_once()
@patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"})
@patch("boto3.resource")
def test_get_items_by_ma(mock_boto_resource):
clean_imports()
mock_table = MagicMock()
mock_dynamodb = MagicMock()
mock_boto_resource.return_value = mock_dynamodb
mock_dynamodb.Table.return_value = mock_table
# Mock query response
mock_table.query.return_value = {
"Items": [{"pk": "MA#payfare", "sk": "ACCOUNT#123", "account_id": "123"}]
}
from lambdas.inventory_crud import inventory_logic
event = {"httpMethod": "GET", "queryStringParameters": {"ma_name": "PayFare"}}
response = inventory_logic(event, {})
assert response["statusCode"] == 200
body = json.loads(response["body"])
assert len(body["data"]) == 1
assert body["data"][0]["pk"] == "MA#payfare"
mock_table.query.assert_called_once()
@patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"})
@patch("boto3.resource")
def test_get_specific_item(mock_boto_resource):
clean_imports()
mock_table = MagicMock()
mock_dynamodb = MagicMock()
mock_boto_resource.return_value = mock_dynamodb
mock_dynamodb.Table.return_value = mock_table
# Mock get response
mock_table.get_item.return_value = {
"Item": {"pk": "MA#payfare", "sk": "ACCOUNT#123", "account_alias": "Test"}
}
from lambdas.inventory_crud import inventory_logic
event = {"httpMethod": "GET", "path": "/inventory/PayFare/123"}
response = inventory_logic(event, {})
assert response["statusCode"] == 200
body = json.loads(response["body"])
assert body["data"]["account_alias"] == "Test"
@patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"})
@patch("boto3.resource")
def test_delete_item(mock_boto_resource):
clean_imports()
mock_table = MagicMock()
mock_dynamodb = MagicMock()
mock_boto_resource.return_value = mock_dynamodb
mock_dynamodb.Table.return_value = mock_table
from lambdas.inventory_crud import inventory_logic
event = {
"httpMethod": "DELETE",
"path": "/inventory/PayFare/123",
"queryStringParameters": {} # Explicitly ensure QS is not None if code doesn't handle None
}
response = inventory_logic(event, {})
assert response["statusCode"] == 200
mock_table.delete_item.assert_called_once()
import pytest
from unittest.mock import MagicMock, patch
import json
import os
import sys
# Helper to ensure we start with a fresh module for top-level code execution
def clean_imports():
if "lambdas.onboarding" in sys.modules:
del sys.modules["lambdas.onboarding"]
@patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"})
@patch("boto3.resource")
def test_onboarding_logic_list_records(mock_boto_resource):
clean_imports()
# Setup mock DynamoDB
mock_table = MagicMock()
mock_dynamodb = MagicMock()
mock_boto_resource.return_value = mock_dynamodb
mock_dynamodb.Table.return_value = mock_table
# Mock scan response
mock_table.scan.return_value = {"Items": []}
# Import the module after mocking
from lambdas.onboarding import onboarding_logic
event = {
"httpMethod": "GET",
"path": "/onboarding",
}
# Call the handler
response = onboarding_logic(event, {})
# Assertions
assert response["statusCode"] == 200
body = json.loads(response["body"])
assert body["message"] == "Organizations retrieved successfully"
assert body["data"] == []
@patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"})
@patch("boto3.resource")
def test_onboarding_logic_create_organization(mock_boto_resource):
clean_imports()
mock_table = MagicMock()
mock_dynamodb = MagicMock()
mock_boto_resource.return_value = mock_dynamodb
mock_dynamodb.Table.return_value = mock_table
from lambdas.onboarding import onboarding_logic
event = {
"httpMethod": "POST",
"body": json.dumps(
{
"organization_name": "Fiserv",
"ma_name": "PayFare",
"industry": "Technology",
"contact_name": "John Doe",
"contact_email": "john@test.com",
"scoring_enabled": True,
}
),
}
response = onboarding_logic(event, {})
assert response["statusCode"] == 201
body = json.loads(response["body"])
assert body["pk"] == "ORG#Fiserv"
assert body["sk"] == "MA#payfare"
assert body["data"]["ma_name"] == "PayFare"
mock_table.put_item.assert_called_once()
@patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"})
@patch("boto3.resource")
def test_onboarding_logic_get_organization(mock_boto_resource):
clean_imports()
mock_table = MagicMock()
mock_dynamodb = MagicMock()
mock_boto_resource.return_value = mock_dynamodb
mock_dynamodb.Table.return_value = mock_table
# Mock query response
test_org_records = [{
"pk": "ORG#Fiserv",
"sk": "MA#payfare",
"ma_name": "PayFare",
"organization_name": "Fiserv"
}]
mock_table.query.return_value = {"Items": test_org_records}
from lambdas.onboarding import onboarding_logic
event = {
"httpMethod": "GET",
"path": "/onboarding/Fiserv",
}
response = onboarding_logic(event, {})
assert response["statusCode"] == 200
body = json.loads(response["body"])
assert len(body["data"]) == 1
assert body["data"][0]["ma_name"] == "PayFare"
@patch.dict(os.environ, {"CCM_INVENTORY_TABLE": "test-inventory"})
@patch("boto3.resource")
def test_onboarding_logic_get_specific_ma(mock_boto_resource):
clean_imports()
mock_table = MagicMock()
mock_dynamodb = MagicMock()
mock_boto_resource.return_value = mock_dynamodb
mock_dynamodb.Table.return_value = mock_table
# Mock get_item response
test_record = {
"pk": "ORG#Fiserv",
"sk": "MA#payfare",
"ma_name": "PayFare",
"organization_name": "Fiserv"
}
mock_table.get_item.return_value = {"Item": test_record}
from lambdas.onboarding import onboarding_logic
# Test path with MA name
event = {
"httpMethod": "GET",
"path": "/onboarding/Fiserv/PayFare",
}
response = onboarding_logic(event, {})
assert response["statusCode"] == 200
body = json.loads(response["body"])
assert body["data"]["ma_name"] == "PayFare"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment