Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosafonso committed Oct 25, 2020
0 parents commit b337932
Show file tree
Hide file tree
Showing 18 changed files with 496 additions and 0 deletions.
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Python
__pycache__
.venv
.pytest_cache

# SAM
samconfig.toml
.aws-sam

# Other ignores
hello_world_function
README.md
Empty file.
84 changes: 84 additions & 0 deletions adjust_schedule_function/adjust_schedule/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import boto3
from lib.autoscaling import AutoScalingClient
from lib.events import EventBus
from lib.recurrence import RecurrenceCalculator
from lib import utils
import logging
import os

# The tag that determines whether the ASG should be processed by this script.
ENABLED_TAG = 'scheduled-scaling-adjuster:enabled'

# The prefix of the tag that determines the local time at which the scaling
# policy is expected to run.
LOCAL_TIME_TAG_PREFIX = 'scheduled-scaling-adjuster:local-time:'

# The tag that determines the timezone of the local time.
LOCAL_TIMEZONE_TAG = 'scheduled-scaling-adjuster:local-timezone'

asg_client = AutoScalingClient(boto3.client('autoscaling'))
bus = EventBus(boto3.client('events'))
recurrence_calculator = RecurrenceCalculator()


def process_asg(asg, local_timezone):
asg_name = asg['AutoScalingGroupName']
scheduled_actions = asg_client.get_asg_scheduled_actions(asg_name=asg_name)
scheduled_action_updates = []

for action in scheduled_actions['ScheduledUpdateGroupActions']:
action_name = action['ScheduledActionName']
current_recurrence = action['Recurrence']

local_time = utils.get_tag_by_key(asg['Tags'], LOCAL_TIME_TAG_PREFIX + action_name)
if not local_time:
print("Skipping: action '{}' does not have local time tag (missing tag '{}')".format(action_name, LOCAL_TIME_TAG_PREFIX + action_name))
continue

print("Processing action '{}'".format(action_name))

correct_recurrence = recurrence_calculator.calculate_recurrence(action, local_time, local_timezone)
if correct_recurrence != current_recurrence:
print("Calculated recurrence '{}' does not match current recurrence '{}'. This action will be updated.".format(correct_recurrence, current_recurrence))
scheduled_action_updates.append({
'ScheduledActionName': action_name,
'Recurrence': correct_recurrence,
# Need to specify one of min, max or desired
'DesiredCapacity': action['DesiredCapacity']
})

if not len(scheduled_action_updates):
print("No scheduled actions need to be updated for ASG '{}'".format(asg_name))
else:
print("There are actions which need to be updated. Updating them now.")
update_response = asg_client.update_asg_scheduled_actions(asg_name,
scheduled_action_updates)

if len(update_response['FailedScheduledUpdateGroupActions']):
print(update_response['FailedScheduledUpdateGroupActions'])
raise Exception('{} actions failed to update'.format(len(update_response['FailedScheduledUpdateGroupActions'])))


def lambda_handler(event, context):
for asg in asg_client.get_asgs()['AutoScalingGroups']:
asg_name = asg['AutoScalingGroupName']

if not utils.get_tag_by_key(asg['Tags'], ENABLED_TAG):
print("Skipping: ASG '{}' is not enabled (missing tag '{}')".format(asg_name, ENABLED_TAG))
continue

local_timezone = utils.get_tag_by_key(asg['Tags'], LOCAL_TIMEZONE_TAG)
if not local_timezone:
print("Skipping: ASG '{}' has no timezone defined (missing tag '{}')".format(asg_name, LOCAL_TIMEZONE_TAG))
continue

try:
print("Processing ASG '{}'".format(asg_name))
process_asg(asg, local_timezone)
print("ASG '{}' has been processed successfully".format(asg_name))
except:
print("ASG '{}' failed to be processed".format(asg_name))
raise

