Created
August 15, 2025 04:17
-
-
Save Malayke/434a0b2a1c4946f8540a8b376f4c6b82 to your computer and use it in GitHub Desktop.
AWS Secrets Manager automatic-rotation Lambda function
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Wires up a Secrets Manager rotation Lambda in two steps:
#   1. Attaches an inline policy (secretsManager/policy.json) to the Lambda's
#      execution role so the function can read and update the secret.
#   2. Adds a resource-based permission that lets Secrets Manager invoke the
#      Lambda function on behalf of one specific secret.
#
# Prerequisites:
#   - ROLE_NAME is exported in the environment (name of the execution role).
#   - The policy document exists at secretsManager/policy.json.
#   - The placeholder values below have been replaced with real ones.

# Stop at the first failing command, on any unset variable, and on pipeline
# failures, so the invoke permission is never added after a failed policy put.
set -euo pipefail

# ROLE_NAME is not defined anywhere in this script; fail early with a clear
# message instead of passing an empty --role-name to the AWS CLI.
: "${ROLE_NAME:?ROLE_NAME must be set to the name of the Lambda execution role}"

# put policy.json to location secretsManager/policy.json
# Create inline policy directly on the role
aws iam put-role-policy \
    --role-name "$ROLE_NAME" \
    --policy-name SecretsManagerAccess \
    --policy-document file://secretsManager/policy.json

# Replace these placeholders with the function name and secret ARN used in
# your JSON policy
LAMBDA_FUNCTION_NAME="LAMBDA_FUNCTION_NAME"
SECRET_ARN="arn:aws:secretsmanager:REGION:ACCOUNT_ID:secret:SECRET_NAME"

# Apply the permission using the values from your JSON policy
aws lambda add-permission \
    --function-name "$LAMBDA_FUNCTION_NAME" \
    --statement-id SecretsManagerInvokePermission \
    --action lambda:InvokeFunction \
    --principal secretsmanager.amazonaws.com \
    --source-arn "$SECRET_ARN"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "Version": "2012-10-17", | |
| "Id": "default", | |
| "Statement": [ | |
| { | |
| "Sid": "SecretsManagerInvokePermission", | |
| "Effect": "Allow", | |
| "Principal": { | |
| "Service": "secretsmanager.amazonaws.com" | |
| }, | |
| "Action": "lambda:InvokeFunction", | |
| "Resource": "arn:aws:lambda:us-west-1:588656319433:function:secretManagerRotation", | |
| "Condition": { | |
| "StringEquals": { | |
| "AWS:SourceArn": "arn:aws:secretsmanager:REGION:ACCOUNT_ID:secret:SECRET_NAME" | |
| } | |
| } | |
| } | |
| ] | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
| # SPDX-License-Identifier: MIT-0 | |
| import boto3 | |
| import logging | |
| import os | |
# Module-wide logger: getLogger() with no name returns the root logger.
# The level is set to INFO so the info-level progress messages emitted by the
# rotation steps below are not filtered out.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
    """Entry point of the Secrets Manager rotation Lambda.

    Reads the rotation request from ``event``, verifies that the secret is
    enabled for rotation and that the requested version is staged as
    AWSPENDING, then dispatches to the function implementing the requested
    rotation step.

    Args:
        event (dict): Lambda event parameters. The following keys are required:
            - SecretId: The secret ARN or identifier
            - ClientRequestToken: The ClientRequestToken of the secret version
            - Step: The rotation step (one of createSecret, setSecret,
              testSecret, or finishSecret)
        context (LambdaContext): The Lambda runtime information (not used).

    Returns:
        None. Returns early, without running any step, when the requested
        version already carries the AWSCURRENT label.

    Raises:
        KeyError: If the event does not contain the expected keys.
        ValueError: If the secret is not enabled for rotation, if the version
            has no stage or is not staged as AWSPENDING, or if the step name
            is not recognised.
        ResourceNotFoundException: Presumably raised by the Secrets Manager
            client when the secret does not exist (confirm against boto3).
    """
    arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
    logger.info(f"SecretId: {arn}, ClientRequestToken: {token}, Step: {step}")

    # Setup the client
    service_client = boto3.client('secretsmanager')

    # Make sure the version is staged correctly
    metadata = service_client.describe_secret(SecretId=arn)

    # Use .get(): a response without the RotationEnabled key must be reported
    # as "not enabled" (ValueError) rather than surfacing as a bare KeyError.
    if not metadata.get('RotationEnabled'):
        message = "Secret %s is not enabled for rotation" % arn
        logger.error(message)
        raise ValueError(message)

    versions = metadata['VersionIdsToStages']
    if token not in versions:
        message = "Secret version %s has no stage for rotation of secret %s." % (token, arn)
        logger.error(message)
        raise ValueError(message)

    stages = versions[token]
    if "AWSCURRENT" in stages:
        # Rotation for this version has already completed; nothing left to do.
        logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
        return
    if "AWSPENDING" not in stages:
        message = "Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)
        logger.error(message)
        raise ValueError(message)

    # Dispatch to the implementation of the requested rotation step
    if step == "createSecret":
        create_secret(service_client, arn, token)
    elif step == "setSecret":
        set_secret(service_client, arn, token)
    elif step == "testSecret":
        test_secret(service_client, arn, token)
    elif step == "finishSecret":
        finish_secret(service_client, arn, token)
    else:
        raise ValueError("Invalid step parameter")
