Source code for opslib.icsas

"""
IcsAS: Library for Auto Scaling
-------------------------------

+--------------------+------------+--+
| This is the IcsAS common library.  |
+--------------------+------------+--+
"""

from boto.ec2.autoscale import regions
from boto.ec2.autoscale import AutoScaleConnection
from opslib.icsutils.misc import dict_merge
from opslib.icsutils.misc import gen_timestamp
from opslib.icsutils.misc import user_data_decode
from opslib.icsutils.misc import clean_empty_items
from opslib.icsutils.misc import init_botocore_service, operate
from opslib.icsutils.misc import filter_resource_from_json
from opslib.icsutils.misc import keyname_formatd
from opslib.icsutils.misc import fetch_used_params
from opslib.icsexception import IcsASException

import logging
log = logging.getLogger(__name__)


[docs]def get_region(region_name, **kw_params): """ Find and return a :class:`boto.ec2.autoscale.RegionInfo` object given a region name. :type region_name: str :param region_name: The name of the region. :rtype: :class:`boto.ec2.autoscale.RegionInfo` :return: The RegionInfo object for the given region or None if an invalid region name is provided. """ for region in regions(**kw_params): if region.name == region_name: return region return None
[docs]class IcsAS(object): """ ICS Library for AutoScale """
[docs] def __init__(self, region, **kwargs): self.conn = AutoScaleConnection(region=get_region(region), **kwargs)
[docs] def to_list(self, input): """ Validate input, if not list, but string, make it as a list """ if input is None: return input elif isinstance(input, list): return input elif isinstance(input, basestring): return [input] else: raise IcsASException("Need the type '%s' but '%s' found" % ('list', type(input)))
[docs] def get_group_name_from_instance(self, instance_id): """ Get the ASG name from the specific instance id :type instance_id: string :param instance_id: EC2 instance id startwith 'i-xxxxxxx' :rtype: string :return: name of the ASG, this instance belongs to """ instances = self.conn.get_all_autoscaling_instances( instance_ids=self.to_list(instance_id)) if instances: return instances[0].group_name else: return None
[docs] def get_instances_from_group_name(self, name): """ Get the instance from the specific ASG name :type name: string :param name: the specific ASG name :rtype: list :return: a list contains all the instances """ instances = [] for group in self.conn.get_all_groups(names=self.to_list(name)): instances.extend(group.instances) return instances
[docs] def get_group_from_name(self, name): """ Get the ASG from its name :type name: string :param name: the ASG name :rtype: list :return: a list represents the specific ASG(s) """ return self.conn.get_all_groups(names=self.to_list(name))
[docs] def get_launch_config_from_name(self, name): """ Get the Launch Configuration from its name :type name: string :param name: the Launch Configuration name :rtype: list :return: a list represents the specific Launch Configuration(s) """ return self.conn.get_all_launch_configurations( names=self.to_list(name))
[docs] def create_launch_config(self, launch_config): """ Create the Launch Configuration :type launch_config: class :param launch_config: boto launch_config object :rtype: string :return: AWS request Id """ return self.conn.create_launch_configuration(launch_config)
[docs] def delete_launch_config_from_name(self, name): """ Delete the Launch Configuration from its name :type name: string :param name: the name of launch configuration :rtype: string :return: AWS request Id """ log.info("delete the launch configuration:") log.info(">> %s" % name) return self.conn.delete_launch_configuration(name)
[docs] def update_launch_config(self, name, launch_config): """ Update the Launch Configuration for specific ASG :type name: string :param name: the name of Auto-Scaling Group :type launch_config: class :param launch_config: boto launch_config object :rtype: string :return: AWS request Id """ groups = self.get_group_from_name(name) if groups: group = groups[0] else: raise IcsASException("no such Auto-Scaling Group '%s' found" % name) self.create_launch_config(launch_config) old_lc_name = group.launch_config_name new_lc_name = launch_config.name group.__dict__["launch_config_name"] = launch_config.name group.update() if self.get_launch_config_from_name(new_lc_name): group = self.get_group_from_name(name)[0] if group.launch_config_name == new_lc_name: return self.delete_launch_config_from_name(old_lc_name) else: raise IcsASException("failed to update " + "launch config for ASG '%s'" % name) else: raise IcsASException("no such new launch config '%s'" % new_lc_name)
[docs] def suspend_scaling_group(self, name, scaling_processes=None): """ Suspends Auto Scaling processes for an Auto Scaling group. :type name: string :param name: the ASG name :type scaling_processes: string or list :param scaling_processes: scaling process names * Launch * Terminate * HealthCheck * ReplaceUnhealthy * AZRebalance * AlarmNotification * ScheduledActions * AddToLoadBalancer """ if not isinstance(name, basestring): return None group = self.get_group_from_name(self.to_list(name))[0] return group.suspend_processes(self.to_list(scaling_processes))
[docs] def resume_scaling_group(self, name, scaling_processes=None): """ Resumes Auto Scaling processes for an Auto Scaling group. :type name: string :param name: the ASG name :type scaling_processes: string or list :param scaling_processes: scaling process names * Launch * Terminate * HealthCheck * ReplaceUnhealthy * AZRebalance * AlarmNotification * ScheduledActions * AddToLoadBalancer """ if not isinstance(name, basestring): return None group = self.get_group_from_name(self.to_list(name))[0] return group.resume_processes(self.to_list(scaling_processes))
[docs] def terminate_group_instance(self, instance_id, decrement_capacity=True): """ Terminates the specified instance. The desired group size can also be adjusted, if desired. :type instance_id: str :param instance_id: The ID of the instance to be terminated. :type decrement_capability: bool :param decrement_capacity: Whether to decrement the size of the autoscaling group or not. """ return self.conn.terminate_instance( instance_id=instance_id, decrement_capacity=decrement_capacity)
[docs] def update_instance_health(self, instance_id, health_status, grace_period=False): """ Explicitly set the health status of an instance. :type instance_id: str :param instance_id: The identifier of the EC2 instance :type health_status: str :param health_status: The health status of the instance. * Healthy: the instance is healthy and should remain in service. * Unhealthy: the instance is unhealthy. \ Auto Scaling should terminate and replace it. :type grace_period: bool :param grace_period: If True, this call should respect the grace period associated with the group. """ self.conn.set_instance_health(instance_id, health_status, should_respect_grace_period=grace_period)
[docs]class RawAS(object): """ Raw Library for AutoScale, based on Botocore """
[docs] def __init__(self, region): """ Initialize the proper botocore service """ self.name = "autoscaling" self.region = region self.service, self.endpoint = init_botocore_service( self.name, self.region)
[docs] def fetch_all_groups(self): """ Fetch all the Auto-Scaling Groups :rtype: dict :return: JSON object for all the Auto-Scaling Groups """ endpoint = {'endpoint': self.endpoint} cmd = "DescribeAutoScalingGroups" return operate(self.service, cmd, endpoint)
[docs] def find_groups(self, filter={}): """ Find the names of Auto-Scaling Groups in the filters :type filter: dict :param filter: a dictionary to used for resource filtering The format should be consistent with botocore JSON output .. code-block:: javascript { "Tags": [ { "Key": "Owner", "Value": "Production" } ] } :rtype: list :return: a list containing all the names of filtered groups """ names = ["AutoScalingGroupName", "LaunchConfigurationName"] status, raw_data = self.fetch_all_groups() return filter_resource_from_json(names, filter=filter, raw_data=raw_data)
[docs] def handle_response(self, response): """ Handle the botocore response """ if response[0].status_code == 200: return response[1] raise IcsASException("Status Code: %s; Reason: %s" % ( response[0].status_code, response[0].reason))
[docs] def fetch_group(self, name): """ Fetch an existing Auto-Scaling Group :type name: string :param name: auto-scaling group name """ endpoint = {'endpoint': self.endpoint} cmd = "DescribeAutoScalingGroups" key = 'AutoScalingGroupNames' params = {key: [name]} params = keyname_formatd(params) params.update(endpoint) log.info("find the auto-scaling group") log.info(">> %s" % name) try: response = operate(self.service, cmd, params) except Exception, e: raise IcsASException(e) data = self.handle_response(response) groups = data['AutoScalingGroups'] if not groups: raise IcsASException( "auto-scaling group '%s': not found" % name) elif len(groups) > 1: raise IcsASException( "too many auto-scaling groups found: '%s'" % group) log.info("OK") return groups[0]
[docs] def fetch_launch_config(self, name): """ Fetch an existing Launch Configuration :type name: string :param name: launch configuration name """ endpoint = {'endpoint': self.endpoint} cmd = "DescribeLaunchConfigurations" key = 'LaunchConfigurationNames' params = {key: [name]} params = keyname_formatd(params) params.update(endpoint) log.info("find the launch configuration") log.info(">> %s" % name) try: response = operate(self.service, cmd, params) except Exception, e: raise IcsASException(e) data = self.handle_response(response) lcs = data['LaunchConfigurations'] if not lcs: raise IcsASException( "auto-scaling group '%s': not found" % name) elif len(lcs) > 1: raise IcsASException( "too many auto-scaling groups found: '%s'" % lcs) log.info("OK") return lcs[0]
[docs] def create_group(self, group_config): """ Create a new Auto-Scaling Group :type group_config: dict :param group_config: auto-scaling group configuration """ endpoint = {'endpoint': self.endpoint} params = keyname_formatd(group_config) params.update(endpoint) cmd = "CreateAutoScalingGroup" log.info("create the auto-scaling group") log.info(">> %s" % group_config['AutoScalingGroupName']) try: self.handle_response(operate(self.service, cmd, params)) except Exception, e: raise IcsASException(e) log.info("OK")
[docs] def create_launch_config(self, launch_config): """ Create a new Launch Configuration :type launch_config: dict :param launch_config: launch configuration """ endpoint = {'endpoint': self.endpoint} params = keyname_formatd(launch_config) params.update(endpoint) cmd = "CreateLaunchConfiguration" log.info("create the launch configuration") log.info(">> %s" % launch_config['LaunchConfigurationName']) try: self.handle_response(operate(self.service, cmd, params)) except Exception, e: raise IcsASException(e) log.info("OK")
[docs] def delete_group(self, name): """ Delete an existing Auto-Scaling Group :type name: string :param name: auto-scaling group name """ endpoint = {'endpoint': self.endpoint} cmd = "DeleteAutoScalingGroup" key = 'AutoScalingGroupName' params = {key: name} params['ForceDelete'] = True params = keyname_formatd(params) params.update(endpoint) log.info("delete the auto-scaling group") log.info(">> %s" % name) try: self.handle_response(operate(self.service, cmd, params)) except Exception, e: raise IcsASException(e) log.info("OK")
[docs] def delete_launch_config(self, name): """ Delete an existing Launch Configuration :type name: string :param name: launch configuration name """ endpoint = {'endpoint': self.endpoint} cmd = "DeleteLaunchConfiguration" key = 'LaunchConfigurationName' params = {key: name} params = keyname_formatd(params) params.update(endpoint) log.info("delete the launch configuration") log.info(">> %s" % name) try: self.handle_response(operate(self.service, cmd, params)) except Exception, e: raise IcsASException(e) log.info("OK")
[docs] def modify_launch_config(self, launch_config, delimiter='_U_'): """ Modify the Launch Configuration :type launch_config: dict :param launch_config: launch configuration """ lc_name = launch_config['LaunchConfigurationName'] if delimiter in lc_name: new_lc_name = delimiter.join([lc_name.split( delimiter)[0], gen_timestamp()]) else: new_lc_name = delimiter.join([lc_name, gen_timestamp()]) launch_config['LaunchConfigurationName'] = new_lc_name self.create_launch_config(launch_config) return new_lc_name
[docs] def modify_group(self, group_config): """ Modify the Auto-Scaling Group :type group_config: dict :param group_config: auto-scaling group configuration """ endpoint = {'endpoint': self.endpoint} cmd = "UpdateAutoScalingGroup" params = fetch_used_params(self.name, cmd, group_config) params.update(endpoint) try: self.handle_response(operate(self.service, cmd, params)) except Exception, e: raise IcsASException(e) log.info("OK")
[docs] def update_group(self, group_config, launch_config): """ Update the Auto-Scaling Group :type group_config: dict :param group_config: auto-scaling group configuration :type launch_config: dict :param launch_config: launch configuration """ group_name = group_config['AutoScalingGroupName'] try: group_data = clean_empty_items(self.fetch_group(group_name)) except Exception, e: log.error(e) return False lc_name = group_data['LaunchConfigurationName'] try: launch_data = clean_empty_items(self.fetch_launch_config(lc_name)) except Exception, e: log.error(e) return False # FIXME: trick to remove user-data launch_config.pop('UserData') launch_config = dict_merge(launch_data, launch_config) # FIXME: need to refine and remove the unused items user_data = launch_config['UserData'] launch_config['UserData'] = user_data_decode(user_data) if 'CreatedTime' in launch_config: launch_config.pop('CreatedTime') if 'LaunchConfigurationARN' in launch_config: launch_config.pop('LaunchConfigurationARN') if launch_config == launch_data: log.info("no need to update the launch configuration") else: try: new_lc_name = self.modify_launch_config(launch_config) group_config['LaunchConfigurationName'] = new_lc_name except Exception, e: log.error(e) return False log.info("attach the new launch configuration to") log.info(">> %s" % group_name) try: self.modify_group(group_config) except Exception, e: log.error(e) try: self.delete_launch_config(new_lc_name) except Exception, e: log.error(e) return False try: self.delete_launch_config(lc_name) except Exception, e: log.error(e) return False else: return True
[docs] def launch_group(self, group_config, launch_config): """ Launch a new Auto-Scaling Group :type group_config: dict :param group_config: auto-scaling group configuration :type launch_config: dict :param launch_config: launch configuration """ result = 0 try: self.create_launch_config(launch_config) except Exception, e: log.error(e) else: result += 1 try: self.create_group(group_config) except Exception, e: log.error(e) else: result += 1 if result == 2: return True else: return False
[docs] def kill_group(self, group_name, force=False, force_cmd="-force"): """ Delete a new Auto-Scaling Group :type name: string :param name: launch configuration name :type force: boolean :param force: whether to delete the auto-scaling group forcely """ try: group_data = self.fetch_group(group_name) except Exception, e: log.error(e) return False lc_name = group_data['LaunchConfigurationName'] if group_data['Instances'] and not force: error_msg = "auto-scaling group '%s' " % group_name + \ "has running instances, " + \ "if you have to kill it forcely, " + \ "please use '%s'" % force_cmd log.error(error_msg) return False result = 0 try: self.delete_group(group_name) except Exception, e: log.error(e) else: result += 1 try: self.delete_launch_config(lc_name) except Exception, e: log.error(e) else: result += 1 if result == 2: return True else: return False
[docs] def new_scaling_policy(self, scaling_policy, metric_alarm): """ Create a new Scaling Policy :type scaling_policy: dict :param scaling_policy: scaling policy configuration :type metric_alarm: dict :param metric_alarm: metric alarm configuration """ result = 0 endpoint = {'endpoint': self.endpoint} cmd = "PutScalingPolicy" params = fetch_used_params(self.name, cmd, scaling_policy) params.update(endpoint) log.info("create the scaling policy") log.info(">> %s" % scaling_policy["PolicyName"]) try: data = self.handle_response(operate(self.service, cmd, params)) except Exception, e: log.error(e) else: log.info("OK") result += 1 policy_arn = data['PolicyARN'] # FIXME: special here, just for cloudwatch service name = "cloudwatch" service, endpoint = init_botocore_service(name, self.region) endpoint = {'endpoint': endpoint} cmd = "PutMetricAlarm" metric_alarm['AlarmActions'].append(policy_arn) params = fetch_used_params(name, cmd, metric_alarm) params.update(endpoint) log.info("create the associated metric alarm") log.info(">> %s" % metric_alarm["AlarmName"]) try: self.handle_response(operate(service, cmd, params)) except Exception, e: log.error(e) else: log.info("OK") result += 1 if result == 2: return True else: return False
[docs] def delete_scaling_policy(self, scaling_policy, metric_alarm): """ Delete an existing Scaling Policy :type scaling_policy: dict :param scaling_policy: scaling policy configuration :type metric_alarm: dict :param metric_alarm: metric alarm configuration """ result = 0 endpoint = {'endpoint': self.endpoint} cmd = "DeletePolicy" params = fetch_used_params(self.name, cmd, scaling_policy) params.update(endpoint) log.info("delete the scaling policy") log.info(">> %s" % scaling_policy["PolicyName"]) try: self.handle_response(operate(self.service, cmd, params)) except Exception, e: log.error(e) else: log.info("OK") result += 1 # FIXME: special here, just for cloudwatch service name = "cloudwatch" service, endpoint = init_botocore_service(name, self.region) endpoint = {'endpoint': endpoint} cmd = "DeleteAlarms" metric_alarm['AlarmNames'] = [metric_alarm['AlarmName']] params = fetch_used_params(name, cmd, metric_alarm) params.update(endpoint) log.info("delete the associated metric alarm") log.info(">> %s" % metric_alarm["AlarmName"]) try: self.handle_response(operate(service, cmd, params)) except Exception, e: log.error(e) else: log.info("OK") result += 1 if result == 2: return True else: return False # vim: tabstop=4 shiftwidth=4 softtabstop=4
Read the Docs v: latest
Versions
latest
Downloads
PDF
HTML
Epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.