Last active
May 14, 2023 08:24
-
-
Save shimo164/3de25a148e06036d5db56bcc81ffbaa2 to your computer and use it in GitHub Desktop.
This script finds and prints AWS Lambda functions executed in a specific time range using boto3 and CloudWatch Logs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| This script finds and prints AWS Lambda functions executed in a specific time range using boto3 and CloudWatch Logs. | |
| Parameters: | |
| prefix (str): Log group name prefix to filter the desired log groups. | |
| Example: '/aws/lambda' to fetch log groups for Lambda functions. | |
| filter_pattern (str): The filter pattern to apply to the log events. | |
| Example: 'START RequestId' to find logs that contain the string 'START RequestId'. | |
| end_datetime (datetime): The end datetime of the time range in which you want to search for executed Lambda functions. | |
| Example: datetime(2023, 4, 27, 19, 10, 0, 0) for April 27, 2023 at 19:10:00. | |
| range_minutes (int): The duration in minutes to search for executed Lambda functions before the end_datetime. | |
| Example: 10 to search for Lambda functions executed in the 10 minutes before end_datetime. | |
| timezone (str, optional): The timezone to be used when calculating the time range. Defaults to "UTC". | |
| Example: "Asia/Tokyo" for Tokyo timezone. | |
| The script will: | |
| 1. Get the log groups with the given prefix. | |
| 2. Retrieve the events from these log groups within the specified time range. | |
| 3. Filter the events based on the provided filter pattern. | |
| 4. Print the log group name and the number of events that match the filter pattern. | |
| Usage: | |
| Ensure you have the AWS SDK for Python (Boto3) installed and properly configured with your AWS credentials before running the script. | |
| Run the script using `python <script_name>.py`. | |
| """ | |
| from datetime import datetime, timedelta | |
| from zoneinfo import ZoneInfo | |
| import boto3 | |
def get_time_range(end_datetime, minutes, timezone="UTC"):
    """Return the ``(start, end)`` datetimes of a search window.

    Args:
        end_datetime: End of the window. A naive value is read as wall-clock
            time in ``timezone``; an aware value keeps its instant and is
            converted to ``timezone``.
        minutes: Length of the window in minutes, counted back from the end.
        timezone: Zone name passed to ``zoneinfo.ZoneInfo``.

    Returns:
        A ``(time_start, time_end)`` tuple of aware datetimes in ``timezone``.
    """
    tz = ZoneInfo(timezone)
    if end_datetime.utcoffset() is None:
        # astimezone() would interpret a naive value as the machine's local
        # time, making the window depend on where the script runs.
        time_end = end_datetime.replace(tzinfo=tz)
    else:
        time_end = end_datetime.astimezone(tz)

    # Subtract in UTC: arithmetic on an aware datetime is wall-clock
    # arithmetic, which is an hour off when the window spans a DST change.
    utc = ZoneInfo("UTC")
    time_start = (time_end.astimezone(utc) - timedelta(minutes=minutes)).astimezone(tz)
    return time_start, time_end
def print_header(time_start, time_end):
    """Print the run banner: the current time followed by the search window.

    Args:
        time_start: Start of the window, printed after ``From``.
        time_end: End of the window, printed after ``To``.
    """
    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"Start. {started_at}")
    print("===")
    print("From", time_start)
    print("To", time_end)
def print_events(group, events):
    """Print a separator, the log group's name and how many events it has.

    Args:
        group: Mapping with a ``"logGroupName"`` key.
        events: Sized collection of events; only its length is printed.
    """
    separator = "---"
    print(separator)
    log_group_name = group["logGroupName"]
    print(log_group_name)
    event_count = len(events)
    print(f"{event_count} events")
def retrieve_events(log_group_name, start_time, end_time, filter_pattern, client=None):
    """Yield the matching events of one log group, one result page at a time.

    Args:
        log_group_name: Name of the log group to search.
        start_time: Start of the window, passed as ``startTime``.
        end_time: End of the window, passed as ``endTime``.
        filter_pattern: Pattern passed as ``filterPattern``.
        client: Optional object exposing ``filter_log_events``; a new
            ``boto3.client("logs")`` is created when omitted.

    Yields:
        The non-empty ``"events"`` list of each response page.
    """
    if client is None:
        client = boto3.client("logs")

    kwargs = {
        "logGroupName": log_group_name,
        "startTime": start_time,
        "endTime": end_time,
        "filterPattern": filter_pattern,
    }

    while True:
        response = client.filter_log_events(**kwargs)

        # A page may be empty while more results remain behind the token, so
        # an empty page is skipped rather than treated as the end.
        events = response.get("events", [])
        if events:
            yield events

        # Only a missing token marks the last page; its events were already
        # yielded above.
        next_token = response.get("nextToken")
        if not next_token:
            break
        kwargs["nextToken"] = next_token
def get_log_groups(prefix, client=None):
    """Yield the log groups whose name starts with ``prefix``, page by page.

    Args:
        prefix: Value passed as ``logGroupNamePrefix``.
        client: Optional object exposing ``describe_log_groups``; a new
            ``boto3.client("logs")`` is created when omitted.

    Yields:
        The ``"logGroups"`` list of each response page.
    """
    if client is None:
        client = boto3.client("logs")

    kwargs = {"logGroupNamePrefix": prefix}
    while True:
        response = client.describe_log_groups(**kwargs)
        yield response["logGroups"]

        # Keep following the continuation token until a page has none.
        next_token = response.get("nextToken")
        if not next_token:
            break
        kwargs["nextToken"] = next_token
def main():
    """Report which log groups under a prefix logged matching events.

    The search settings are the constants at the top of the function. For
    every log group that has at least one matching event in the window, the
    group name and its total number of matching events are printed once.
    """
    prefix = "/aws/lambda"
    filter_pattern = "START RequestId"
    end_datetime = datetime(2023, 4, 30, 21, 10, 0, 0)
    range_minutes = 60
    timezone = "Asia/Tokyo"

    time_start, time_end = get_time_range(end_datetime, range_minutes, timezone)
    # The window is handed to CloudWatch Logs as epoch milliseconds.
    time_start_unix = int(time_start.timestamp() * 1000)
    time_end_unix = int(time_end.timestamp() * 1000)

    print_header(time_start, time_end)

    groups = [group for group_list in get_log_groups(prefix) for group in group_list]
    print(f"Check {len(groups)} log groups... wait...")

    for group in groups:
        pages = retrieve_events(
            group["logGroupName"], time_start_unix, time_end_unix, filter_pattern
        )
        # Merge all result pages so each group is reported once with its
        # total count, not once per page with a partial count.
        events = [event for page in pages for event in page]
        if events:
            print_events(group, events)

    print("===\nFinished. ", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment