+
+ # Get Account ID and Operation Index
+ account_id, op_index = self._get_account_and_op_HA(op_id)
+ db_table_name = self.topic2dbtable_dict[topic]
+
+ # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
+ # If the account exist, register the HA task.
+ # Update DB for HA tasks
+ q_filter = {'_id': account_id}
+ update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
+ '_admin.operations.{}.detailed-status'.format(op_index): detailed_status,
+ '_admin.operations.{}.worker'.format(op_index): None,
+ '_admin.current_operation': None}
+ self.db.set_one(db_table_name,
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False)
+ return
+
+ async def waitfor_related_HA(self, topic, op_type, op_id=None):
+ """
+ Wait for any pending related HA tasks
+ """
+
+ # Backward compatibility
+ if not (self._is_service_type_HA(topic) or self._is_account_type_HA(topic)) and (op_id is None):
+ return
+
+ # Get DB table name
+ db_table_name = self.topic2dbtable_dict.get(topic)
+
+ # Get instance ID
+ _id = self._get_instance_id_HA(topic, op_type, op_id)
+ _filter = {"_id": _id}
+ db_lcmop = self.db.get_one(db_table_name,
+ _filter,
+ fail_on_empty=False)
+ if not db_lcmop:
+ return
+
+ # Set DB _filter for querying any related process state
+ _filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id)
+
+ # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
+ timeout_wait_for_task = 3600 # Max time (seconds) to wait for a related task to finish
+ # interval_wait_for_task = 30 # A too long polling interval slows things down considerably
+ interval_wait_for_task = 10 # Interval in seconds for polling related tasks
+ time_left = timeout_wait_for_task
+ old_num_related_tasks = 0
+ while True:
+ # Get related tasks (operations within the same instance as this) which are
+ # still running (operationState='PROCESSING') and which were started before this task.
+ # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
+ db_waitfor_related_task = self.db.get_list(db_table_name,
+ q_filter=_filter)
+ new_num_related_tasks = len(db_waitfor_related_task)
+ # If there are no related tasks, there is nothing to wait for, so return.
+ if not new_num_related_tasks:
+ return
+ # If number of pending related tasks have changed,
+ # update the 'detailed-status' field and log the change.
+ # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
+ if (op_type != 'ANY') and (new_num_related_tasks != old_num_related_tasks):
+ step = "Waiting for {} related tasks to be completed.".format(new_num_related_tasks)
+ update_dict = {}
+ q_filter = {'_id': _id}
+ # NS/NSI
+ if self._is_service_type_HA(topic):
+ update_dict = {'detailed-status': step, 'queuePosition': new_num_related_tasks}
+ # VIM/WIM/SDN
+ elif self._is_account_type_HA(topic):
+ _, op_index = self._get_account_and_op_HA(op_id)
+ update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): step}
+ self.logger.debug("Task {} operation={} {}".format(topic, _id, step))
+ self.db.set_one(db_table_name,
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False)
+ old_num_related_tasks = new_num_related_tasks
+ time_left -= interval_wait_for_task
+ if time_left < 0:
+ raise LcmException(
+ "Timeout ({}) when waiting for related tasks to be completed".format(
+ timeout_wait_for_task))
+ await asyncio.sleep(interval_wait_for_task)
+
+ return