X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm_utils.py;h=b4d0455ea0528c512f5f969cd14ec44ecb4c0c43;hb=d124bfe3d2ac63a95fd2af44db3184985ab74d75;hp=01701a5aada32053fd8410230c7308643d9c96f3;hpb=29cfa602d40ff4d3e7f611414cf66d4cc62a2be6;p=osm%2FLCM.git diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index 01701a5..b4d0455 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -16,6 +16,7 @@ # under the License. ## +import asyncio from collections import OrderedDict # from osm_common.dbbase import DbException @@ -45,7 +46,36 @@ def versiontuple(v): return tuple(filled) -class TaskRegistry: +# LcmBase must be listed before TaskRegistry, as it is a dependency. +class LcmBase: + + def __init__(self, db, msg, fs, logger): + """ + + :param db: database connection + """ + self.db = db + self.msg = msg + self.fs = fs + self.logger = logger + + def update_db_2(self, item, _id, _desc): + """ + Updates database with _desc information. If success _desc is cleared + :param item: + :param _id: + :param _desc: dictionary with the content to update. Keys are dot separated keys for + :return: None. Exception is raised on error + """ + if not _desc: + return + self.db.set_one(item, {"_id": _id}, _desc) + _desc.clear() + # except DbException as e: + # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e)) + + +class TaskRegistry(LcmBase): """ Implements a registry of task needed for later cancelation, look for related tasks that must be completed before etc. It stores a four level dict @@ -53,9 +83,19 @@ class TaskRegistry: Second level is the _id Third level is the operation id Fourth level is a descriptive name, the value is the task class + + The HA (High-Availability) methods are used when more than one LCM instance is running. + To register the current task in the external DB, use LcmBase as base class, to be able + to reuse LcmBase.update_db_2() + The DB registry uses the following fields to distinguish a task: + - op_type: operation type ("nslcmops" or "nsilcmops") + - op_id: operation ID + - worker: the worker ID for this process """ - def __init__(self): + instance_id_label_dict = {'ns': 'nsInstanceId', 'nsi': 'netsliceInstanceId'} + + def __init__(self, worker_id=None, db=None, logger=None): self.task_registry = { "ns": {}, "nsi": {}, @@ -63,6 +103,9 @@ class TaskRegistry: "wim_account": {}, "sdn": {}, } + self.worker_id = worker_id + self.db = db + self.logger = logger def register(self, topic, _id, op_id, task_name, task): """ @@ -128,7 +171,7 @@ class TaskRegistry: def cancel(self, topic, _id, target_op_id=None, target_task_name=None): """ - Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only + Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only this is cancelled, and the same with task_name """ if not self.task_registry[topic].get(_id): @@ -144,30 +187,80 @@ class TaskRegistry: # if result: # self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name)) - -class LcmBase: - - def __init__(self, db, msg, fs, logger): + def lock_HA(self, topic, op_type, op_id): """ - - :param db: database connection + Lock an task, if possible, to indicate to the HA system that + the task will be executed in this LCM instance. + :param topic: Can be "ns", "nsi" + :param op_type: Operation type, can be "nslcmops", "nsilcmops" + :param op_id: id of the operation of the related item + :return: + True=lock successful => execute the task (not registered by any other LCM instance) + False=lock failed => do NOT execute the task (already registered by another LCM instance) """ - self.db = db - self.msg = msg - self.fs = fs - self.logger = logger - def update_db_2(self, item, _id, _desc): + db_lock_task = self.db.set_one(op_type, + q_filter={'_id': op_id, '_admin.worker': None}, + update_dict={'_admin.worker': self.worker_id}, + fail_on_empty=False) + + if db_lock_task is None: + self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id)) + return False + else: + return True + + async def waitfor_related_HA(self, topic, op_type, op_id=None): """ - Updates database with _desc information. If success _desc is cleared - :param item: - :param _id: - :param _desc: dictionary with the content to update. Keys are dot separated keys for - :return: None. Exception is raised on error + Wait for any pending related HA tasks """ - if not _desc: + + # InstanceId label + instance_id_label = self.instance_id_label_dict.get(topic) + + # Get 'startTime' timestamp for this operation + step = "Getting timestamp for op_id={} from db".format(op_id) + db_lcmop = self.db.get_one(op_type, + {"_id": op_id}, + fail_on_empty=False) + if not db_lcmop: return - self.db.set_one(item, {"_id": _id}, _desc) - _desc.clear() - # except DbException as e: - # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e)) + starttime_this_op = db_lcmop.get("startTime") + instance_id = db_lcmop.get(instance_id_label) + + # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable. + timeout_wait_for_task = 3600 # Max time (seconds) to wait for a related task to finish + # interval_wait_for_task = 30 # A too long polling interval slows things down considerably + interval_wait_for_task = 10 # Interval in seconds for polling related tasks + time_left = timeout_wait_for_task + old_num_related_tasks = 0 + while True: + # Get related tasks (operations within the same NS or NSI instance) which are + # still running (operationState='PROCESSING') and which were started before this task. + _filter = {instance_id_label: instance_id, + 'operationState': 'PROCESSING', + 'startTime.lt': starttime_this_op} + db_waitfor_related_task = self.db.get_list(op_type, + q_filter=_filter) + new_num_related_tasks = len(db_waitfor_related_task) + if not new_num_related_tasks: + # There are no related tasks, no need to wait, so return. + return + # If number of pending related tasks have changed, + # update the 'detailed-status' field and log the change. + if new_num_related_tasks != old_num_related_tasks: + db_lcmops_update = {} + step = db_lcmops_update["detailed-status"] = \ + "Waiting for {} related tasks to be completed.".format( + new_num_related_tasks) + self.logger.debug("Task {} operation={} {}".format(topic, op_id, step)) + self.update_db_2(op_type, op_id, db_lcmops_update) + old_num_related_tasks = new_num_related_tasks + time_left -= interval_wait_for_task + if time_left < 0: + raise LcmException( + "Timeout ({}) when waiting for related tasks to be completed".format( + timeout_wait_for_task)) + await asyncio.sleep(interval_wait_for_task) + + return