def create_secret(service_client, arn, token):
    """Create the AWSPENDING version of the secret if it does not exist yet.

    The AWSCURRENT version is fetched first to confirm the secret exists.
    Then the version identified by ``token`` is looked up under the
    AWSPENDING stage. When that lookup fails with ResourceNotFoundException,
    a random password is generated and stored as a new version labelled
    AWSPENDING, using ``token`` as the ClientRequestToken. When the lookup
    succeeds, nothing is written, so repeated invocations are harmless.

    The characters excluded from the generated password come from the
    EXCLUDE_CHARACTERS environment variable, defaulting to ``/@"'\\``.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If no AWSCURRENT version of the secret
            can be retrieved
    """
    # The secret must already have a current value before it can be rotated
    try:
        service_client.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")
    except service_client.exceptions.ResourceNotFoundException:
        logger.error("createSecret: Secret %s does not exist.", arn)
        raise

    # Reuse an existing AWSPENDING version for this token; otherwise create it
    try:
        service_client.get_secret_value(
            SecretId=arn,
            VersionId=token,
            VersionStage="AWSPENDING",
        )
    except service_client.exceptions.ResourceNotFoundException:
        # Characters the target database/service cannot accept in a password
        excluded = os.environ.get('EXCLUDE_CHARACTERS', '/@"\'\\')
        generated = service_client.get_random_password(ExcludeCharacters=excluded)

        # Store the new value under the AWSPENDING label, keyed by the token
        service_client.put_secret_value(
            SecretId=arn,
            ClientRequestToken=token,
            SecretString=generated['RandomPassword'],
            VersionStages=['AWSPENDING'],
        )
        logger.info(
            "createSecret: Successfully put secret for ARN %s and version %s.",
            arn,
            token,
        )
    else:
        logger.info("createSecret: Successfully retrieved AWSPENDING secret for %s.", arn)
def set_secret(service_client, arn, token):
    """Set the secret (placeholder step).

    This rotation step is where the credentials in the database or service
    would be changed to match the AWSPENDING version of the secret. The
    current implementation performs no such change and no validation: it
    only logs that the step was invoked.

    Args:
        service_client (client): The secrets manager service client (unused)
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version
    """
    logger.info("setSecret: Setting secret for ARN %s with token %s.", arn, token)
def test_secret(service_client, arn, token):
    """Test the secret (placeholder step).

    This rotation step is where the AWSPENDING version of the secret would be
    validated by using it to access the database or service. The current
    implementation performs no such check: it only logs that the step was
    invoked.

    Args:
        service_client (client): The secrets manager service client (unused)
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version
    """
    logger.info("testSecret: Testing secret for ARN %s with token %s.", arn, token)
def finish_secret(service_client, arn, token):
    """Finish the rotation by promoting the pending version to AWSCURRENT.

    Describes the secret, validates the staging labels of the version
    identified by ``token``, and then moves the AWSCURRENT label from the
    version that currently holds it to the ``token`` version with a single
    update_secret_version_stage call.

    The step is idempotent: if the ``token`` version already carries
    AWSCURRENT the function returns without calling the API, whether or not
    the AWSPENDING label is still attached to it.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ValueError: If the token version is unknown, is not staged as
            AWSPENDING, or if no version currently holds AWSCURRENT
        ResourceNotFoundException: Presumably raised by the client when the
            secret with the specified arn does not exist (confirm against boto3)
    """
    # Describe the secret to obtain the version-to-stages mapping
    metadata = service_client.describe_secret(SecretId=arn)
    versions = metadata["VersionIdsToStages"]

    if token not in versions:
        logger.error("finishSecret: Version %s not found in secret %s" % (token, arn))
        raise ValueError("Version %s not found in secret" % token)
    token_stages = versions[token]

    # Idempotency check comes first: a version that is already AWSCURRENT may
    # no longer carry AWSPENDING, and a retried invocation must not fail then.
    if "AWSCURRENT" in token_stages:
        logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (token, arn))
        return

    # Only a version staged as AWSPENDING may be promoted
    if "AWSPENDING" not in token_stages:
        logger.error("finishSecret: Version %s does not have AWSPENDING stage" % token)
        raise ValueError("Version %s does not have AWSPENDING stage" % token)

    # Find the version that currently holds the AWSCURRENT label
    current_version = next(
        (version for version, stages in versions.items() if "AWSCURRENT" in stages),
        None,
    )
    if current_version is None:
        logger.error("finishSecret: No version found with AWSCURRENT stage")
        raise ValueError("No version found with AWSCURRENT stage")
    logger.info("finishSecret: Found current version %s with AWSCURRENT stage" % current_version)

    # Move AWSCURRENT from current_version to the token version.
    # NOTE(review): the log message below states that this call also removes
    # AWSPENDING from the token version and that AWSPREVIOUS is added to the
    # old version automatically; neither is visible in this code - confirm
    # against the UpdateSecretVersionStage documentation.
    logger.info("finishSecret: Moving AWSCURRENT from version %s to version %s" % (current_version, token))
    service_client.update_secret_version_stage(
        SecretId=arn,
        VersionStage="AWSCURRENT",
        MoveToVersionId=token,
        RemoveFromVersionId=current_version,
    )
    logger.info("finishSecret: Successfully moved AWSCURRENT stage to version %s for secret %s." % (token, arn))
    logger.info("finishSecret: AWSPENDING label removed from version %s and AWSPREVIOUS automatically added to version %s" % (token, current_version))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "Version": "2012-10-17", | |
| "Statement": [ | |
| { | |
| "Effect": "Allow", | |
| "Action": [ | |
| "secretsmanager:DescribeSecret", | |
| "secretsmanager:GetSecretValue", | |
| "secretsmanager:PutSecretValue", | |
| "secretsmanager:UpdateSecretVersionStage" | |
| ], | |
| "Resource": "arn:aws:secretsmanager:REGION:ACCOUNT_ID:secret:SECRET_NAME" | |
| }, | |
| { | |
| "Effect": "Allow", | |
| "Action": [ | |
| "secretsmanager:GetRandomPassword" | |
| ], | |
| "Resource": "*" | |
| } | |
| ] | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment