X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=osm_lcm%2Flcm_utils.py;h=bb53d7c2db1c6750854fc492afe024c81de73c8e;hb=6a470c6529293da9a67fb366713f078a9fa70541;hp=b4d0455ea0528c512f5f969cd14ec44ecb4c0c43;hpb=2d9f6f537a05514b93dea9fd54a8001a17afcc53;p=osm%2FLCM.git diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index b4d0455..bb53d7c 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -93,7 +93,22 @@ class TaskRegistry(LcmBase): - worker: the worker ID for this process """ - instance_id_label_dict = {'ns': 'nsInstanceId', 'nsi': 'netsliceInstanceId'} + # NS/NSI: "services" VIM/WIM/SDN: "accounts" + topic_service_list = ['ns', 'nsi'] + topic_account_list = ['vim', 'wim', 'sdn'] + + # Map topic to InstanceID + topic2instid_dict = { + 'ns': 'nsInstanceId', + 'nsi': 'netsliceInstanceId'} + + # Map topic to DB table name + topic2dbtable_dict = { + 'ns': 'nslcmops', + 'nsi': 'nsilcmops', + 'vim': 'vim_accounts', + 'wim': 'wim_accounts', + 'sdn': 'sdns'} def __init__(self, worker_id=None, db=None, logger=None): self.task_registry = { @@ -187,46 +202,184 @@ class TaskRegistry(LcmBase): # if result: # self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name)) + # Is topic NS/NSI? + def _is_service_type_HA(self, topic): + return topic in self.topic_service_list + + # Is topic VIM/WIM/SDN? + def _is_account_type_HA(self, topic): + return topic in self.topic_account_list + + # Input: op_id, example: 'abc123def:3' Output: account_id='abc123def', op_index=3 + def _get_account_and_op_HA(self, op_id): + if not op_id: + return (None, None) + account_id, _, op_index = op_id.rpartition(':') + if not account_id: + return (None, None) + if not op_index.isdigit(): + return (None, None) + return account_id, op_index + + # Get '_id' for any topic and operation + def _get_instance_id_HA(self, topic, op_type, op_id): + _id = None + # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id' + if op_type == 'ANY': + _id = op_id + # NS/NSI: Use op_id as '_id' + elif self._is_service_type_HA(topic): + _id = op_id + # VIM/SDN/WIM: Split op_id to get Account ID and Operation Index, use Account ID as '_id' + elif self._is_account_type_HA(topic): + _id, _ = self._get_account_and_op_HA(op_id) + return _id + + # Set DB _filter for querying any related process state + def _get_waitfor_filter_HA(self, db_lcmop, topic, op_type, op_id): + _filter = {} + # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id' + # In this special case, the timestamp is ignored + if op_type == 'ANY': + _filter = {'operationState': 'PROCESSING'} + # Otherwise, get 'startTime' timestamp for this operation + else: + # NS/NSI + if self._is_service_type_HA(topic): + starttime_this_op = db_lcmop.get("startTime") + instance_id_label = self.topic2instid_dict.get(topic) + instance_id = db_lcmop.get(instance_id_label) + _filter = {instance_id_label: instance_id, + 'operationState': 'PROCESSING', + 'startTime.lt': starttime_this_op} + # VIM/WIM/SDN + elif self._is_account_type_HA(topic): + _, op_index = self._get_account_and_op_HA(op_id) + _ops = db_lcmop['_admin']['operations'] + _this_op = _ops[int(op_index)] + starttime_this_op = _this_op.get('startTime', None) + _filter = {'operationState': 'PROCESSING', + 'startTime.lt': starttime_this_op} + return _filter + + # Get DB params for any topic and operation + def _get_dbparams_for_lock_HA(self, topic, op_type, op_id): + q_filter = {} + update_dict = {} + # NS/NSI + if self._is_service_type_HA(topic): + q_filter = {'_id': op_id, '_admin.worker': None} + update_dict = {'_admin.worker': self.worker_id} + # VIM/WIM/SDN + elif self._is_account_type_HA(topic): + account_id, op_index = self._get_account_and_op_HA(op_id) + if not account_id: + return None, None + if op_type == 'create': + # Creating a VIM/WIM/SDN account implies setting '_admin.current_operation' = 0 + op_index = 0 + q_filter = {'_id': account_id, "_admin.operations.{}.worker".format(op_index): None} + update_dict = {'_admin.operations.{}.worker'.format(op_index): self.worker_id, + '_admin.current_operation': op_index} + return q_filter, update_dict + def lock_HA(self, topic, op_type, op_id): """ - Lock an task, if possible, to indicate to the HA system that + Lock a task, if possible, to indicate to the HA system that the task will be executed in this LCM instance. - :param topic: Can be "ns", "nsi" - :param op_type: Operation type, can be "nslcmops", "nsilcmops" - :param op_id: id of the operation of the related item + :param topic: Can be "ns", "nsi", "vim", "wim", or "sdn" + :param op_type: Operation type, can be "nslcmops", "nsilcmops", "create", "edit", "delete" + :param op_id: NS, NSI: Operation ID VIM,WIM,SDN: Account ID + ':' + Operation Index :return: - True=lock successful => execute the task (not registered by any other LCM instance) + True=lock was successful => execute the task (not registered by any other LCM instance) False=lock failed => do NOT execute the task (already registered by another LCM instance) + + HA tasks and backward compatibility: + If topic is "account type" (VIM/WIM/SDN) and op_id is None, 'op_id' was not provided by NBI. + This means that the running NBI instance does not support HA. + In such a case this method should always return True, to always execute + the task in this instance of LCM, without querying the DB. """ - db_lock_task = self.db.set_one(op_type, - q_filter={'_id': op_id, '_admin.worker': None}, - update_dict={'_admin.worker': self.worker_id}, - fail_on_empty=False) + # Backward compatibility for VIM/WIM/SDN without op_id + if self._is_account_type_HA(topic) and op_id is None: + return True + # Try to lock this task + db_table_name = self.topic2dbtable_dict.get(topic) + q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id) + db_lock_task = self.db.set_one(db_table_name, + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False) if db_lock_task is None: self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id)) return False else: + # Set 'detailed-status' to 'In progress' for VIM/WIM/SDN operations + if self._is_account_type_HA(topic): + detailed_status = 'In progress' + account_id, op_index = self._get_account_and_op_HA(op_id) + q_filter = {'_id': account_id} + update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): detailed_status} + self.db.set_one(db_table_name, + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False) return True + def register_HA(self, topic, op_type, op_id, operationState, detailed_status): + """ + Register a task, done when finished a VIM/WIM/SDN 'create' operation. + :param topic: Can be "vim", "wim", or "sdn" + :param op_type: Operation type, can be "create", "edit", "delete" + :param op_id: Account ID + ':' + Operation Index + :return: nothing + """ + + # Backward compatibility + if not self._is_account_type_HA(topic) or (self._is_account_type_HA(topic) and op_id is None): + return + + # Get Account ID and Operation Index + account_id, op_index = self._get_account_and_op_HA(op_id) + db_table_name = self.topic2dbtable_dict.get(topic) + + # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED) + # If the account exist, register the HA task. + # Update DB for HA tasks + q_filter = {'_id': account_id} + update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState, + '_admin.operations.{}.detailed-status'.format(op_index): detailed_status} + self.db.set_one(db_table_name, + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False) + return + async def waitfor_related_HA(self, topic, op_type, op_id=None): """ Wait for any pending related HA tasks """ - # InstanceId label - instance_id_label = self.instance_id_label_dict.get(topic) + # Backward compatibility + if not (self._is_service_type_HA(topic) or self._is_account_type_HA(topic)) and (op_id is None): + return + + # Get DB table name + db_table_name = self.topic2dbtable_dict.get(topic) - # Get 'startTime' timestamp for this operation - step = "Getting timestamp for op_id={} from db".format(op_id) - db_lcmop = self.db.get_one(op_type, - {"_id": op_id}, + # Get instance ID + _id = self._get_instance_id_HA(topic, op_type, op_id) + _filter = {"_id": _id} + db_lcmop = self.db.get_one(db_table_name, + _filter, fail_on_empty=False) if not db_lcmop: return - starttime_this_op = db_lcmop.get("startTime") - instance_id = db_lcmop.get(instance_id_label) + + # Set DB _filter for querying any related process state + _filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id) # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable. timeout_wait_for_task = 3600 # Max time (seconds) to wait for a related task to finish @@ -235,26 +388,34 @@ class TaskRegistry(LcmBase): time_left = timeout_wait_for_task old_num_related_tasks = 0 while True: - # Get related tasks (operations within the same NS or NSI instance) which are + # Get related tasks (operations within the same instance as this) which are # still running (operationState='PROCESSING') and which were started before this task. - _filter = {instance_id_label: instance_id, - 'operationState': 'PROCESSING', - 'startTime.lt': starttime_this_op} - db_waitfor_related_task = self.db.get_list(op_type, + # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps. + db_waitfor_related_task = self.db.get_list(db_table_name, q_filter=_filter) new_num_related_tasks = len(db_waitfor_related_task) + # If there are no related tasks, there is nothing to wait for, so return. if not new_num_related_tasks: - # There are no related tasks, no need to wait, so return. return # If number of pending related tasks have changed, # update the 'detailed-status' field and log the change. - if new_num_related_tasks != old_num_related_tasks: - db_lcmops_update = {} - step = db_lcmops_update["detailed-status"] = \ - "Waiting for {} related tasks to be completed.".format( - new_num_related_tasks) - self.logger.debug("Task {} operation={} {}".format(topic, op_id, step)) - self.update_db_2(op_type, op_id, db_lcmops_update) + # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY'). + if (op_type != 'ANY') and (new_num_related_tasks != old_num_related_tasks): + step = "Waiting for {} related tasks to be completed.".format(new_num_related_tasks) + update_dict = {} + q_filter = {'_id': _id} + # NS/NSI + if self._is_service_type_HA(topic): + update_dict = {'detailed-status': step} + # VIM/WIM/SDN + elif self._is_account_type_HA(topic): + _, op_index = self._get_account_and_op_HA(op_id) + update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): step} + self.logger.debug("Task {} operation={} {}".format(topic, _id, step)) + self.db.set_one(db_table_name, + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False) old_num_related_tasks = new_num_related_tasks time_left -= interval_wait_for_task if time_left < 0: