+
+ # Get Account ID and Operation Index
+ account_id, op_index = self._get_account_and_op_HA(op_id)
+ db_table_name = self.topic2dbtable_dict[topic]
+
+ # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
+ # If the account exist, register the HA task.
+ # Update DB for HA tasks
+ q_filter = {"_id": account_id}
+ update_dict = {
+ "_admin.operations.{}.operationState".format(op_index): operationState,
+ "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
+ "_admin.operations.{}.worker".format(op_index): None,
+ "_admin.current_operation": None,
+ }
+ self.db.set_one(
+ db_table_name,
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False,
+ )
+ return
+
+ async def waitfor_related_HA(self, topic, op_type, op_id=None):
+ """
+ Wait for any pending related HA tasks
+ """
+
+ # Backward compatibility
+ if not (
+ self._is_service_type_HA(topic) or self._is_account_type_HA(topic)
+ ) and (op_id is None):
+ return
+
+ # Get DB table name
+ db_table_name = self.topic2dbtable_dict.get(topic)
+
+ # Get instance ID
+ _id = self._get_instance_id_HA(topic, op_type, op_id)
+ _filter = {"_id": _id}
+ db_lcmop = self.db.get_one(db_table_name, _filter, fail_on_empty=False)
+ if not db_lcmop:
+ return
+
+ # Set DB _filter for querying any related process state
+ _filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id)
+
+ # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
+ timeout_wait_for_task = (
+ 3600 # Max time (seconds) to wait for a related task to finish
+ )
+ # interval_wait_for_task = 30 # A too long polling interval slows things down considerably
+ interval_wait_for_task = 10 # Interval in seconds for polling related tasks
+ time_left = timeout_wait_for_task
+ old_num_related_tasks = 0
+ while True:
+ # Get related tasks (operations within the same instance as this) which are
+ # still running (operationState='PROCESSING') and which were started before this task.
+ # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
+ db_waitfor_related_task = self.db.get_list(db_table_name, q_filter=_filter)
+ new_num_related_tasks = len(db_waitfor_related_task)
+ # If there are no related tasks, there is nothing to wait for, so return.
+ if not new_num_related_tasks:
+ return
+ # If number of pending related tasks have changed,
+ # update the 'detailed-status' field and log the change.
+ # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
+ if (op_type != "ANY") and (new_num_related_tasks != old_num_related_tasks):
+ step = "Waiting for {} related tasks to be completed.".format(
+ new_num_related_tasks
+ )
+ update_dict = {}
+ q_filter = {"_id": _id}
+ # NS/NSI
+ if self._is_service_type_HA(topic):
+ update_dict = {
+ "detailed-status": step,
+ "queuePosition": new_num_related_tasks,
+ }
+ # VIM/WIM/SDN
+ elif self._is_account_type_HA(topic):
+ _, op_index = self._get_account_and_op_HA(op_id)
+ update_dict = {
+ "_admin.operations.{}.detailed-status".format(op_index): step
+ }
+ self.logger.debug("Task {} operation={} {}".format(topic, _id, step))
+ self.db.set_one(
+ db_table_name,
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False,
+ )
+ old_num_related_tasks = new_num_related_tasks
+ time_left -= interval_wait_for_task
+ if time_left < 0:
+ raise LcmException(
+ "Timeout ({}) when waiting for related tasks to be completed".format(
+ timeout_wait_for_task
+ )
+ )
+ await asyncio.sleep(interval_wait_for_task)
+
+ return