print(bus.emit_process_completed({'dummy': 'payload'}))
print("All ASGs have been processed. No further work to do.")
7 changes: 7 additions & 0 deletions adjust_schedule_function/lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# coding: utf-8

from __future__ import absolute_import

from lib import events
from lib import recurrence
from lib import utils
19 changes: 19 additions & 0 deletions adjust_schedule_function/lib/autoscaling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class AutoScalingClient(object):

def __init__(self, client):
self._client = client

def get_asgs(self):
# To-Do: use paginators instead
return self._client.describe_auto_scaling_groups()

def get_asg_scheduled_actions(self, asg_name):
return self._client.describe_scheduled_actions(
AutoScalingGroupName=asg_name
)

def update_asg_scheduled_actions(self, asg_name, action_updates):
return self._client.batch_put_scheduled_update_group_action(
AutoScalingGroupName=asg_name,
ScheduledUpdateGroupActions=action_updates
)
23 changes: 23 additions & 0 deletions adjust_schedule_function/lib/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import json

class Event:
PROCESS_COMPLETED = 'ProcessCompleted'

class EventBus:
SOURCE = 'scheduled-scaling-adjuster'
BUS_NAME = 'default'

def __init__(self, eventbridge_client):
self._eventbridge_client = eventbridge_client

def emit_process_completed(self, data):
return self._eventbridge_client.put_events(
Entries=[
{
'Source': self.SOURCE,
'DetailType': Event.PROCESS_COMPLETED,
'Detail': json.dumps(data),
'EventBusName': self.BUS_NAME
}
]
)
134 changes: 134 additions & 0 deletions adjust_schedule_function/lib/recurrence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from crontab import CronTab
from dateutil import parser
from datetime import datetime, timedelta
import pytz
import re

def parse_cron_expression(expression):
pattern = r'^([^\s]+)\s+([^\s]+)((?:\s+[^\s]+){3,4})$'
match = re.match(pattern, expression)

if not match:
raise Exception("String '{}' is not a valid cron expression".format(expression))

return {
'minute': match.group(1),
'hour': match.group(2),
'rest': match.group(3).strip()
}


class MockTimeSource:
"""A time source that allows mocking the current date and time."""
def __init__(self, dt):
self._dt = dt

def get_current_utc_datetime(self):
return self._dt


class DefaultTimeSource:
"""A time source that uses the standard datetime module."""
def get_current_utc_datetime(self):
return pytz.utc.localize(datetime.utcnow())


class RecurrenceCalculator:
"""A recurrence calculator for Auto Scaling Group scheduled scaling
actions.
"""
def __init__(self, time_source=None):
if not time_source:
self._time_source = DefaultTimeSource()
else:
self._time_source = time_source

def calculate_recurrence(self, scheduled_action, expected_time, timezone):
"""Calculates the correct recurrence expression for the given scheduled
scaling action, expected local time and local timezone.
The recurrence must be defined as a cron expression.
If the action's recurrence is not selective on the hour, or if the
action's next run will occur in more than a day in the future, this
method will return the current recurrence.
Args:
scheduled_action: A dictionary with the scheduled action
definition, as returned by boto3.
expected_time: A string with the local time at which the action is
expected to run.
timezone: The name of the timezone (e.g., 'Europe/Madrid') of the
local time.
Returns:
A string with the appropriate recurrence, formatted as a cron
expression.
Raises:
NotImplementedError: The original recurrence contains anything
other than single hours or minutes (e.g., ranges). These are
currently not supported by this implementation."""

parsed_recurrence = parse_cron_expression(scheduled_action['Recurrence'])

# For the time being, we don't handle cron expressions which specify
# anything but a specific hour and minute (i.e., ranges, multiple
# hours, etc.). This is a feature that will be implemented in the
# future.
if not re.match(r'^\d+$', parsed_recurrence['hour']):
raise NotImplementedError("This script cannot yet handle multiple hours in cron expressions: '{}'".format(scheduled_action['Recurrence']))
if not re.match(r'^\d+$', parsed_recurrence['minute']):
raise NotImplementedError("This script cannot yet handle multiple minutes in cron expressions: '{}'".format(scheduled_action['Recurrence']))

# If the action's cron expression is not selective on the hour, it does not
# make sense to keep going.
if parsed_recurrence['hour'] == '*':
print("Action '{}' has a cron expression ('{}') which is not selective on the hour. Leaving recurrence as is.".format(scheduled_action['ScheduledActionName'], scheduled_action['Recurrence']))
return scheduled_action['Recurrence']

utc_now = self._time_source.get_current_utc_datetime()

# If the action's start date is over a day in the future, skip it. (This
# decision might not belong to this function though.)
if (scheduled_action['StartTime'] - utc_now).days > 1:
print("Action '{}' next run is over a day away. Leaving recurrence as is.".format(scheduled_action['ScheduledActionName']))
return scheduled_action['Recurrence']

# Determine when the action will run next, and compare the time with
# the expected local time at the specified timezone. If they match,
# then we're all good. If they don't, we need to update the recurrence.
#
# Note that we're adding one extra second to the time delta, to account
# for precision errors which might produce incorrect results. (See for
# example when the expected time is 14:00:00 and the delta causes us to
# see 13:59:59.998). This is pretty hacky and I should revisit this,
# for sure.

recurrence = CronTab(scheduled_action['Recurrence'])
utc_now = pytz.utc.localize(datetime.utcnow())
delta = timedelta(seconds = recurrence.next(default_utc=True) + 1)
utc_next_run = utc_now + delta
local_next_run = utc_next_run.astimezone(pytz.timezone(timezone))
local_next_run_time = local_next_run.strftime('%H:%M')

print("This action should run at '{}' local time. The next run will occur at '{}', which is '{}' at specified local timezone '{}'.".format(expected_time, utc_next_run.isoformat(), local_next_run_time, timezone))

if local_next_run_time == expected_time:
print("Times match. Current recurrence is correct.")
return scheduled_action['Recurrence']

print("Times don't match. Current recurrence must be recalculated.")
local_expected_run = local_next_run.replace(hour=parser.parse(expected_time).hour,
minute=parser.parse(expected_time).minute)
utc_expected_run = local_expected_run.astimezone(pytz.timezone('UTC'))

# We should only change the hour and minute parts of the cron
# expression. The rest should be left as it was originally. The reason
# we're changing the minutes too is because some timezones don't have
# whole offsets. E.g., see "Indian Standard Time".
new_recurrence = '{} {} {}'.format(utc_expected_run.minute,
utc_expected_run.hour,
parsed_recurrence['rest'])

return new_recurrence
5 changes: 5 additions & 0 deletions adjust_schedule_function/lib/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
def get_tag_by_key(tags, key):
tags = list(filter(lambda t: t['Key'] == key, tags))
if len(tags):
return tags[0]['Value']
return None
4 changes: 4 additions & 0 deletions adjust_schedule_function/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
boto3==1.16.4
crontab==0.22.9
python-dateutil==2.8.1
pytz==2020.1
3 changes: 3 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import sys, os

sys.path.insert(0, os.path.abspath("adjust_schedule_function"))
13 changes: 13 additions & 0 deletions events/event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"version": "0",
"id": "3d956a44-5259-406c-c9e6-ea038c7877e6",
"detail-type": "Scheduled Event",
"source": "aws.events",
"account": "012345678901",
"time": "2020-10-21T17:54:15Z",
"region": "eu-west-1",
"resources": [
"arn:aws:events:eu-west-1:012345678901:rule/scheduled"
],
"detail": {}
}
11 changes: 11 additions & 0 deletions send-event.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

aws events put-events \
--entries '[
{
"EventBusName": "default",
"Source": "scheduled-scaling-adjuster",
"DetailType": "ManualTrigger",
"Detail": "{}"
}
]' | jq .
Loading

0 comments on commit b337932

Please sign in to comment.