# under the License.
##
+import asyncio
from collections import OrderedDict
# from osm_common.dbbase import DbException
return tuple(filled)
-class TaskRegistry:
+# LcmBase must be defined before TaskRegistry, because TaskRegistry inherits from it.
+class LcmBase:
+
+ def __init__(self, db, msg, fs, logger):
+ """
+ Store the given handles (db, msg, fs, logger) as instance attributes.
+ :param db: database connection
+ """
+ self.db = db
+ self.msg = msg
+ self.fs = fs
+ self.logger = logger
+
+ def update_db_2(self, item, _id, _desc):
+ """
+ Updates database with _desc information. If success _desc is cleared
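+ :param item: first argument passed to db.set_one (e.g. "nslcmops")
+ :param _id: value matched against the "_id" field to select the entry to update
+ :param _desc: dictionary with the content to update. Keys are dot separated keys for
+ nested fields (e.g. "_admin.worker"). Nothing is done if it is empty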
+ :return: None. Exception is raised on error
+ """
+ if not _desc:
+ return
+ self.db.set_one(item, {"_id": _id}, _desc)
+ _desc.clear()
+ # except DbException as e:
+ # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+
+
+class TaskRegistry(LcmBase):
"""
Implements a registry of task needed for later cancelation, look for related tasks that must be completed before
etc. It stores a four level dict
Second level is the _id
Third level is the operation id
Fourth level is a descriptive name, the value is the task class
+
+ The HA (High-Availability) methods are used when more than one LCM instance is running.
+ TaskRegistry uses LcmBase as its base class so that it can reuse LcmBase.update_db_2()
+ when registering the current task in the external DB.
+ The DB registry uses the following fields to distinguish a task:
+ - op_type: operation type ("nslcmops" or "nsilcmops")
+ - op_id: operation ID
+ - worker: the worker ID for this process
"""
- def __init__(self):
+ instance_id_label_dict = {'ns': 'nsInstanceId', 'nsi': 'netsliceInstanceId'}
+
+ def __init__(self, worker_id=None, db=None, logger=None):
self.task_registry = {
"ns": {},
"nsi": {},
"wim_account": {},
"sdn": {},
}
+ self.worker_id = worker_id
+ self.db = db
+ self.logger = logger
def register(self, topic, _id, op_id, task_name, task):
"""
def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
"""
- Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
+ Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
this is cancelled, and the same with task_name
"""
if not self.task_registry[topic].get(_id):
# if result:
# self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name))
-
-class LcmBase:
-
- def __init__(self, db, msg, fs, logger):
+ def lock_HA(self, topic, op_type, op_id):
"""
-
- :param db: database connection
+ Lock a task, if possible, to indicate to the HA system that
+ the task will be executed in this LCM instance.
+ :param topic: Can be "ns", "nsi"
+ :param op_type: Operation type, can be "nslcmops", "nsilcmops"
+ :param op_id: id of the operation of the related item
+ :return:
+ True=lock successful => execute the task (not registered by any other LCM instance)
+ False=lock failed => do NOT execute the task (already registered by another LCM instance)
"""
- self.db = db
- self.msg = msg
- self.fs = fs
- self.logger = logger
- def update_db_2(self, item, _id, _desc):
+ db_lock_task = self.db.set_one(op_type,
+ q_filter={'_id': op_id, '_admin.worker': None},
+ update_dict={'_admin.worker': self.worker_id},
+ fail_on_empty=False)
+
+ if db_lock_task is None:
+ self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id))
+ return False
+ else:
+ return True
+
+ async def waitfor_related_HA(self, topic, op_type, op_id=None):
"""
- Updates database with _desc information. If success _desc is cleared
- :param item:
- :param _id:
- :param _desc: dictionary with the content to update. Keys are dot separated keys for
- :return: None. Exception is raised on error
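+ Wait for any pending related HA tasks.
+ Polls the DB every 10 seconds for operations of the same instance that are still in
+ operationState 'PROCESSING' and whose 'startTime' is earlier than that of this operation.
+ Returns as soon as there are none, or immediately if op_id is not found in the DB.
+ :param topic: Can be "ns", "nsi"
+ :param op_type: Operation type, can be "nslcmops", "nsilcmops"
+ :param op_id: id of the operation of the related item
+ :return: None. LcmException is raised if related tasks are still pending after about 3600 seconds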
"""
- if not _desc:
+
+ # InstanceId label
+ instance_id_label = self.instance_id_label_dict.get(topic)
+
+ # Get 'startTime' timestamp for this operation
+ step = "Getting timestamp for op_id={} from db".format(op_id)
+ db_lcmop = self.db.get_one(op_type,
+ {"_id": op_id},
+ fail_on_empty=False)
+ if not db_lcmop:
return
- self.db.set_one(item, {"_id": _id}, _desc)
- _desc.clear()
- # except DbException as e:
- # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+ starttime_this_op = db_lcmop.get("startTime")
+ instance_id = db_lcmop.get(instance_id_label)
+
+ # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
+ timeout_wait_for_task = 3600 # Max time (seconds) to wait for a related task to finish
+ # interval_wait_for_task = 30 # A too long polling interval slows things down considerably
+ interval_wait_for_task = 10 # Interval in seconds for polling related tasks
+ time_left = timeout_wait_for_task
+ old_num_related_tasks = 0
+ while True:
+ # Get related tasks (operations within the same NS or NSI instance) which are
+ # still running (operationState='PROCESSING') and which were started before this task.
+ _filter = {instance_id_label: instance_id,
+ 'operationState': 'PROCESSING',
+ 'startTime.lt': starttime_this_op}
+ db_waitfor_related_task = self.db.get_list(op_type,
+ q_filter=_filter)
+ new_num_related_tasks = len(db_waitfor_related_task)
+ if not new_num_related_tasks:
+ # There are no related tasks, no need to wait, so return.
+ return
+ # If the number of pending related tasks has changed,
+ # update the 'detailed-status' field and log the change.
+ if new_num_related_tasks != old_num_related_tasks:
+ db_lcmops_update = {}
+ step = db_lcmops_update["detailed-status"] = \
+ "Waiting for {} related tasks to be completed.".format(
+ new_num_related_tasks)
+ self.logger.debug("Task {} operation={} {}".format(topic, op_id, step))
+ self.update_db_2(op_type, op_id, db_lcmops_update)
+ old_num_related_tasks = new_num_related_tasks
+ time_left -= interval_wait_for_task
+ if time_left < 0:
+ raise LcmException(
+ "Timeout ({}) when waiting for related tasks to be completed".format(
+ timeout_wait_for_task))
+ await asyncio.sleep(interval_wait_for_task)
+
+ return