X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwautoscaler%2Frift%2Ftasklets%2Frwautoscaler%2Frwautoscaler.py;h=1741a58469b81059295f3f456b21d72d8fd54e15;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=affa579b398e5425d4020930433e149c8838fbd5;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwautoscaler/rift/tasklets/rwautoscaler/rwautoscaler.py b/rwlaunchpad/plugins/rwautoscaler/rift/tasklets/rwautoscaler/rwautoscaler.py index affa579b..1741a584 100644 --- a/rwlaunchpad/plugins/rwautoscaler/rift/tasklets/rwautoscaler/rwautoscaler.py +++ b/rwlaunchpad/plugins/rwautoscaler/rift/tasklets/rwautoscaler/rwautoscaler.py @@ -1,6 +1,6 @@ """ -# -# Copyright 2016 RIFT.IO Inc +# +# Copyright 2016-2017 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -39,97 +39,46 @@ from gi.repository import ( import rift.mano.cloud import rift.mano.dts as subscriber import rift.tasklets +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + ) +class AutoScalerProject(ManoProject, engine.ScalingPolicy.Delegate): + def __init__(self, name, tasklet, **kw): + super(AutoScalerProject, self).__init__(tasklet.log, name) + self.update(tasklet) -class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate): - """The main task of this Tasklet is to listen for NSR changes and once the - NSR is configured, ScalingPolicy is created. - """ - def __init__(self, *args, **kwargs): - - try: - super().__init__(*args, **kwargs) - self.store = None - self.monparam_store = None - - self.nsr_sub = None - self.nsr_monp_subscribers = {} - self.instance_id_store = collections.defaultdict(list) - - except Exception as e: - self.log.exception(e) - - def start(self): - super().start() - - self.log.debug("Registering with dts") - - self.dts = rift.tasklets.DTS( - self.tasklet_info, - RwLaunchpadYang.get_schema(), - self.loop, - self.on_dts_state_change - ) + self.store = None + self.monparam_store = None + self.nsr_sub = None + self.nsr_monp_subscribers = {} + self.instance_id_store = collections.defaultdict(list) - self.store = subscriber.SubscriberStore.from_tasklet(self) - self.nsr_sub = subscriber.NsrCatalogSubscriber(self.log, self.dts, self.loop, self.handle_nsr) + self.store = subscriber.SubscriberStore.from_project(self) + self.nsr_sub = subscriber.NsrCatalogSubscriber(self.log, self.dts, self.loop, + self, self.handle_nsr) - self.log.debug("Created DTS Api GI Object: %s", self.dts) + def deregister(self): + self.log.debug("De-register project {}".format(self.name)) + self.nsr_sub.deregister() + self.store.deregister() - def stop(self): - try: - self.dts.deinit() - except Exception as e: - self.log.exception(e) @asyncio.coroutine - def init(self): + def register (self): self.log.debug("creating vnfr subscriber") yield from self.store.register() yield from self.nsr_sub.register() - @asyncio.coroutine - def run(self): - pass - - @asyncio.coroutine - def on_dts_state_change(self, state): - """Handle DTS state change - - Take action according to current DTS state to transition application - into the corresponding application state - - Arguments - state - current dts state - - """ - switch = { - rwdts.State.INIT: rwdts.State.REGN_COMPLETE, - rwdts.State.CONFIG: rwdts.State.RUN, - } - - handlers = { - rwdts.State.INIT: self.init, - rwdts.State.RUN: self.run, - } - - # Transition application to next state - handler = handlers.get(state, None) - if handler is not None: - yield from handler() - - # Transition dts to next state - next_state = switch.get(state, None) - if next_state is not None: - self.dts.handle.set_state(next_state) - - def scale_in(self, scaling_group_name, nsr_id): + def scale_in(self, scaling_group_name, nsr_id, instance_id): """Delegate callback Args: scaling_group_name (str): Scaling group name to be scaled in nsr_id (str): NSR id + instance_id (str): Instance id of the scaling group """ self.log.info("Sending a scaling-in request for {} in NSR: {}".format( @@ -138,12 +87,14 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate): @asyncio.coroutine def _scale_in(): - instance_id = self.instance_id_store[(scaling_group_name, nsr_id)].pop() + # Purposely ignore passed instance_id + instance_id_ = self.instance_id_store[(scaling_group_name, nsr_id)].pop() # Trigger an rpc rpc_ip = NsrYang.YangInput_Nsr_ExecScaleIn.from_dict({ + 'project_name': self.name, 'nsr_id_ref': nsr_id, - 'instance_id': instance_id, + 'instance_id': instance_id_, 'scaling_group_name_ref': scaling_group_name}) rpc_out = yield from self.dts.query_rpc( @@ -151,7 +102,9 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate): 0, rpc_ip) - self.loop.create_task(_scale_in()) + # Check for existing scaled-out VNFs if any. + if len(self.instance_id_store): + self.loop.create_task(_scale_in()) def scale_out(self, scaling_group_name, nsr_id): """Delegate callback for scale out requests @@ -168,6 +121,7 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate): def _scale_out(): # Trigger an rpc rpc_ip = NsrYang.YangInput_Nsr_ExecScaleOut.from_dict({ + 'project_name': self.name, 'nsr_id_ref': nsr_id , 'scaling_group_name_ref': scaling_group_name}) @@ -191,7 +145,7 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate): NS that moves to config state. Args: - nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): Ns Opdata + nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata action (rwdts.QueryAction): Action type of the change. """ def nsr_create(): @@ -199,12 +153,15 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate): nsr_id = nsr.ns_instance_config_ref self.nsr_monp_subscribers[nsr_id] = [] nsd = self.store.get_nsd(nsr.nsd_ref) + self.log.debug ("Creating a scaling policy monitor for NSR: {}".format( + nsr_id)) + @asyncio.coroutine def task(): for scaling_group in nsd.scaling_group_descriptor: for policy_cfg in scaling_group.scaling_policy: policy = engine.ScalingPolicy( - self.log, self.dts, self.loop, + self.log, self.dts, self.loop, self, nsr.ns_instance_config_ref, nsr.nsd_ref, scaling_group.name, @@ -213,6 +170,9 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate): delegate=self) self.nsr_monp_subscribers[nsr_id].append(policy) yield from policy.register() + self.log.debug ("Started a scaling policy monitor for NSR: {}".format( + nsr_id)) + self.loop.create_task(task()) @@ -223,8 +183,90 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate): for policy in policies: policy.deregister() del self.nsr_monp_subscribers[nsr.ns_instance_config_ref] + self.log.debug ("Deleted the scaling policy monitor for NSD: {}".format( + nsr.ns_instance_config_ref)) + if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: nsr_create() elif action == rwdts.QueryAction.DELETE: nsr_delete() + + +class AutoScalerTasklet(rift.tasklets.Tasklet): + """The main task of this Tasklet is to listen for NSR changes and once the + NSR is configured, ScalingPolicy is created. + """ + def __init__(self, *args, **kwargs): + + try: + super().__init__(*args, **kwargs) + self.rwlog.set_category("rw-mano-log") + + self._project_handler = None + self.projects = {} + + except Exception as e: + self.log.exception(e) + + def start(self): + super().start() + + self.log.debug("Registering with dts") + + self.dts = rift.tasklets.DTS( + self.tasklet_info, + RwLaunchpadYang.get_schema(), + self.loop, + self.on_dts_state_change + ) + + self.log.debug("Created DTS Api GI Object: %s", self.dts) + + def stop(self): + try: + self.dts.deinit() + except Exception as e: + self.log.exception(e) + + @asyncio.coroutine + def init(self): + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, AutoScalerProject) + self.project_handler.register() + + @asyncio.coroutine + def run(self): + pass + + @asyncio.coroutine + def on_dts_state_change(self, state): + """Handle DTS state change + + Take action according to current DTS state to transition application + into the corresponding application state + + Arguments + state - current dts state + + """ + switch = { + rwdts.State.INIT: rwdts.State.REGN_COMPLETE, + rwdts.State.CONFIG: rwdts.State.RUN, + } + + handlers = { + rwdts.State.INIT: self.init, + rwdts.State.RUN: self.run, + } + + # Transition application to next state + handler = handlers.get(state, None) + if handler is not None: + yield from handler() + + # Transition dts to next state + next_state = switch.get(state, None) + if next_state is not None: + self.dts.handle.set_state(next_state) +