X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fvim_sdn.py;h=a5eff36a275ee3e096d0cdf86e9a49ec0bda9800;hb=ba89cbb37502265011935fddbc04cb4304b14ca2;hp=6ad89dcc69d4b453dfe7e3c4c48bc56b82c64134;hpb=baacc3018ab5fc2f003545fd85b0c45abe8150c0;p=osm%2FLCM.git diff --git a/osm_lcm/vim_sdn.py b/osm_lcm/vim_sdn.py index 6ad89dc..a5eff36 100644 --- a/osm_lcm/vim_sdn.py +++ b/osm_lcm/vim_sdn.py @@ -16,16 +16,18 @@ # under the License. ## -import asyncio import yaml +import asyncio import logging import logging.handlers from osm_lcm import ROclient from osm_lcm.lcm_utils import LcmException, LcmBase, deep_get from n2vc.k8s_helm_conn import K8sHelmConnector from n2vc.k8s_juju_conn import K8sJujuConnector +from n2vc.exceptions import K8sException, N2VCException from osm_common.dbbase import DbException from copy import deepcopy +from time import time __author__ = "Alfonso Tierno" @@ -35,7 +37,7 @@ class VimLcm(LcmBase): vim_config_encrypted = {"1.1": ("admin_password", "nsx_password", "vcenter_password"), "default": ("admin_password", "nsx_password", "vcenter_password", "vrops_password")} - def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -45,7 +47,7 @@ class VimLcm(LcmBase): self.logger = logging.getLogger('lcm.vim') self.loop = loop self.lcm_tasks = lcm_tasks - self.ro_config = ro_config + self.ro_config = config["ro_config"] super().__init__(db, msg, fs, self.logger) @@ -60,7 +62,6 @@ class VimLcm(LcmBase): return vim_id = vim_content["_id"] - vim_content.pop("op_id", None) logging_text = "Task vim_create={} ".format(vim_id) self.logger.debug(logging_text + "Enter") @@ -68,8 +69,6 @@ class VimLcm(LcmBase): db_vim_update = {} exc = None RO_sdn_id = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting vim-id='{}' from db".format(vim_id) db_vim = self.db.get_one("vim_accounts", {"_id": vim_id}) @@ -139,14 +138,14 @@ class VimLcm(LcmBase): db_vim_update["_admin.operationalState"] = "ENABLED" db_vim_update["_admin.detailed-status"] = "Done" # Mark the VIM 'create' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' # await asyncio.sleep(15) # TODO remove. This is for test self.logger.debug(logging_text + "Exit Ok VIM account created at RO_vim_account_id={}".format(desc["uuid"])) return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -157,16 +156,16 @@ class VimLcm(LcmBase): db_vim_update["_admin.operationalState"] = "ERROR" db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the VIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_vim_update: self.update_db_2("vim_accounts", vim_id, db_vim_update) # Register the VIM 'create' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('vim', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('vim', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) @@ -182,7 +181,6 @@ class VimLcm(LcmBase): return vim_id = vim_content["_id"] - vim_content.pop("op_id", None) logging_text = "Task vim_edit={} ".format(vim_id) self.logger.debug(logging_text + "Enter") @@ -191,8 +189,6 @@ class VimLcm(LcmBase): RO_sdn_id = None RO_vim_id = None db_vim_update = {} - operationState_HA = '' - detailed_status_HA = '' step = "Getting vim-id='{}' from db".format(vim_id) try: # wait for any previous tasks in process @@ -270,13 +266,13 @@ class VimLcm(LcmBase): await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO) db_vim_update["_admin.operationalState"] = "ENABLED" # Mark the VIM 'edit' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id)) return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -287,16 +283,16 @@ class VimLcm(LcmBase): db_vim_update["_admin.operationalState"] = "ERROR" db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the VIM 'edit' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_vim_update: self.update_db_2("vim_accounts", vim_id, db_vim_update) # Register the VIM 'edit' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('vim', 'edit', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('vim', 'edit', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) @@ -318,8 +314,6 @@ class VimLcm(LcmBase): db_vim = None db_vim_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' step = "Getting vim from db" try: # wait for any previous tasks in process @@ -354,7 +348,7 @@ class VimLcm(LcmBase): self.logger.debug(logging_text + "Exit Ok") return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -366,11 +360,11 @@ class VimLcm(LcmBase): db_vim_update["_admin.operationalState"] = "ERROR" db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the VIM 'delete' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) - self.lcm_tasks.register_HA('vim', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) + self.lcm_tasks.unlock_HA('vim', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) try: if db_vim and db_vim_update: self.update_db_2("vim_accounts", vim_id, db_vim_update) @@ -385,7 +379,7 @@ class WimLcm(LcmBase): # values that are encrypted at wim config because they are passwords wim_config_encrypted = () - def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -395,7 +389,7 @@ class WimLcm(LcmBase): self.logger = logging.getLogger('lcm.vim') self.loop = loop self.lcm_tasks = lcm_tasks - self.ro_config = ro_config + self.ro_config = config["ro_config"] super().__init__(db, msg, fs, self.logger) @@ -409,15 +403,12 @@ class WimLcm(LcmBase): self.lcm_tasks.lock_HA('wim', 'create', op_id) wim_id = wim_content["_id"] - wim_content.pop("op_id", None) logging_text = "Task wim_create={} ".format(wim_id) self.logger.debug(logging_text + "Enter") db_wim = None db_wim_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting wim-id='{}' from db".format(wim_id) db_wim = self.db.get_one("wim_accounts", {"_id": wim_id}) @@ -468,13 +459,13 @@ class WimLcm(LcmBase): db_wim_update["_admin.operationalState"] = "ENABLED" db_wim_update["_admin.detailed-status"] = "Done" # Mark the WIM 'create' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' self.logger.debug(logging_text + "Exit Ok WIM account created at RO_wim_account_id={}".format(desc["uuid"])) return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -485,16 +476,16 @@ class WimLcm(LcmBase): db_wim_update["_admin.operationalState"] = "ERROR" db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_wim_update: self.update_db_2("wim_accounts", wim_id, db_wim_update) # Register the WIM 'create' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('wim', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('wim', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("wim_account", wim_id, order_id) @@ -509,7 +500,6 @@ class WimLcm(LcmBase): return wim_id = wim_content["_id"] - wim_content.pop("op_id", None) logging_text = "Task wim_edit={} ".format(wim_id) self.logger.debug(logging_text + "Enter") @@ -518,8 +508,6 @@ class WimLcm(LcmBase): RO_wim_id = None db_wim_update = {} step = "Getting wim-id='{}' from db".format(wim_id) - operationState_HA = '' - detailed_status_HA = '' try: # wait for any previous tasks in process await self.lcm_tasks.waitfor_related_HA('wim', 'edit', op_id) @@ -576,13 +564,13 @@ class WimLcm(LcmBase): await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO) db_wim_update["_admin.operationalState"] = "ENABLED" # Mark the WIM 'edit' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id)) return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -593,16 +581,16 @@ class WimLcm(LcmBase): db_wim_update["_admin.operationalState"] = "ERROR" db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'edit' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_wim_update: self.update_db_2("wim_accounts", wim_id, db_wim_update) # Register the WIM 'edit' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('wim', 'edit', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('wim', 'edit', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("wim_account", wim_id, order_id) @@ -624,8 +612,6 @@ class WimLcm(LcmBase): db_wim_update = {} exc = None step = "Getting wim from db" - operationState_HA = '' - detailed_status_HA = '' try: # wait for any previous tasks in process await self.lcm_tasks.waitfor_related_HA('wim', 'delete', op_id) @@ -659,7 +645,7 @@ class WimLcm(LcmBase): self.logger.debug(logging_text + "Exit Ok") return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -671,11 +657,11 @@ class WimLcm(LcmBase): db_wim_update["_admin.operationalState"] = "ERROR" db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'delete' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) - self.lcm_tasks.register_HA('wim', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) + self.lcm_tasks.unlock_HA('wim', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) try: if db_wim and db_wim_update: self.update_db_2("wim_accounts", wim_id, db_wim_update) @@ -688,7 +674,7 @@ class WimLcm(LcmBase): class SdnLcm(LcmBase): - def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -698,7 +684,7 @@ class SdnLcm(LcmBase): self.logger = logging.getLogger('lcm.sdn') self.loop = loop self.lcm_tasks = lcm_tasks - self.ro_config = ro_config + self.ro_config = config["ro_config"] super().__init__(db, msg, fs, self.logger) @@ -712,7 +698,6 @@ class SdnLcm(LcmBase): self.lcm_tasks.lock_HA('sdn', 'create', op_id) sdn_id = sdn_content["_id"] - sdn_content.pop("op_id", None) logging_text = "Task sdn_create={} ".format(sdn_id) self.logger.debug(logging_text + "Enter") @@ -720,8 +705,6 @@ class SdnLcm(LcmBase): db_sdn_update = {} RO_sdn_id = None exc = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting sdn from db" db_sdn = self.db.get_one("sdns", {"_id": sdn_id}) @@ -747,11 +730,11 @@ class SdnLcm(LcmBase): db_sdn_update["_admin.operationalState"] = "ENABLED" self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id)) # Mark the SDN 'create' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -762,16 +745,16 @@ class SdnLcm(LcmBase): db_sdn_update["_admin.operationalState"] = "ERROR" db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the SDN 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_sdn and db_sdn_update: self.update_db_2("sdns", sdn_id, db_sdn_update) # Register the SDN 'create' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('sdn', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('sdn', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("sdn", sdn_id, order_id) @@ -786,15 +769,12 @@ class SdnLcm(LcmBase): return sdn_id = sdn_content["_id"] - sdn_content.pop("op_id", None) logging_text = "Task sdn_edit={} ".format(sdn_id) self.logger.debug(logging_text + "Enter") db_sdn = None db_sdn_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' step = "Getting sdn from db" try: # wait for any previous tasks in process @@ -818,13 +798,13 @@ class SdnLcm(LcmBase): await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO) db_sdn_update["_admin.operationalState"] = "ENABLED" # Mark the SDN 'edit' HA task as successful - operationState_HA = 'COMPLETED' - detailed_status_HA = 'Done' + operation_state = 'COMPLETED' + operation_details = 'Done' self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id)) return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -835,16 +815,16 @@ class SdnLcm(LcmBase): db_sdn["_admin.operationalState"] = "ERROR" db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the SDN 'edit' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_sdn_update: self.update_db_2("sdns", sdn_id, db_sdn_update) # Register the SDN 'edit' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('sdn', 'edit', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('sdn', 'edit', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("sdn", sdn_id, order_id) @@ -865,8 +845,6 @@ class SdnLcm(LcmBase): db_sdn = None db_sdn_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' step = "Getting sdn from db" try: # wait for any previous tasks in process @@ -892,7 +870,7 @@ class SdnLcm(LcmBase): self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id)) return - except (ROclient.ROClientException, DbException) as e: + except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: @@ -903,11 +881,11 @@ class SdnLcm(LcmBase): db_sdn["_admin.operationalState"] = "ERROR" db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the SDN 'delete' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) - self.lcm_tasks.register_HA('sdn', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) + self.lcm_tasks.unlock_HA('sdn', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) try: if db_sdn and db_sdn_update: self.update_db_2("sdns", sdn_id, db_sdn_update) @@ -919,8 +897,9 @@ class SdnLcm(LcmBase): class K8sClusterLcm(LcmBase): + timeout_create = 300 - def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -930,7 +909,7 @@ class K8sClusterLcm(LcmBase): self.logger = logging.getLogger('lcm.k8scluster') self.loop = loop self.lcm_tasks = lcm_tasks - self.vca_config = vca_config + self.vca_config = config["VCA"] self.fs = fs self.db = db @@ -949,110 +928,128 @@ class K8sClusterLcm(LcmBase): fs=self.fs, log=self.logger, db=self.db, - on_update_db=None + loop=self.loop, + on_update_db=None, + vca_config=self.vca_config, ) + self.k8s_map = { + "helm-chart": self.helm_k8scluster, + "juju-bundle": self.juju_k8scluster, + } super().__init__(db, msg, fs, self.logger) async def create(self, k8scluster_content, order_id): - # HA tasks and backward compatibility: - # If 'vim_content' does not include 'op_id', we a running a legacy NBI version. - # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing. - # Register 'create' task here for related future HA operations op_id = k8scluster_content.pop('op_id', None) if not self.lcm_tasks.lock_HA('k8scluster', 'create', op_id): return k8scluster_id = k8scluster_content["_id"] - k8scluster_content.pop("op_id", None) logging_text = "Task k8scluster_create={} ".format(k8scluster_id) self.logger.debug(logging_text + "Enter") db_k8scluster = None db_k8scluster_update = {} - db_juju_k8scluster_update = {} - exc = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting k8scluster-id='{}' from db".format(k8scluster_id) self.logger.debug(logging_text + step) db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id}) self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'], schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"]) - - k8s_hc_id, uninstall_sw = await self.helm_k8scluster.init_env( - yaml.safe_dump(db_k8scluster.get("credentials")) - ) - db_k8scluster_update["_admin.helm-chart.id"] = k8s_hc_id - db_k8scluster_update["_admin.helm-chart.created"] = uninstall_sw - - # Juju/k8s cluster - k8s_jb_id, uninstall_sw = await self.juju_k8scluster.init_env( - yaml.safe_dump(db_k8scluster.get("credentials")) - ) - db_k8scluster_update["_admin.juju-bundle.id"] = k8s_jb_id - db_k8scluster_update["_admin.juju-bundle.created"] = uninstall_sw - - step = "Getting the list of repos" - self.logger.debug(logging_text + step) - task_list = [] - db_k8srepo_list = self.db.get_list("k8srepos", {}) - for repo in db_k8srepo_list: - step = "Adding repo {} to cluster: {}".format(repo["name"], k8s_hc_id) - self.logger.debug(logging_text + step) - task = asyncio.ensure_future(self.helm_k8scluster.repo_add(cluster_uuid=k8s_hc_id, - name=repo["name"], url=repo["url"], - repo_type="chart")) - task_list.append(task) - if not repo["_admin"].get("cluster-inserted"): - repo["_admin"]["cluster-inserted"] = [] - repo["_admin"]["cluster-inserted"].append(k8s_hc_id) - self.update_db_2("k8srepos", repo["_id"], repo) - - done = None - pending = None - if len(task_list) > 0: - self.logger.debug('Waiting for terminate pending tasks...') - done, pending = await asyncio.wait(task_list, timeout=3600) - if not pending: - self.logger.debug('All tasks finished...') - else: - self.logger.info('There are pending tasks: {}'.format(pending)) - db_k8scluster_update["_admin.operationalState"] = "ENABLED" + k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials")) + pending_tasks = [] + task2name = {} + init_target = deep_get(db_k8scluster, ("_admin", "init")) + step = "Launching k8scluster init tasks" + for task_name in ("helm-chart", "juju-bundle"): + if init_target and task_name not in init_target: + continue + task = asyncio.ensure_future(self.k8s_map[task_name].init_env(k8s_credentials, + reuse_cluster_uuid=k8scluster_id)) + pending_tasks.append(task) + task2name[task] = task_name + + error_text_list = [] + tasks_name_ok = [] + reached_timeout = False + now = time() + + while pending_tasks: + _timeout = max(1, self.timeout_create - (time() - now)) # ensure not negative with max + step = "Waiting for k8scluster init tasks" + done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout, + return_when=asyncio.FIRST_COMPLETED) + if not done: + # timeout. Set timeout is reached and process pending as if they hase been finished + done = pending_tasks + pending_tasks = None + reached_timeout = True + for task in done: + task_name = task2name[task] + if reached_timeout: + exc = "Timeout" + elif task.cancelled(): + exc = "Cancelled" + else: + exc = task.exception() + + if exc: + error_text_list.append("Failing init {}: {}".format(task_name, exc)) + db_k8scluster_update["_admin.{}.error_msg".format(task_name)] = str(exc) + db_k8scluster_update["_admin.{}.id".format(task_name)] = None + db_k8scluster_update["_admin.{}.operationalState".format(task_name)] = "ERROR" + self.logger.error(logging_text + "{} init fail: {}".format(task_name, exc), + exc_info=not isinstance(exc, (N2VCException, str))) + else: + k8s_id, uninstall_sw = task.result() + tasks_name_ok.append(task_name) + self.logger.debug(logging_text + "{} init success. id={} created={}".format( + task_name, k8s_id, uninstall_sw)) + db_k8scluster_update["_admin.{}.error_msg".format(task_name)] = None + db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id + db_k8scluster_update["_admin.{}.created".format(task_name)] = uninstall_sw + db_k8scluster_update["_admin.{}.operationalState".format(task_name)] = "ENABLED" + # update database + step = "Updating database for " + task_name + self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update) + if tasks_name_ok: + operation_details = "ready for " + ", ".join(tasks_name_ok) + operation_state = "COMPLETED" + db_k8scluster_update["_admin.operationalState"] = "ENABLED" if not error_text_list else "DEGRADED" + operation_details += "; " + ";".join(error_text_list) + else: + db_k8scluster_update["_admin.operationalState"] = "ERROR" + operation_state = "FAILED" + operation_details = ";".join(error_text_list) + db_k8scluster_update["_admin.detailed-status"] = operation_details + self.logger.debug(logging_text + "Done. Result: " + operation_state) + exc = None except Exception as e: - self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) + if isinstance(e, (LcmException, DbException, K8sException, N2VCException, asyncio.CancelledError)): + self.logger.error(logging_text + "Exit Exception {}".format(e)) + else: + self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) exc = e finally: if exc and db_k8scluster: db_k8scluster_update["_admin.operationalState"] = "ERROR" db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) - - if db_juju_k8scluster_update: - db_juju_k8scluster_update["_admin.operationalState"] = "ERROR" - db_juju_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) - - # Mark the k8scluster 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: - if db_k8scluster_update: + if db_k8scluster and db_k8scluster_update: self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update) - if db_juju_k8scluster_update: - self.update_db_2("k8sclusters", k8scluster_id, db_juju_k8scluster_update) - - # Register the K8scluster 'create' HA task either - # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('k8scluster', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + # Register the operation and unlock + self.lcm_tasks.unlock_HA('k8scluster', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) - self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id) + self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id) async def delete(self, k8scluster_content, order_id): @@ -1065,15 +1062,12 @@ class K8sClusterLcm(LcmBase): return k8scluster_id = k8scluster_content["_id"] - k8scluster_content.pop("op_id", None) logging_text = "Task k8scluster_delete={} ".format(k8scluster_id) self.logger.debug(logging_text + "Enter") db_k8scluster = None db_k8scluster_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' try: step = "Getting k8scluster='{}' from db".format(k8scluster_id) self.logger.debug(logging_text + step) @@ -1081,60 +1075,70 @@ class K8sClusterLcm(LcmBase): k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id")) k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id")) - uninstall_sw = deep_get(db_k8scluster, ("_admin", "helm-chart", "created")) cluster_removed = True + if k8s_jb_id: # delete in reverse order of creation + step = "Removing juju-bundle '{}'".format(k8s_jb_id) + uninstall_sw = deep_get(db_k8scluster, ("_admin", "juju-bundle", "created")) or False + cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw) + db_k8scluster_update["_admin.juju-bundle.id"] = None + db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED" + if k8s_hc_id: - uninstall_sw = uninstall_sw or False + step = "Removing helm-chart '{}'".format(k8s_hc_id) + uninstall_sw = deep_get(db_k8scluster, ("_admin", "helm-chart", "created")) or False cluster_removed = await self.helm_k8scluster.reset(cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw) + db_k8scluster_update["_admin.helm-chart.id"] = None + db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED" - if k8s_jb_id: - uninstall_sw = uninstall_sw or False - cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw) - - if cluster_removed: - step = "Removing k8scluster='{}' from db".format(k8scluster_id) + # Try to remove from cluster_inserted to clean old versions + if k8s_hc_id and cluster_removed: + step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id) self.logger.debug(logging_text + step) - db_k8srepo_list = self.db.get_list("k8srepos", {}) + db_k8srepo_list = self.db.get_list("k8srepos", {"_admin.cluster-inserted": k8s_hc_id}) for k8srepo in db_k8srepo_list: - index = 0 - for cluster in k8srepo["_admin"]["cluster-inserted"]: - if db_k8scluster.get("cluster-uuid") == cluster: - del(k8srepo["_admin"]["cluster-inserted"][index]) - break - index += 1 - self.update_db_2("k8srepos", k8srepo["_id"], k8srepo) - self.db.del_one("k8sclusters", {"_id": k8scluster_id}) - else: - raise LcmException("An error happened during the reset of the k8s cluster '{}'".format(k8scluster_id)) - # if not cluster_removed: - # raise Exception("K8scluster was not properly removed") + try: + cluster_list = k8srepo["_admin"]["cluster-inserted"] + cluster_list.remove(k8s_hc_id) + self.update_db_2("k8srepos", k8srepo["_id"], {"_admin.cluster-inserted": cluster_list}) + except Exception as e: + self.logger.error("{}: {}".format(step, e)) + self.db.del_one("k8sclusters", {"_id": k8scluster_id}) + db_k8scluster_update = None + self.logger.debug(logging_text + "Done") except Exception as e: - self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) + if isinstance(e, (LcmException, DbException, K8sException, N2VCException, asyncio.CancelledError)): + self.logger.error(logging_text + "Exit Exception {}".format(e)) + else: + self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) exc = e finally: if exc and db_k8scluster: db_k8scluster_update["_admin.operationalState"] = "ERROR" db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) + else: + operation_state = 'COMPLETED' + operation_details = "deleted" + try: if db_k8scluster_update: self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update) # Register the K8scluster 'delete' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('k8scluster', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('k8scluster', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) - self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id) + self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id) class K8sRepoLcm(LcmBase): - def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -1144,7 +1148,7 @@ class K8sRepoLcm(LcmBase): self.logger = logging.getLogger('lcm.k8srepo') self.loop = loop self.lcm_tasks = lcm_tasks - self.vca_config = vca_config + self.vca_config = config["VCA"] self.fs = fs self.db = db @@ -1177,54 +1181,33 @@ class K8sRepoLcm(LcmBase): db_k8srepo = None db_k8srepo_update = {} exc = None - operationState_HA = '' - detailed_status_HA = '' + operation_state = 'COMPLETED' + operation_details = '' try: step = "Getting k8srepo-id='{}' from db".format(k8srepo_id) self.logger.debug(logging_text + step) db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id}) - step = "Getting k8scluster_list from db" - self.logger.debug(logging_text + step) - db_k8scluster_list = self.db.get_list("k8sclusters", {}) - db_k8srepo_update["_admin.cluster-inserted"] = [] - task_list = [] - for k8scluster in db_k8scluster_list: - step = "Adding repo to cluster: {}".format(k8scluster["cluster-uuid"]) - self.logger.debug(logging_text + step) - task = asyncio.ensure_future(self.k8srepo.repo_add(cluster_uuid=k8scluster["cluster-uuid"], - name=db_k8srepo["name"], url=db_k8srepo["url"], - repo_type="chart")) - task_list.append(task) - db_k8srepo_update["_admin.cluster-inserted"].append(k8scluster["cluster-uuid"]) - - done = None - pending = None - if len(task_list) > 0: - self.logger.debug('Waiting for terminate pending tasks...') - done, pending = await asyncio.wait(task_list, timeout=3600) - if not pending: - self.logger.debug('All tasks finished...') - else: - self.logger.info('There are pending tasks: {}'.format(pending)) db_k8srepo_update["_admin.operationalState"] = "ENABLED" except Exception as e: - self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) + self.logger.error(logging_text + "Exit Exception {}".format(e), + exc_info=not isinstance(e, (LcmException, DbException, K8sException, N2VCException, + asyncio.CancelledError))) exc = e finally: if exc and db_k8srepo: db_k8srepo_update["_admin.operationalState"] = "ERROR" db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_k8srepo_update: self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update) # Register the K8srepo 'create' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('k8srepo', 'create', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('k8srepo', 'create', op_id, + operationState=operation_state, + detailed_status=operation_details) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id) @@ -1246,50 +1229,35 @@ class K8sRepoLcm(LcmBase): db_k8srepo = None db_k8srepo_update = {} - operationState_HA = '' - detailed_status_HA = '' + exc = None + operation_state = 'COMPLETED' + operation_details = '' try: step = "Getting k8srepo-id='{}' from db".format(k8srepo_id) self.logger.debug(logging_text + step) db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id}) - step = "Getting k8scluster_list from db" - self.logger.debug(logging_text + step) - db_k8scluster_list = self.db.get_list("k8sclusters", {}) - - task_list = [] - for k8scluster in db_k8scluster_list: - task = asyncio.ensure_future(self.k8srepo.repo_remove(cluster_uuid=k8scluster["cluster-uuid"], - name=db_k8srepo["name"])) - task_list.append(task) - done = None - pending = None - if len(task_list) > 0: - self.logger.debug('Waiting for terminate pending tasks...') - done, pending = await asyncio.wait(task_list, timeout=3600) - if not pending: - self.logger.debug('All tasks finished...') - else: - self.logger.info('There are pending tasks: {}'.format(pending)) - self.db.del_one("k8srepos", {"_id": k8srepo_id}) except Exception as e: - self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) + self.logger.error(logging_text + "Exit Exception {}".format(e), + exc_info=not isinstance(e, (LcmException, DbException, K8sException, N2VCException, + asyncio.CancelledError))) exc = e finally: if exc and db_k8srepo: db_k8srepo_update["_admin.operationalState"] = "ERROR" db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc) # Mark the WIM 'create' HA task as erroneous - operationState_HA = 'FAILED' - detailed_status_HA = "ERROR {}: {}".format(step, exc) + operation_state = 'FAILED' + operation_details = "ERROR {}: {}".format(step, exc) try: if db_k8srepo_update: self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update) # Register the K8srepo 'delete' HA task either # succesful or erroneous, or do nothing (if legacy NBI) - self.lcm_tasks.register_HA('k8srepo', 'delete', op_id, - operationState=operationState_HA, - detailed_status=detailed_status_HA) + self.lcm_tasks.unlock_HA('k8srepo', 'delete', op_id, + operationState=operation_state, + detailed_status=operation_details) + self.db.del_one("k8srepos", {"_id": k8srepo_id}) except DbException as e: self.logger.error(logging_text + "Cannot update database: {}".format(e)) self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)