diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eb347b5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +# Python +__pycache__ +.venv +.pytest_cache + +# SAM +samconfig.toml +.aws-sam + +# Other ignores +hello_world_function +README.md diff --git a/adjust_schedule_function/adjust_schedule/__init__.py b/adjust_schedule_function/adjust_schedule/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/adjust_schedule_function/adjust_schedule/app.py b/adjust_schedule_function/adjust_schedule/app.py new file mode 100644 index 0000000..c29ff1a --- /dev/null +++ b/adjust_schedule_function/adjust_schedule/app.py @@ -0,0 +1,84 @@ +import boto3 +from lib.autoscaling import AutoScalingClient +from lib.events import EventBus +from lib.recurrence import RecurrenceCalculator +from lib import utils +import logging +import os + +# The tag that determines whether the ASG should be processed by this script. +ENABLED_TAG = 'scheduled-scaling-adjuster:enabled' + +# The prefix of the tag that determines the local time at which the scaling +# policy is expected to run. +LOCAL_TIME_TAG_PREFIX = 'scheduled-scaling-adjuster:local-time:' + +# The tag that determines the timezone of the local time. 
LOCAL_TIMEZONE_TAG = 'scheduled-scaling-adjuster:local-timezone'

# Module-level collaborators shared by process_asg() and lambda_handler().
asg_client = AutoScalingClient(boto3.client('autoscaling'))
bus = EventBus(boto3.client('events'))
recurrence_calculator = RecurrenceCalculator()

# Capacity settings carried over from the existing action when it is updated.
# An update must specify at least one of min, max or desired, and an action
# may legitimately define only some of them.
_CAPACITY_KEYS = ('MinSize', 'MaxSize', 'DesiredCapacity')


def process_asg(asg, local_timezone):
    """Bring the scheduled actions of one Auto Scaling Group up to date.

    For every scheduled action of the group that has a matching local-time
    tag, the expected recurrence is calculated and compared with the current
    one. Actions whose recurrence differs are updated in a single batch call.

    Args:
        asg: A dictionary describing the Auto Scaling Group. The keys
            'AutoScalingGroupName' and 'Tags' are read.
        local_timezone: The name of the timezone (e.g., 'Europe/Madrid') in
            which the local-time tags are expressed.

    Raises:
        Exception: One or more scheduled actions failed to update.
    """
    asg_name = asg['AutoScalingGroupName']
    scheduled_actions = asg_client.get_asg_scheduled_actions(asg_name=asg_name)
    scheduled_action_updates = []

    for action in scheduled_actions['ScheduledUpdateGroupActions']:
        action_name = action['ScheduledActionName']
        local_time_tag = LOCAL_TIME_TAG_PREFIX + action_name

        local_time = utils.get_tag_by_key(asg['Tags'], local_time_tag)
        if not local_time:
            print("Skipping: action '{}' does not have local time tag (missing tag '{}')".format(action_name, local_time_tag))
            continue

        # One-time actions carry no recurrence, so there is nothing to adjust.
        current_recurrence = action.get('Recurrence')
        if not current_recurrence:
            print("Skipping: action '{}' has no recurrence defined".format(action_name))
            continue

        print("Processing action '{}'".format(action_name))

        correct_recurrence = recurrence_calculator.calculate_recurrence(action, local_time, local_timezone)
        if correct_recurrence == current_recurrence:
            continue

        print("Calculated recurrence '{}' does not match current recurrence '{}'. This action will be updated.".format(correct_recurrence, current_recurrence))
        update = {
            'ScheduledActionName': action_name,
            'Recurrence': correct_recurrence,
        }
        # Copy whichever capacity settings the action actually defines instead
        # of assuming 'DesiredCapacity' is always present.
        update.update({key: action[key] for key in _CAPACITY_KEYS if key in action})
        scheduled_action_updates.append(update)

    if not scheduled_action_updates:
        print("No scheduled actions need to be updated for ASG '{}'".format(asg_name))
        return

    print("There are actions which need to be updated. Updating them now.")
    update_response = asg_client.update_asg_scheduled_actions(asg_name,
                                                              scheduled_action_updates)

    failed_updates = update_response['FailedScheduledUpdateGroupActions']
    if failed_updates:
        print(failed_updates)
        raise Exception('{} actions failed to update'.format(len(failed_updates)))


def lambda_handler(event, context):
    """Process every enabled Auto Scaling Group and emit a completion event.

    Groups without the enabled tag or without a timezone tag are skipped. The
    first group that fails aborts the run: the failure is logged and the
    exception propagates to the caller.

    Args:
        event: The invocation event (not inspected).
        context: The invocation context (not inspected).
    """
    for asg in asg_client.get_asgs()['AutoScalingGroups']:
        asg_name = asg['AutoScalingGroupName']

        if not utils.get_tag_by_key(asg['Tags'], ENABLED_TAG):
            print("Skipping: ASG '{}' is not enabled (missing tag '{}')".format(asg_name, ENABLED_TAG))
            continue

        local_timezone = utils.get_tag_by_key(asg['Tags'], LOCAL_TIMEZONE_TAG)
        if not local_timezone:
            print("Skipping: ASG '{}' has no timezone defined (missing tag '{}')".format(asg_name, LOCAL_TIMEZONE_TAG))
            continue

        try:
            print("Processing ASG '{}'".format(asg_name))
            process_asg(asg, local_timezone)
            print("ASG '{}' has been processed successfully".format(asg_name))
        except Exception:
            # Log which group failed, then let the error surface unchanged.
            print("ASG '{}' failed to be processed".format(asg_name))
            raise

    print(bus.emit_process_completed({'dummy': 'payload'}))
    print("All ASGs have been processed. No further work to do.")
class AutoScalingClient(object):
    """Thin wrapper around a boto3 Auto Scaling client."""

    def __init__(self, client):
        """Store the underlying client.

        Args:
            client: An object exposing the Auto Scaling operations used
                below (e.g., the result of boto3.client('autoscaling')).
        """
        self._client = client

    def _describe_all(self, operation, result_key, **request):
        """Call a paginated describe operation until no page is left.

        Args:
            operation: The bound client method to call.
            result_key: The response key that holds the list of items.
            **request: Keyword arguments sent with every call.

        Returns:
            The first response, with the items of all pages accumulated under
            result_key and without the 'NextToken' entry.
        """
        merged = None
        while True:
            page = operation(**request)
            if merged is None:
                merged = dict(page)
                merged[result_key] = list(page.get(result_key, []))
            else:
                merged[result_key].extend(page.get(result_key, []))

            next_token = page.get('NextToken')
            if not next_token:
                break
            request['NextToken'] = next_token

        merged.pop('NextToken', None)
        return merged

    def get_asgs(self):
        """Return all Auto Scaling Groups under the 'AutoScalingGroups' key."""
        return self._describe_all(
            self._client.describe_auto_scaling_groups,
            'AutoScalingGroups'
        )

    def get_asg_scheduled_actions(self, asg_name):
        """Return all scheduled actions of the given Auto Scaling Group.

        Args:
            asg_name: The name of the Auto Scaling Group.

        Returns:
            A dictionary with the actions under 'ScheduledUpdateGroupActions'.
        """
        return self._describe_all(
            self._client.describe_scheduled_actions,
            'ScheduledUpdateGroupActions',
            AutoScalingGroupName=asg_name
        )

    def update_asg_scheduled_actions(self, asg_name, action_updates):
        """Create or update scheduled actions of a group in one batch call.

        Args:
            asg_name: The name of the Auto Scaling Group.
            action_updates: A list of scheduled action definitions.

        Returns:
            The response of the batch operation, unchanged.
        """
        return self._client.batch_put_scheduled_update_group_action(
            AutoScalingGroupName=asg_name,
            ScheduledUpdateGroupActions=action_updates
        )


class Event:
    """Detail types of the events emitted by this solution."""
    PROCESS_COMPLETED = 'ProcessCompleted'


class EventBus:
    """Publishes the solution's events through an EventBridge client."""
    SOURCE = 'scheduled-scaling-adjuster'
    BUS_NAME = 'default'

    def __init__(self, eventbridge_client):
        """Store the client whose put_events operation is used to publish."""
        self._eventbridge_client = eventbridge_client

    def emit_process_completed(self, data):
        """Publish a 'ProcessCompleted' event.

        Args:
            data: A JSON-serializable object sent as the event detail.

        Returns:
            The response of the put_events call, unchanged.
        """
        entry = {
            'Source': self.SOURCE,
            'DetailType': Event.PROCESS_COMPLETED,
            'Detail': json.dumps(data),
            'EventBusName': self.BUS_NAME
        }
        return self._eventbridge_client.put_events(Entries=[entry])
# Matches a cron field made of a single number (no ranges, lists or steps).
_SINGLE_NUMBER = re.compile(r'^\d+$')


def parse_cron_expression(expression):
    """Split a cron expression into its minute, hour and remaining fields.

    Args:
        expression: A string with five or six whitespace-separated fields.

    Returns:
        A dictionary with the keys 'minute', 'hour' and 'rest', where 'rest'
        holds every field after the hour exactly as written.

    Raises:
        ValueError: The expression is not a string or does not have five or
            six fields.
    """
    pattern = r'^([^\s]+)\s+([^\s]+)((?:\s+[^\s]+){3,4})$'
    match = re.match(pattern, expression) if isinstance(expression, str) else None

    if not match:
        raise ValueError("String '{}' is not a valid cron expression".format(expression))

    return {
        'minute': match.group(1),
        'hour': match.group(2),
        'rest': match.group(3).strip()
    }


class MockTimeSource:
    """A time source that allows mocking the current date and time."""
    def __init__(self, dt):
        self._dt = dt

    def get_current_utc_datetime(self):
        """Return the datetime given at construction time."""
        return self._dt


class DefaultTimeSource:
    """A time source that uses the standard datetime module."""
    def get_current_utc_datetime(self):
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(pytz.utc)


class RecurrenceCalculator:
    """A recurrence calculator for Auto Scaling Group scheduled scaling
    actions.
    """
    def __init__(self, time_source=None):
        """Create a calculator.

        Args:
            time_source: An object with a get_current_utc_datetime() method.
                Defaults to a DefaultTimeSource.
        """
        self._time_source = DefaultTimeSource() if time_source is None else time_source

    def calculate_recurrence(self, scheduled_action, expected_time, timezone):
        """Calculates the correct recurrence expression for the given scheduled
        scaling action, expected local time and local timezone.

        The recurrence must be defined as a cron expression.

        If the action's recurrence is not selective on the hour, or if the
        action's next run will occur in more than a day in the future, this
        method will return the current recurrence.

        Only the minute and hour fields are ever rewritten. The remaining
        fields are copied verbatim, so a shift that crosses midnight UTC is
        not reflected in the day fields.

        Args:
            scheduled_action: A dictionary with the scheduled action
                definition, as returned by boto3. The keys 'Recurrence' and
                'ScheduledActionName' are required; 'StartTime' is used when
                present.
            expected_time: A string with the local time at which the action is
                expected to run (e.g., '09:00').
            timezone: The name of the timezone (e.g., 'Europe/Madrid') of the
                local time.

        Returns:
            A string with the appropriate recurrence, formatted as a cron
            expression.

        Raises:
            NotImplementedError: The original recurrence contains anything
                other than single hours or minutes (e.g., ranges). These are
                currently not supported by this implementation."""
        current_recurrence = scheduled_action['Recurrence']
        action_name = scheduled_action['ScheduledActionName']
        parsed_recurrence = parse_cron_expression(current_recurrence)

        # If the action's cron expression is not selective on the hour, it
        # does not make sense to keep going. This must be checked before the
        # single-number validation below, which would otherwise reject '*'.
        if parsed_recurrence['hour'] == '*':
            print("Action '{}' has a cron expression ('{}') which is not selective on the hour. Leaving recurrence as is.".format(action_name, current_recurrence))
            return current_recurrence

        # For the time being, we don't handle cron expressions which specify
        # anything but a specific hour and minute (i.e., ranges, multiple
        # hours, etc.).
        if not _SINGLE_NUMBER.match(parsed_recurrence['hour']):
            raise NotImplementedError("This script cannot yet handle multiple hours in cron expressions: '{}'".format(current_recurrence))
        if not _SINGLE_NUMBER.match(parsed_recurrence['minute']):
            raise NotImplementedError("This script cannot yet handle multiple minutes in cron expressions: '{}'".format(current_recurrence))

        # Every time computation below is anchored on the injected time
        # source, so that a mocked clock drives the whole calculation.
        utc_now = self._time_source.get_current_utc_datetime()

        # If the action's start date is over a day in the future, skip it.
        # NOTE(review): assumes 'StartTime' is a timezone-aware datetime
        # holding the next run, as the subtraction requires. Confirm.
        start_time = scheduled_action.get('StartTime')
        if start_time is not None and start_time - utc_now > timedelta(days=1):
            print("Action '{}' next run is over a day away. Leaving recurrence as is.".format(action_name))
            return current_recurrence

        # Determine when the action will run next, and compare the time with
        # the expected local time at the specified timezone.
        #
        # One extra second is added to the delta to absorb precision errors
        # (e.g., seeing 13:59:59.998 when the next run is at 14:00:00).
        seconds_to_next_run = CronTab(current_recurrence).next(now=utc_now, default_utc=True)
        utc_next_run = utc_now + timedelta(seconds=seconds_to_next_run + 1)

        local_tz = pytz.timezone(timezone)
        local_next_run = utc_next_run.astimezone(local_tz)
        local_next_run_time = local_next_run.strftime('%H:%M')
        expected = parser.parse(expected_time)

        print("This action should run at '{}' local time. The next run will occur at '{}', which is '{}' at specified local timezone '{}'.".format(expected_time, utc_next_run.isoformat(), local_next_run_time, timezone))

        # Compare parsed values rather than strings so that '9:00' and
        # '09:00' are treated as the same time.
        if (local_next_run.hour, local_next_run.minute) == (expected.hour, expected.minute):
            print("Times match. Current recurrence is correct.")
            return current_recurrence

        print("Times don't match. Current recurrence must be recalculated.")

        # Build the expected wall-clock time and let pytz pick the UTC offset
        # valid at that moment. Replacing the hour on the aware datetime
        # would keep the offset of the original instant instead.
        naive_expected_run = local_next_run.replace(tzinfo=None,
                                                    hour=expected.hour,
                                                    minute=expected.minute)
        utc_expected_run = local_tz.localize(naive_expected_run).astimezone(pytz.utc)

        # We should only change the hour and minute parts of the cron
        # expression. The reason we're changing the minutes too is because
        # some timezones don't have whole offsets. E.g., see "Indian Standard
        # Time".
        return '{} {} {}'.format(utc_expected_run.minute,
                                 utc_expected_run.hour,
                                 parsed_recurrence['rest'])
def get_tag_by_key(tags, key):
    """Return the value of the first tag whose 'Key' equals the given key.

    Args:
        tags: An iterable of dictionaries with 'Key' and 'Value' entries.
            None is treated as an empty collection.
        key: The tag key to look for.

    Returns:
        The 'Value' of the first matching tag, or None if no tag matches.
    """
    # Stop at the first match instead of materializing every match.
    return next((tag['Value'] for tag in tags or () if tag['Key'] == key), None)
diff --git a/template.yaml b/template.yaml new file mode 100644 index 0000000..35a54ec --- /dev/null +++ b/template.yaml @@ -0,0 +1,102 @@ +AWSTemplateFormatVersion: '2010-09-09' + +Transform: AWS::Serverless-2016-10-31 + +Description: > + scheduled-scaling-adjuster + + This solution adjusts the schedules of scheduled Auto Scaling Group scaling + actions to account for hour changes due to Daylight Saving Time. + +Parameters: + NotificationEmail: + Type: String + Description: The email address that should be notified when relevant events + occur. + +Globals: + Function: + Timeout: 10 + +Resources: + AdjustScheduleFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: adjust_schedule_function + Handler: adjust_schedule/app.lambda_handler + Runtime: python3.7 + Policies: + - Version: '2012-10-17' + Statement: + - Effect: 'Allow' + Action: + - 'autoscaling:DescribeAutoScalingGroups' + - 'autoscaling:DescribeScheduledActions' + - 'autoscaling:BatchPutScheduledUpdateGroupAction' + Resource: '*' + - EventBridgePutEventsPolicy: + EventBusName: default + + Trigger: + Type: AWS::Events::Rule + Properties: + EventPattern: + source: + - scheduled-scaling-adjuster + detail-type: + - 'ManualTrigger' + ScheduleExpression: cron(0 0,12 * * ? 
*) + State: ENABLED + Targets: + - Arn: !GetAtt AdjustScheduleFunction.Arn + Id: adjust-schedule-lambda-function + + TriggerPermission: + Type: AWS::Lambda::Permission + Properties: + Action: lambda:InvokeFunction + FunctionName: !GetAtt AdjustScheduleFunction.Arn + Principal: events.amazonaws.com + SourceArn: !GetAtt Trigger.Arn + + # + # EventBridge subscription to notify about a completion via SNS + # + CompletionNotificationTopic: + Type: AWS::SNS::Topic + Properties: + Subscription: + - Endpoint: !Ref NotificationEmail + Protocol: email + + CompletionNotificationTopicPolicy: + Type: AWS::SNS::TopicPolicy + Properties: + PolicyDocument: + Id: 'CompletionNotificationTopicPolicy' + Version: '2012-10-17' + Statement: + - Sid: 'AllowPublishingFromEventBridge' + Effect: 'Allow' + Principal: + Service: 'events.amazonaws.com' + Action: 'sns:Publish' + Resource: !Ref CompletionNotificationTopic + Topics: + - !Ref CompletionNotificationTopic + + NotifyOnCompletion: + Type: AWS::Events::Rule + Properties: + EventPattern: + source: + - scheduled-scaling-adjuster + detail-type: + - 'ProcessCompleted' + State: ENABLED + Targets: + - Arn: !Ref CompletionNotificationTopic + Id: completion-notification-sns-topic + # + # End of subscription + # diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 0000000..8832db9 --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,5 @@ +requests +six +regex +pytest +pytest-mock \ No newline at end of file diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py new file mode 100644 index 0000000..0c3b61d --- /dev/null +++ b/tests/unit/test_events.py @@ -0,0 +1,33 @@ +import boto3 +from botocore.stub import Stubber +from lib.events import EventBus +import pytest + + +def test_emit_process_completed(): + mocked_response = { + 'FailedEntryCount': 0, + 'Entries': [ + { + 'EventId': 
def test_emit_process_completed():
    """The bus sends a single, well-formed 'ProcessCompleted' entry."""
    mocked_response = {
        'FailedEntryCount': 0,
        'Entries': [
            {
                'EventId': '00000000-0000-0000-0000-000000000000'
            }
        ]
    }
    expected_params = {
        'Entries': [
            {
                'Source': 'scheduled-scaling-adjuster',
                'DetailType': 'ProcessCompleted',
                'Detail': '{"foo": "bar"}',
                'EventBusName': 'default'
            }
        ]
    }
    # An explicit region keeps client creation independent of whatever AWS
    # configuration the machine running the tests happens to have.
    eventbridge_client = boto3.client('events', region_name='eu-west-1')
    stubber = Stubber(eventbridge_client)
    stubber.add_response('put_events', mocked_response, expected_params)
    bus = EventBus(eventbridge_client)

    with stubber:
        response = bus.emit_process_completed({'foo': 'bar'})
        assert response == mocked_response


def get_valid_expressions():
    """Return (expression, expected parse result) pairs."""
    return [
        ('30 15 * * * *', {'minute': '30', 'hour': '15', 'rest': '* * * *'}),
        ('30 15 * * *', {'minute': '30', 'hour': '15', 'rest': '* * *'})
    ]


def get_invalid_expressions():
    """Return inputs that must be rejected: wrong field counts or types."""
    return [
        '30 15 * *',
        '30 15 * * * * *',
        'foo',
        None,
        False
    ]


@pytest.mark.parametrize('expression,expected', get_valid_expressions())
def test_parse_cron_expression(expression, expected):
    parsed = recurrence.parse_cron_expression(expression)

    assert parsed == expected


@pytest.mark.parametrize('expression', get_invalid_expressions())
def test_parse_cron_expression_with_invalid_expression(expression):
    with pytest.raises(Exception):
        recurrence.parse_cron_expression(expression)


def test_get_tag_by_key():
    tags = [{'Key': 'foo', 'Value': 'bar'}, {'Key': 'baz', 'Value': 'quux'}]

    assert utils.get_tag_by_key(tags, 'foo') == 'bar'
    # Identity, not equality, is the correct check against None.
    assert utils.get_tag_by_key(tags, 'nope') is None