From d124bfe3d2ac63a95fd2af44db3184985ab74d75 Mon Sep 17 00:00:00 2001 From: kuuse Date: Tue, 18 Jun 2019 12:09:24 +0200 Subject: [PATCH] Prepare LCM tasks for HA Change-Id: I6146d5903e95d6d710d349a08fcf5ae03127dfed Signed-off-by: kuuse --- osm_lcm/lcm.py | 5 +- osm_lcm/lcm_utils.py | 141 +++++++++++++++++++++++++++++++++++-------- osm_lcm/netslice.py | 68 ++++++++++----------- osm_lcm/ns.py | 67 ++++++++++---------- 4 files changed, 187 insertions(+), 94 deletions(-) diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 8f66055..27a44cb 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -71,8 +71,6 @@ class Lcm: self.consecutive_errors = 0 self.first_start = False - # contains created tasks/futures to be able to cancel - self.lcm_tasks = TaskRegistry() # logging self.logger = logging.getLogger('lcm') # get id @@ -174,6 +172,9 @@ class Lcm: self.logger.critical(str(e), exc_info=True) raise LcmException(str(e)) + # contains created tasks/futures to be able to cancel + self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger) + self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop) self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop) diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index 01701a5..b4d0455 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -16,6 +16,7 @@ # under the License. ## +import asyncio from collections import OrderedDict # from osm_common.dbbase import DbException @@ -45,7 +46,36 @@ def versiontuple(v): return tuple(filled) -class TaskRegistry: +# LcmBase must be listed before TaskRegistry, as it is a dependency. +class LcmBase: + + def __init__(self, db, msg, fs, logger): + """ + + :param db: database connection + """ + self.db = db + self.msg = msg + self.fs = fs + self.logger = logger + + def update_db_2(self, item, _id, _desc): + """ + Updates database with _desc information. If success _desc is cleared + :param item: + :param _id: + :param _desc: dictionary with the content to update. Keys are dot separated keys for + :return: None. Exception is raised on error + """ + if not _desc: + return + self.db.set_one(item, {"_id": _id}, _desc) + _desc.clear() + # except DbException as e: + # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e)) + + +class TaskRegistry(LcmBase): """ Implements a registry of task needed for later cancelation, look for related tasks that must be completed before etc. It stores a four level dict @@ -53,9 +83,19 @@ class TaskRegistry: Second level is the _id Third level is the operation id Fourth level is a descriptive name, the value is the task class + + The HA (High-Availability) methods are used when more than one LCM instance is running. + To register the current task in the external DB, use LcmBase as base class, to be able + to reuse LcmBase.update_db_2() + The DB registry uses the following fields to distinguish a task: + - op_type: operation type ("nslcmops" or "nsilcmops") + - op_id: operation ID + - worker: the worker ID for this process """ - def __init__(self): + instance_id_label_dict = {'ns': 'nsInstanceId', 'nsi': 'netsliceInstanceId'} + + def __init__(self, worker_id=None, db=None, logger=None): self.task_registry = { "ns": {}, "nsi": {}, @@ -63,6 +103,9 @@ class TaskRegistry: "wim_account": {}, "sdn": {}, } + self.worker_id = worker_id + self.db = db + self.logger = logger def register(self, topic, _id, op_id, task_name, task): """ @@ -128,7 +171,7 @@ class TaskRegistry: def cancel(self, topic, _id, target_op_id=None, target_task_name=None): """ - Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only + Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only this is cancelled, and the same with task_name """ if not self.task_registry[topic].get(_id): @@ -144,30 +187,80 @@ class TaskRegistry: # if result: # self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name)) - -class LcmBase: - - def __init__(self, db, msg, fs, logger): + def lock_HA(self, topic, op_type, op_id): """ - - :param db: database connection + Lock an task, if possible, to indicate to the HA system that + the task will be executed in this LCM instance. + :param topic: Can be "ns", "nsi" + :param op_type: Operation type, can be "nslcmops", "nsilcmops" + :param op_id: id of the operation of the related item + :return: + True=lock successful => execute the task (not registered by any other LCM instance) + False=lock failed => do NOT execute the task (already registered by another LCM instance) """ - self.db = db - self.msg = msg - self.fs = fs - self.logger = logger - def update_db_2(self, item, _id, _desc): + db_lock_task = self.db.set_one(op_type, + q_filter={'_id': op_id, '_admin.worker': None}, + update_dict={'_admin.worker': self.worker_id}, + fail_on_empty=False) + + if db_lock_task is None: + self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id)) + return False + else: + return True + + async def waitfor_related_HA(self, topic, op_type, op_id=None): """ - Updates database with _desc information. If success _desc is cleared - :param item: - :param _id: - :param _desc: dictionary with the content to update. Keys are dot separated keys for - :return: None. Exception is raised on error + Wait for any pending related HA tasks """ - if not _desc: + + # InstanceId label + instance_id_label = self.instance_id_label_dict.get(topic) + + # Get 'startTime' timestamp for this operation + step = "Getting timestamp for op_id={} from db".format(op_id) + db_lcmop = self.db.get_one(op_type, + {"_id": op_id}, + fail_on_empty=False) + if not db_lcmop: return - self.db.set_one(item, {"_id": _id}, _desc) - _desc.clear() - # except DbException as e: - # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e)) + starttime_this_op = db_lcmop.get("startTime") + instance_id = db_lcmop.get(instance_id_label) + + # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable. + timeout_wait_for_task = 3600 # Max time (seconds) to wait for a related task to finish + # interval_wait_for_task = 30 # A too long polling interval slows things down considerably + interval_wait_for_task = 10 # Interval in seconds for polling related tasks + time_left = timeout_wait_for_task + old_num_related_tasks = 0 + while True: + # Get related tasks (operations within the same NS or NSI instance) which are + # still running (operationState='PROCESSING') and which were started before this task. + _filter = {instance_id_label: instance_id, + 'operationState': 'PROCESSING', + 'startTime.lt': starttime_this_op} + db_waitfor_related_task = self.db.get_list(op_type, + q_filter=_filter) + new_num_related_tasks = len(db_waitfor_related_task) + if not new_num_related_tasks: + # There are no related tasks, no need to wait, so return. + return + # If number of pending related tasks have changed, + # update the 'detailed-status' field and log the change. + if new_num_related_tasks != old_num_related_tasks: + db_lcmops_update = {} + step = db_lcmops_update["detailed-status"] = \ + "Waiting for {} related tasks to be completed.".format( + new_num_related_tasks) + self.logger.debug("Task {} operation={} {}".format(topic, op_id, step)) + self.update_db_2(op_type, op_id, db_lcmops_update) + old_num_related_tasks = new_num_related_tasks + time_left -= interval_wait_for_task + if time_left < 0: + raise LcmException( + "Timeout ({}) when waiting for related tasks to be completed".format( + timeout_wait_for_task)) + await asyncio.sleep(interval_wait_for_task) + + return diff --git a/osm_lcm/netslice.py b/osm_lcm/netslice.py index df32473..da428d1 100644 --- a/osm_lcm/netslice.py +++ b/osm_lcm/netslice.py @@ -70,6 +70,12 @@ class NetsliceLcm(LcmBase): raise LcmException("ns_update_nsir: Not found vld={} at RO info".format(vld["id"])) async def instantiate(self, nsir_id, nsilcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('nsi', 'nsilcmops', nsilcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task netslice={} instantiate={} ".format(nsir_id, nsilcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database @@ -218,7 +224,7 @@ class NetsliceLcm(LcmBase): db_nsir_update_RO["netslice_scenario_id"] = desc["uuid"] db_nsir_update_RO["vld_id"] = RO_ns_params["name"] db_nsir_update["_admin.deployed.RO"].append(db_nsir_update_RO) - + def overwrite_nsd_params(self, db_nsir, nslcmop): RO_list = [] vld_op_list = [] @@ -261,38 +267,30 @@ class NetsliceLcm(LcmBase): return nsr_id, nslcmop try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA('nsi', 'nsilcmops', nsilcmop_id) + step = "Getting nsir={} from db".format(nsir_id) db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) step = "Getting nsilcmop={} from db".format(nsilcmop_id) db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id}) - # look if previous tasks is in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id) - if task_dependency: - step = db_nsilcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - # Empty list to keep track of network service records status in the netslice nsir_admin = db_nsir_admin = db_nsir.get("_admin") # Slice status Creating db_nsir_update["detailed-status"] = "creating" db_nsir_update["operational-status"] = "init" - self.update_db_2("nsis", nsir_id, db_nsir_update) - + self.update_db_2("nsis", nsir_id, db_nsir_update) + # Creating netslice VLDs networking before NS instantiation db_nsir_update["_admin.deployed.RO"] = db_nsir_admin["deployed"]["RO"] for vld_item in get_iterable(nsir_admin, "netslice-vld"): await netslice_scenario_create(self, vld_item, nsir_id, db_nsir, db_nsir_admin, db_nsir_update) self.update_db_2("nsis", nsir_id, db_nsir_update) - + db_nsir_update["detailed-status"] = "Creating netslice subnets at RO" - self.update_db_2("nsis", nsir_id, db_nsir_update) + self.update_db_2("nsis", nsir_id, db_nsir_update) db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) @@ -300,9 +298,9 @@ class NetsliceLcm(LcmBase): # netslice_scenarios = db_nsir["_admin"]["deployed"]["RO"] # db_nsir_update_RO = deepcopy(netslice_scenarios) # for netslice_scenario in netslice_scenarios: - # await netslice_scenario_check(self, netslice_scenario["netslice_scenario_id"], + # await netslice_scenario_check(self, netslice_scenario["netslice_scenario_id"], # nsir_id, db_nsir_update_RO) - + # db_nsir_update["_admin.deployed.RO"] = db_nsir_update_RO # self.update_db_2("nsis", nsir_id, db_nsir_update) @@ -319,12 +317,12 @@ class NetsliceLcm(LcmBase): step = "Launching ns={} instantiate={} task".format(nsr_id, nslcmop_id) task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) - + # Wait until Network Slice is ready step = nsir_status_detailed = " Waiting nsi ready. nsi_id={}".format(nsir_id) nsrs_detailed_list_old = None self.logger.debug(logging_text + step) - + # TODO: substitute while for await (all task to be done or not) deployment_timeout = 2 * 3600 # Two hours while deployment_timeout > 0: @@ -414,9 +412,15 @@ class NetsliceLcm(LcmBase): self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_instantiate") async def terminate(self, nsir_id, nsilcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('nsi', 'nsilcmops', nsilcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task nsi={} terminate={} ".format(nsir_id, nsilcmop_id) self.logger.debug(logging_text + "Enter") - exc = None + exc = None db_nsir = None db_nsilcmop = None db_nsir_update = {"_admin.nsilcmop": nsilcmop_id} @@ -427,12 +431,15 @@ class NetsliceLcm(LcmBase): nsilcmop_operation_state = None autoremove = False # autoremove after terminated try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA('nsi', 'nsilcmops', nsilcmop_id) + step = "Getting nsir={} from db".format(nsir_id) db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) nsir_deployed = deepcopy(db_nsir["_admin"].get("deployed")) step = "Getting nsilcmop={} from db".format(nsilcmop_id) db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id}) - + # TODO: Check if makes sense check the nsiState=NOT_INSTANTIATED when terminate # CASE: Instance was terminated but there is a second request to terminate the instance if db_nsir["_admin"]["nsiState"] == "NOT_INSTANTIATED": @@ -444,23 +451,12 @@ class NetsliceLcm(LcmBase): db_nsir_update["detailed-status"] = "Terminating Netslice subnets" self.update_db_2("nsis", nsir_id, db_nsir_update) - # look if previous tasks is in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id) - if task_dependency: - step = db_nsilcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - # Gets the list to keep track of network service records status in the netslice nsir_admin = db_nsir["_admin"] - nsrs_detailed_list = [] + nsrs_detailed_list = [] # Iterate over the network services operation ids to terminate NSs - # TODO: (future improvement) look another way check the tasks instead of keep asking + # TODO: (future improvement) look another way check the tasks instead of keep asking # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives # steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300) nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids") @@ -489,7 +485,7 @@ class NetsliceLcm(LcmBase): step = nsir_status_detailed = " Waiting nsi terminated. nsi_id={}".format(nsir_id) nsrs_detailed_list_old = None self.logger.debug(logging_text + step) - + termination_timeout = 2 * 3600 # Two hours while termination_timeout > 0: # Check ns termination status diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 4a16835..a04f0a8 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -628,6 +628,12 @@ class NsLcm(LcmBase): raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index)) async def instantiate(self, nsr_id, nslcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database @@ -644,6 +650,9 @@ class NsLcm(LcmBase): n2vc_key_list = [] # list of public keys to be injected as authorized to VMs exc = None try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) step = "Getting nsr={} from db".format(nsr_id) @@ -652,17 +661,6 @@ class NsLcm(LcmBase): nsd = db_nsr["nsd"] nsr_name = db_nsr["name"] # TODO short-name?? - # look if previous tasks in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id) - if task_dependency: - step = db_nslcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - step = "Getting vnfrs from db" db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) db_vnfds_ref = {} @@ -1650,6 +1648,12 @@ class NsLcm(LcmBase): vnf_index, seq.get("name"), nslcmop_operation_state_detail)) async def terminate(self, nsr_id, nslcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") db_nsr = None @@ -1662,6 +1666,9 @@ class NsLcm(LcmBase): nslcmop_operation_state = None autoremove = False # autoremove after terminated try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id) + step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) step = "Getting nsr={} from db".format(nsr_id) @@ -1998,6 +2005,12 @@ class NsLcm(LcmBase): return "FAILED", str(e) async def action(self, nsr_id, nslcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database @@ -2009,6 +2022,9 @@ class NsLcm(LcmBase): nslcmop_operation_state_detail = None exc = None try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + step = "Getting information from database" db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) @@ -2031,17 +2047,6 @@ class NsLcm(LcmBase): step = "Getting nsd from database" db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) - # look if previous tasks in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id) - if task_dependency: - step = db_nslcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - # for backward compatibility if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict): nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values()) @@ -2129,6 +2134,12 @@ class NsLcm(LcmBase): return nslcmop_operation_state, nslcmop_operation_state_detail async def scale(self, nsr_id, nslcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database @@ -2144,16 +2155,8 @@ class NsLcm(LcmBase): old_config_status = "" vnfr_scaled = False try: - # look if previous tasks in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id) - if task_dependency: - step = db_nslcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) step = "Getting nslcmop from database" self.logger.debug(step + " after having waited for previous tasks to be completed") -- 2.17.1