From fa076c3067d3fffe36c0b5dcf00327523b400f81 Mon Sep 17 00:00:00 2001 From: tierno Date: Thu, 13 Aug 2020 14:25:47 +0000 Subject: [PATCH] fix 1192: Parallel k8scluster init for helm and juju. Added a timeout of 10 minutes Change-Id: I340d95ee390207c262b8f921b566396b93aa79c7 Signed-off-by: tierno --- osm_lcm/lcm_utils.py | 20 +-- osm_lcm/vim_sdn.py | 311 +++++++++++++++++++++---------------------- 2 files changed, 165 insertions(+), 166 deletions(-) diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index 6895015..dcea3c0 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -265,12 +265,10 @@ class TaskRegistry(LcmBase): # Input: op_id, example: 'abc123def:3' Output: account_id='abc123def', op_index=3 def _get_account_and_op_HA(self, op_id): if not op_id: - return (None, None) + return None, None account_id, _, op_index = op_id.rpartition(':') - if not account_id: - return (None, None) - if not op_index.isdigit(): - return (None, None) + if not account_id or not op_index.isdigit(): + return None, None return account_id, op_index # Get '_id' for any topic and operation @@ -361,7 +359,7 @@ class TaskRegistry(LcmBase): return True # Try to lock this task - db_table_name = self.topic2dbtable_dict.get(topic) + db_table_name = self.topic2dbtable_dict[topic] q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id) db_lock_task = self.db.set_one(db_table_name, q_filter=q_filter, @@ -383,7 +381,7 @@ class TaskRegistry(LcmBase): fail_on_empty=False) return True - def register_HA(self, topic, op_type, op_id, operationState, detailed_status): + def unlock_HA(self, topic, op_type, op_id, operationState, detailed_status): """ Register a task, done when finished a VIM/WIM/SDN 'create' operation. 
:param topic: Can be "vim", "wim", or "sdn" @@ -393,19 +391,21 @@ class TaskRegistry(LcmBase): """ # Backward compatibility - if not self._is_account_type_HA(topic) or (self._is_account_type_HA(topic) and op_id is None): + if not self._is_account_type_HA(topic) or not op_id: return # Get Account ID and Operation Index account_id, op_index = self._get_account_and_op_HA(op_id) - db_table_name = self.topic2dbtable_dict.get(topic) + db_table_name = self.topic2dbtable_dict[topic] # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED) # If the account exist, register the HA task. # Update DB for HA tasks q_filter = {'_id': account_id} update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState, - '_admin.operations.{}.detailed-status'.format(op_index): detailed_status} + '_admin.operations.{}.detailed-status'.format(op_index): detailed_status, + '_admin.operations.{}.worker'.format(op_index): None, + '_admin.current_operation': None} self.db.set_one(db_table_name, q_filter=q_filter, update_dict=update_dict, diff --git a/osm_lcm/vim_sdn.py b/osm_lcm/vim_sdn.py index b216c77..afdabc2 100644 --- a/osm_lcm/vim_sdn.py +++ b/osm_lcm/vim_sdn.py @@ -17,6 +17,7 @@ ## import yaml +import asyncio import logging import logging.handlers from osm_lcm import ROclient @@ -26,6 +27,7 @@ from n2vc.k8s_juju_conn import K8sJujuConnector from n2vc.exceptions import K8sException, N2VCException from osm_common.dbbase import DbException from copy import deepcopy +from time import time __author__ = "Alfonso Tierno" @@ -67,8 +69,6 @@ class VimLcm(LcmBase): db_vim_update = {} exc = None RO_sdn_id = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting vim-id='{}' from db".format(vim_id) db_vim = self.db.get_one("vim_accounts", {"_id": vim_id}) @@ -138,8 +138,8 @@ class VimLcm(LcmBase): db_vim_update["_admin.operationalState"] = "ENABLED" db_vim_update["_admin.detailed-status"] = "Done" # Mark the 
VIM 'create' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' # await asyncio.sleep(15) # TODO remove. This is for test self.logger.debug(logging_text + "Exit Ok VIM account created at RO_vim_account_id={}".format(desc["uuid"])) @@ -156,16 +156,16 @@ class VimLcm(LcmBase): db_vim_update["_admin.operationalState"] = "ERROR" db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the VIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_vim_update: self.update_db_2("vim_accounts", vim_id, db_vim_update) # Register the VIM 'create' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('vim', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('vim', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) @@ -189,8 +189,6 @@ class VimLcm(LcmBase): RO_sdn_id = None RO_vim_id = None db_vim_update = {} - operationState_HA = '' - detailed_status_HA = '' step = "Getting vim-id='{}' from db".format(vim_id) try: # wait for any previous tasks in process @@ -268,8 +266,8 @@ class VimLcm(LcmBase): await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO) db_vim_update["_admin.operationalState"] = "ENABLED" # Mark the VIM 'edit' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id)) return @@ -285,16 +283,16 @@ class VimLcm(LcmBase): db_vim_update["_admin.operationalState"] = "ERROR" 
db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the VIM 'edit' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_vim_update: self.update_db_2("vim_accounts", vim_id, db_vim_update) # Register the VIM 'edit' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('vim', 'edit', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('vim', 'edit', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) @@ -316,8 +314,6 @@ class VimLcm(LcmBase): db_vim = None db_vim_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' step = "Getting vim from db" try: # wait for any previous tasks in process @@ -364,11 +360,11 @@ class VimLcm(LcmBase): db_vim_update["_admin.operationalState"] = "ERROR" db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the VIM 'delete' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) - self.lcm_tasks.register_HA('vim', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) + self.lcm_tasks.unlock_HA('vim', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) try: if db_vim and db_vim_update: self.update_db_2("vim_accounts", vim_id, db_vim_update) @@ -413,8 +409,6 @@ class WimLcm(LcmBase): db_wim = None db_wim_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting wim-id='{}' from db".format(wim_id) db_wim = self.db.get_one("wim_accounts", {"_id": wim_id}) 
@@ -465,8 +459,8 @@ class WimLcm(LcmBase): db_wim_update["_admin.operationalState"] = "ENABLED" db_wim_update["_admin.detailed-status"] = "Done" # Mark the WIM 'create' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' self.logger.debug(logging_text + "Exit Ok WIM account created at RO_wim_account_id={}".format(desc["uuid"])) return @@ -482,16 +476,16 @@ class WimLcm(LcmBase): db_wim_update["_admin.operationalState"] = "ERROR" db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_wim_update: self.update_db_2("wim_accounts", wim_id, db_wim_update) # Register the WIM 'create' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('wim', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('wim', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("wim_account", wim_id, order_id) @@ -514,8 +508,6 @@ class WimLcm(LcmBase): RO_wim_id = None db_wim_update = {} step = "Getting wim-id='{}' from db".format(wim_id) - operationState_HA = '' - detailed_status_HA = '' try: # wait for any previous tasks in process await self.lcm_tasks.waitfor_related_HA('wim', 'edit', op_id) @@ -572,8 +564,8 @@ class WimLcm(LcmBase): await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO) db_wim_update["_admin.operationalState"] = "ENABLED" # Mark the WIM 'edit' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + 
operation_details = 'Done' self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id)) return @@ -589,16 +581,16 @@ class WimLcm(LcmBase): db_wim_update["_admin.operationalState"] = "ERROR" db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'edit' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_wim_update: self.update_db_2("wim_accounts", wim_id, db_wim_update) # Register the WIM 'edit' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('wim', 'edit', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('wim', 'edit', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("wim_account", wim_id, order_id) @@ -620,8 +612,6 @@ class WimLcm(LcmBase): db_wim_update = {} exc = None step = "Getting wim from db" - operationState_HA = '' - detailed_status_HA = '' try: # wait for any previous tasks in process await self.lcm_tasks.waitfor_related_HA('wim', 'delete', op_id) @@ -667,11 +657,11 @@ class WimLcm(LcmBase): db_wim_update["_admin.operationalState"] = "ERROR" db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'delete' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) - self.lcm_tasks.register_HA('wim', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) + self.lcm_tasks.unlock_HA('wim', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) try: if db_wim and 
db_wim_update: self.update_db_2("wim_accounts", wim_id, db_wim_update) @@ -715,8 +705,6 @@ class SdnLcm(LcmBase): db_sdn_update = {} RO_sdn_id = None exc = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting sdn from db" db_sdn = self.db.get_one("sdns", {"_id": sdn_id}) @@ -742,8 +730,8 @@ class SdnLcm(LcmBase): db_sdn_update["_admin.operationalState"] = "ENABLED" self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id)) # Mark the SDN 'create' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' return except (ROclient.ROClientException, DbException) as e: @@ -757,16 +745,16 @@ class SdnLcm(LcmBase): db_sdn_update["_admin.operationalState"] = "ERROR" db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the SDN 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_sdn and db_sdn_update: self.update_db_2("sdns", sdn_id, db_sdn_update) # Register the SDN 'create' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('sdn', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('sdn', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("sdn", sdn_id, order_id) @@ -787,8 +775,6 @@ class SdnLcm(LcmBase): db_sdn = None db_sdn_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' step = "Getting sdn from db" try: # wait for any previous tasks in process @@ -812,8 +798,8 @@ class SdnLcm(LcmBase): await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO) 
db_sdn_update["_admin.operationalState"] = "ENABLED" # Mark the SDN 'edit' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id)) return @@ -829,16 +815,16 @@ class SdnLcm(LcmBase): db_sdn["_admin.operationalState"] = "ERROR" db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the SDN 'edit' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_sdn_update: self.update_db_2("sdns", sdn_id, db_sdn_update) # Register the SDN 'edit' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('sdn', 'edit', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('sdn', 'edit', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("sdn", sdn_id, order_id) @@ -859,8 +845,6 @@ class SdnLcm(LcmBase): db_sdn = None db_sdn_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' step = "Getting sdn from db" try: # wait for any previous tasks in process @@ -897,11 +881,11 @@ class SdnLcm(LcmBase): db_sdn["_admin.operationalState"] = "ERROR" db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the SDN 'delete' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) - self.lcm_tasks.register_HA('sdn', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) + self.lcm_tasks.unlock_HA('sdn', 'delete', op_id, + 
operationState=operation_state, + detailed_status=operation_details) try: if db_sdn and db_sdn_update: self.update_db_2("sdns", sdn_id, db_sdn_update) @@ -913,6 +897,7 @@ class SdnLcm(LcmBase): class K8sClusterLcm(LcmBase): + timeout_create = 600 def __init__(self, db, msg, fs, lcm_tasks, config, loop): """ @@ -945,15 +930,15 @@ class K8sClusterLcm(LcmBase): db=self.db, on_update_db=None ) + self.k8s_map = { + "helm-chart": self.helm_k8scluster, + "juju-bundle": self.juju_k8scluster, + } super().__init__(db, msg, fs, self.logger) async def create(self, k8scluster_content, order_id): - # HA tasks and backward compatibility: - # If 'vim_content' does not include 'op_id', we a running a legacy NBI version. - # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing. - # Register 'create' task here for related future HA operations op_id = k8scluster_content.pop('op_id', None) if not self.lcm_tasks.lock_HA('k8scluster', 'create', op_id): return @@ -964,10 +949,7 @@ class K8sClusterLcm(LcmBase): db_k8scluster = None db_k8scluster_update = {} - exc = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting k8scluster-id='{}' from db".format(k8scluster_id) self.logger.debug(logging_text + step) @@ -975,50 +957,69 @@ class K8sClusterLcm(LcmBase): self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'], schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"]) k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials")) - error_text_list = [] + pending_tasks = [] + task2name = {} init_target = deep_get(db_k8scluster, ("_admin", "init")) - # helm-chart - if not init_target or "helm-chart" in init_target: - k8s_hc_id = None - try: - k8s_hc_id, uninstall_sw = await self.helm_k8scluster.init_env(k8s_credentials, - reuse_cluster_uuid=k8scluster_id) - db_k8scluster_update["_admin.helm-chart.id"] = k8s_hc_id - db_k8scluster_update["_admin.helm-chart.created"] = 
uninstall_sw - except Exception as e: - error_text_list.append("Failing init helm-chart: {}".format(e)) - db_k8scluster_update["_admin.helm-chart.error_msg"] = str(e) - if isinstance(e, K8sException): - self.logger.error(logging_text + "Failing init helm-chart: {}".format(e)) - else: - self.logger.error(logging_text + "Failing init helm-chart: {}".format(e), exc_info=True) + step = "Launching k8scluster init tasks" + for task_name in ("helm-chart", "juju-bundle"): + if init_target and task_name not in init_target: + continue + task = asyncio.ensure_future(self.k8s_map[task_name].init_env(k8s_credentials, + reuse_cluster_uuid=k8scluster_id)) + pending_tasks.append(task) + task2name[task] = task_name - if not init_target or "juju-bundle" in init_target: - # Juju/k8s cluster - k8s_jb_id = None - try: - k8s_jb_id, uninstall_sw = await self.juju_k8scluster.init_env( - k8s_credentials, - reuse_cluster_uuid=k8scluster_id - ) - db_k8scluster_update["_admin.juju-bundle.id"] = k8s_jb_id - db_k8scluster_update["_admin.juju-bundle.created"] = uninstall_sw - except Exception as e: - error_text_list.append("Failing init juju-bundle: {}".format(e)) - db_k8scluster_update["_admin.juju-bundle.error_msg"] = str(e) - if isinstance(e, N2VCException): - self.logger.error(logging_text + "Failing init juju-bundle: {}".format(e)) + error_text_list = [] + tasks_name_ok = [] + reached_timeout = False + now = time() + + while pending_tasks: + _timeout = max(1, self.timeout_create - (time() - now)) # ensure not negative with max + step = "Waiting for k8scluster init tasks" + done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout, + return_when=asyncio.FIRST_COMPLETED) + if not done: + # timeout. 
Mark the timeout as reached and process the pending tasks as if they had finished + done = pending_tasks + pending_tasks = None + reached_timeout = True + for task in done: + task_name = task2name[task] + if reached_timeout: + exc = "Timeout" + elif task.cancelled(): + exc = "Cancelled" else: - self.logger.error(logging_text + "Failing init juju-bundle: {}".format(e), exc_info=True) - - # mark as an error if both helm-chart and juju-bundle have been failed - if k8s_hc_id or k8s_jb_id: - self.logger.debug(logging_text + "successfully created") - db_k8scluster_update["_admin.operationalState"] = "ENABLED" - else: - self.logger.debug(logging_text + "created with errors") + exc = task.exception() + + if exc: + error_text_list.append("Failing init {}: {}".format(task_name, exc)) + db_k8scluster_update["_admin.{}.error_msg".format(task_name)] = str(exc) + db_k8scluster_update["_admin.{}.id".format(task_name)] = None + self.logger.error(logging_text + "{} init fail: {}".format(task_name, exc), + exc_info=not isinstance(exc, (N2VCException, str))) + else: + k8s_id, uninstall_sw = task.result() + tasks_name_ok.append(task_name) + self.logger.debug(logging_text + "{} init success. 
id={} created={}".format( + task_name, k8s_id, uninstall_sw)) + db_k8scluster_update["_admin.{}.error_msg".format(task_name)] = None + db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id + db_k8scluster_update["_admin.{}.created".format(task_name)] = uninstall_sw + db_k8scluster_update["_admin.operationalState"] = "ENABLED" + # update database + step = "Updating database for " + task_name + self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update) + if not tasks_name_ok: db_k8scluster_update["_admin.operationalState"] = "ERROR" - db_k8scluster_update["_admin.detailed-status"] = ";".join(error_text_list) + operation_state = "FAILED" + else: + db_k8scluster_update["_admin.detailed-status"] = "ready for " + ", ;".join(tasks_name_ok) + operation_state = "COMPLETED" + operation_details = db_k8scluster_update["_admin.detailed-status"] = ";".join(error_text_list) + self.logger.debug(logging_text + "Done. Result: " + operation_state) + exc = None except Exception as e: self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) @@ -1027,19 +1028,16 @@ class K8sClusterLcm(LcmBase): if exc and db_k8scluster: db_k8scluster_update["_admin.operationalState"] = "ERROR" db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) - - # Mark the k8scluster 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: - if db_k8scluster_update: + if db_k8scluster and db_k8scluster_update: self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update) - # Register the K8scluster 'create' HA task either - # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('k8scluster', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + # Register the operation and unlock + self.lcm_tasks.unlock_HA('k8scluster', 
'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id) @@ -1061,8 +1059,6 @@ class K8sClusterLcm(LcmBase): db_k8scluster = None db_k8scluster_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting k8scluster='{}' from db".format(k8scluster_id) self.logger.debug(logging_text + step) @@ -1070,17 +1066,16 @@ class K8sClusterLcm(LcmBase): k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id")) k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id")) - uninstall_sw = deep_get(db_k8scluster, ("_admin", "helm-chart", "created")) cluster_removed = True if k8s_jb_id: # delete in reverse order of creation step = "Removing juju-bundle '{}'".format(k8s_jb_id) - uninstall_sw = uninstall_sw or False + uninstall_sw = deep_get(db_k8scluster, ("_admin", "juju-bundle", "created")) or False cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw) db_k8scluster_update["_admin.juju-bundle.id"] = None if k8s_hc_id: step = "Removing helm-chart '{}'".format(k8s_hc_id) - uninstall_sw = uninstall_sw or False + uninstall_sw = deep_get(db_k8scluster, ("_admin", "helm-chart", "created")) or False cluster_removed = await self.helm_k8scluster.reset(cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw) db_k8scluster_update["_admin.helm-chart.id"] = None @@ -1111,16 +1106,20 @@ class K8sClusterLcm(LcmBase): db_k8scluster_update["_admin.operationalState"] = "ERROR" db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) + else: + operation_state = 'COMPLETED' + 
operation_details = "deleted" + try: if db_k8scluster_update: self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update) # Register the K8scluster 'delete' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('k8scluster', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('k8scluster', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id) @@ -1171,8 +1170,8 @@ class K8sRepoLcm(LcmBase): db_k8srepo = None db_k8srepo_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' + operation_state = 'COMPLETED' + operation_details = '' try: step = "Getting k8srepo-id='{}' from db".format(k8srepo_id) self.logger.debug(logging_text + step) @@ -1186,16 +1185,16 @@ class K8sRepoLcm(LcmBase): db_k8srepo_update["_admin.operationalState"] = "ERROR" db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_k8srepo_update: self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update) # Register the K8srepo 'create' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('k8srepo', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('k8srepo', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id) @@ -1218,8 +1217,8 @@ 
class K8sRepoLcm(LcmBase): db_k8srepo_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' + operation_state = 'COMPLETED' + operation_details = '' try: step = "Getting k8srepo-id='{}' from db".format(k8srepo_id) self.logger.debug(logging_text + step) @@ -1233,16 +1232,16 @@ class K8sRepoLcm(LcmBase): db_k8srepo_update["_admin.operationalState"] = "ERROR" db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_k8srepo_update: self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update) # Register the K8srepo 'delete' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('k8srepo', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('k8srepo', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) self.db.del_one("k8srepos", {"_id": k8srepo_id}) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) -- 2.17.1