+
+
class K8sClusterLcm(LcmBase):
    """
    Handles the "create" and "delete" operations on entries of the "k8sclusters" collection.

    A cluster can be initialised for up to three deployment types ("helm-chart",
    "helm-chart-v3" and "juju-bundle"); each type is served by its own connector,
    reachable through self.k8s_map.
    """

    # Total time, in seconds, that create() waits for the connector init tasks
    timeout_create = 300

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Set up the logger and one K8s connector per supported deployment type.

        :param msg: forwarded as is to LcmBase.__init__
        :param lcm_tasks: object providing lock_HA(), unlock_HA() and remove() for task bookkeeping
        :param config: configuration dictionary. Only config["VCA"] is read here, with the keys
            "kubectlpath", "helmpath", "helm3path" and "jujupath"
        :param loop: asyncio event loop; stored and handed to the juju connector
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8scluster")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        # self.db and self.fs are used right below; presumably LcmBase.__init__ sets them (verify)
        super().__init__(msg, self.logger)

        self.helm2_k8scluster = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        self.helm3_k8scluster = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.juju_k8scluster = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        # deployment type -> connector in charge of it
        self.k8s_map = {
            "helm-chart": self.helm2_k8scluster,
            "helm-chart-v3": self.helm3_k8scluster,
            "juju-bundle": self.juju_k8scluster,
        }

    async def create(self, k8scluster_content, order_id):
        """
        Initialise a K8s cluster for the requested deployment types and record the outcome.

        The cluster is read from "k8sclusters", its credentials are decrypted and
        init_env() is launched concurrently on every selected connector. The result of
        each init task is written under "_admin.<deployment type>" and a global
        "_admin.operationalState" of ENABLED (all ok), DEGRADED (some failed) or
        ERROR (none succeeded) is stored.

        :param k8scluster_content: dictionary with the cluster "_id" and, optionally, "op_id"
            (which is removed from the dictionary)
        :param order_id: forwarded to lcm_tasks.remove() when the operation finishes
        :return: None
        """

        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "create", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        # Pessimistic defaults: the finally clause must always be able to report a result,
        # even when the try block is left before a result has been computed
        operation_state = "FAILED"
        operation_details = "Operation interrupted"
        try:
            step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            self.db.encrypt_decrypt_fields(
                db_k8scluster.get("credentials"),
                "decrypt",
                ["password", "secret"],
                schema_version=db_k8scluster["schema_version"],
                salt=db_k8scluster["_id"],
            )
            k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
            pending_tasks = []
            task2name = {}
            # When "_admin.init" is present only the deployment types listed there are initialised
            init_target = deep_get(db_k8scluster, ("_admin", "init"))
            step = "Launching k8scluster init tasks"
            for task_name in ("helm-chart", "juju-bundle", "helm-chart-v3"):
                if init_target and task_name not in init_target:
                    continue
                task = asyncio.ensure_future(
                    self.k8s_map[task_name].init_env(
                        k8s_credentials,
                        reuse_cluster_uuid=k8scluster_id,
                        vca_id=db_k8scluster.get("vca_id"),
                    )
                )
                pending_tasks.append(task)
                task2name[task] = task_name

            error_text_list = []
            tasks_name_ok = []
            reached_timeout = False
            now = time()

            while pending_tasks:
                _timeout = max(
                    1, self.timeout_create - (time() - now)
                )  # ensure not negative with max
                step = "Waiting for k8scluster init tasks"
                done, pending_tasks = await asyncio.wait(
                    pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
                )
                if not done:
                    # Timeout: flag it and process the pending tasks as if they had finished.
                    # NOTE(review): the timed-out tasks are not cancelled and keep running.
                    done = pending_tasks
                    pending_tasks = None
                    reached_timeout = True
                for task in done:
                    task_name = task2name[task]
                    # Per-task error kept in its own variable so that it never leaks into
                    # the operation-wide 'exc' evaluated by the finally clause
                    if reached_timeout:
                        task_exc = "Timeout"
                    elif task.cancelled():
                        task_exc = "Cancelled"
                    else:
                        task_exc = task.exception()

                    if task_exc:
                        error_text_list.append(
                            "Failing init {}: {}".format(task_name, task_exc)
                        )
                        db_k8scluster_update[
                            "_admin.{}.error_msg".format(task_name)
                        ] = str(task_exc)
                        db_k8scluster_update["_admin.{}.id".format(task_name)] = None
                        db_k8scluster_update[
                            "_admin.{}.operationalState".format(task_name)
                        ] = "ERROR"
                        # No exception is being handled here, so the traceback must come
                        # from the exception object itself, not from exc_info=True
                        show_traceback = isinstance(
                            task_exc, BaseException
                        ) and not isinstance(task_exc, N2VCException)
                        self.logger.error(
                            logging_text
                            + "{} init fail: {}".format(task_name, task_exc),
                            exc_info=task_exc if show_traceback else False,
                        )
                    else:
                        k8s_id, uninstall_sw = task.result()
                        tasks_name_ok.append(task_name)
                        self.logger.debug(
                            logging_text
                            + "{} init success. id={} created={}".format(
                                task_name, k8s_id, uninstall_sw
                            )
                        )
                        db_k8scluster_update[
                            "_admin.{}.error_msg".format(task_name)
                        ] = None
                        db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id
                        db_k8scluster_update[
                            "_admin.{}.created".format(task_name)
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.{}.operationalState".format(task_name)
                        ] = "ENABLED"
                    # update database
                    step = "Updating database for " + task_name
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
            if tasks_name_ok:
                operation_details = "ready for " + ", ".join(tasks_name_ok)
                operation_state = "COMPLETED"
                db_k8scluster_update["_admin.operationalState"] = (
                    "ENABLED" if not error_text_list else "DEGRADED"
                )
                if error_text_list:
                    operation_details += "; " + ";".join(error_text_list)
            else:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                operation_state = "FAILED"
                operation_details = ";".join(error_text_list)
            db_k8scluster_update["_admin.detailed-status"] = operation_details
            self.logger.debug(logging_text + "Done. Result: " + operation_state)

        except Exception as e:
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                # Always report the failure; the database entry can only be flagged
                # when the cluster could actually be read
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster:
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8scluster and db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def delete(self, k8scluster_content, order_id):
        """
        Reset every initialised deployment type of a K8s cluster and delete its database entry.

        The connectors are reset in the order juju-bundle, helm-chart, helm-chart-v3; the
        helm-chart id is then removed from the "_admin.cluster-inserted" list of the
        "k8srepos" entries referencing it, and finally the cluster is deleted from
        "k8sclusters".

        :param k8scluster_content: dictionary with the cluster "_id" and, optionally, "op_id"
            (which is removed from the dictionary)
        :param order_id: forwarded to lcm_tasks.remove() when the operation finishes
        :return: None
        """

        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "delete", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        # Pessimistic defaults, replaced only once the cluster has really been deleted
        operation_state = "FAILED"
        operation_details = "Operation interrupted"
        try:
            step = "Getting k8scluster='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id"))
            k8s_h3c_id = deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "id"))
            k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))

            # Holds the result of the last reset() executed; stays True when none is needed
            cluster_removed = True
            if k8s_jb_id:  # delete in reverse order of creation
                step = "Removing juju-bundle '{}'".format(k8s_jb_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "juju-bundle", "created"))
                    or False
                )
                cluster_removed = await self.juju_k8scluster.reset(
                    cluster_uuid=k8s_jb_id,
                    uninstall_sw=uninstall_sw,
                    vca_id=db_k8scluster.get("vca_id"),
                )
                db_k8scluster_update["_admin.juju-bundle.id"] = None
                db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED"

            if k8s_hc_id:
                step = "Removing helm-chart '{}'".format(k8s_hc_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart", "created"))
                    or False
                )
                cluster_removed = await self.helm2_k8scluster.reset(
                    cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart.id"] = None
                db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED"

            if k8s_h3c_id:
                step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created"))
                    or False
                )
                cluster_removed = await self.helm3_k8scluster.reset(
                    cluster_uuid=k8s_h3c_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart-v3.id"] = None
                db_k8scluster_update[
                    "_admin.helm-chart-v3.operationalState"
                ] = "DISABLED"

            # Try to remove from cluster_inserted to clean old versions
            if k8s_hc_id and cluster_removed:
                step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
                self.logger.debug(logging_text + step)
                db_k8srepo_list = self.db.get_list(
                    "k8srepos", {"_admin.cluster-inserted": k8s_hc_id}
                )
                for k8srepo in db_k8srepo_list:
                    # Best effort: a failure on one repo is logged and does not abort the deletion
                    try:
                        cluster_list = k8srepo["_admin"]["cluster-inserted"]
                        cluster_list.remove(k8s_hc_id)
                        self.update_db_2(
                            "k8srepos",
                            k8srepo["_id"],
                            {"_admin.cluster-inserted": cluster_list},
                        )
                    except Exception as e:
                        self.logger.error("{}: {}".format(step, e))
            self.db.del_one("k8sclusters", {"_id": k8scluster_id})
            # The entry no longer exists, so there is nothing left to update
            db_k8scluster_update = None
            operation_state = "COMPLETED"
            operation_details = "deleted"
            self.logger.debug(logging_text + "Done")

        except Exception as e:
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                # Mark the K8scluster 'delete' HA task as erroneous, also when the
                # cluster could not even be read from the database
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster and db_k8scluster_update is not None:
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details

            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
                # Register the K8scluster 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
+
+
class VcaLcm(LcmBase):
    """
    Handles the "create" and "delete" operations on entries of the "vca" collection.
    """

    # Seconds allowed for the VCA validation performed by create()
    timeout_create = 30

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Set up the logger and the N2VC juju connector used to validate a VCA.

        :param msg: forwarded as is to LcmBase.__init__
        :param lcm_tasks: object providing lock_HA(), unlock_HA() and remove() for task bookkeeping
        :param config: configuration dictionary; not read by this constructor
        :param loop: asyncio event loop; stored and handed to the N2VC connector
        :return: None
        """

        self.logger = logging.getLogger("lcm.vca")
        self.loop = loop
        self.lcm_tasks = lcm_tasks

        # self.db and self.fs are used right below; presumably LcmBase.__init__ sets them (verify)
        super().__init__(msg, self.logger)

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(
            log=self.logger, loop=self.loop, fs=self.fs, db=self.db
        )

    def _get_vca_by_id(self, vca_id: str) -> dict:
        """
        Read a VCA from the "vca" collection and decrypt its "secret" and "cacert" fields.

        :param vca_id: "_id" of the VCA entry
        :return: the database entry, after the decrypt call has been applied to it
        """
        db_vca = self.db.get_one("vca", {"_id": vca_id})
        self.db.encrypt_decrypt_fields(
            db_vca,
            "decrypt",
            ["secret", "cacert"],
            schema_version=db_vca["schema_version"],
            salt=db_vca["_id"],
        )
        return db_vca

    async def create(self, vca_content, order_id):
        """
        Validate a newly registered VCA and store the result in its database entry.

        "_admin.operationalState" ends up as ENABLED when validate_vca() succeeds within
        timeout_create seconds, and as ERROR otherwise.

        :param vca_content: dictionary with the VCA "_id" and, optionally, "op_id"
            (which is removed from the dictionary)
        :param order_id: forwarded to lcm_tasks.remove() when the operation finishes
        :return: None
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "create", op_id):
            return

        vca_id = vca_content["_id"]
        self.logger.debug("Task vca_create={} {}".format(vca_id, "Enter"))

        db_vca_update = {}
        # Pessimistic defaults: the finally clause must always be able to report a result,
        # even when the try block is left without reaching the except clause
        operation_state = "FAILED"
        operation_details = "Operation interrupted"

        try:
            self.logger.debug(
                "Task vca_create={} {}".format(vca_id, "Getting vca from db")
            )
            db_vca = self._get_vca_by_id(vca_id)

            # Awaited directly so that a validation error, a timeout or a cancellation
            # of this coroutine all reach the validation call itself
            await asyncio.wait_for(
                self.n2vc.validate_vca(db_vca["_id"]),
                timeout=self.timeout_create,
            )
            self.logger.debug(
                "Task vca_create={} {}".format(
                    vca_id, "vca registered and validated successfully"
                )
            )
            db_vca_update["_admin.operationalState"] = "ENABLED"
            db_vca_update["_admin.detailed-status"] = "Connectivity: ok"
            operation_details = "VCA validated"
            operation_state = "COMPLETED"

            self.logger.debug(
                "Task vca_create={} {}".format(
                    vca_id, "Done. Result: {}".format(operation_state)
                )
            )

        except Exception as e:
            # Some exceptions (e.g. asyncio.TimeoutError) have an empty message;
            # fall back to the class name so that the stored status is meaningful
            error_msg = "Failed with exception: {}".format(
                str(e) or type(e).__name__
            )
            self.logger.error("Task vca_create={} {}".format(vca_id, error_msg))
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_state = "FAILED"
            operation_details = error_msg
        finally:
            try:
                if db_vca_update:
                    self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    "Task vca_create={} {}".format(
                        vca_id, "Cannot update database: {}".format(e)
                    )
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def delete(self, vca_content, order_id):
        """
        Delete a VCA entry from the "vca" collection.

        When the deletion fails, the entry is flagged with "_admin.operationalState" ERROR.

        :param vca_content: dictionary with the VCA "_id" and, optionally, "op_id"
            (which is removed from the dictionary)
        :param order_id: forwarded to lcm_tasks.remove() when the operation finishes
        :return: None
        """

        # HA tasks and backward compatibility:
        # If "vca_content" does not include "op_id", we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
        # Register "delete" task here for related future HA operations
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "delete", op_id):
            return

        db_vca_update = {}
        vca_id = vca_content["_id"]
        # Pessimistic defaults, replaced once the entry has really been deleted
        operation_state = "FAILED"
        operation_details = "Operation interrupted"

        try:
            self.logger.debug(
                "Task vca_delete={} {}".format(vca_id, "Deleting vca from db")
            )
            self.db.del_one("vca", {"_id": vca_id})
            # The entry no longer exists, so there is nothing left to update
            db_vca_update = None
            operation_details = "deleted"
            operation_state = "COMPLETED"

            self.logger.debug(
                "Task vca_delete={} {}".format(
                    vca_id, "Done. Result: {}".format(operation_state)
                )
            )
        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error("Task vca_delete={} {}".format(vca_id, error_msg))
            if db_vca_update is None:
                db_vca_update = {}
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_state = "FAILED"
            operation_details = error_msg
        finally:
            try:
                if db_vca_update:
                    self.update_db_2("vca", vca_id, db_vca_update)
                # Register the VCA "delete" HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    "Task vca_delete={} {}".format(
                        vca_id, "Cannot update database: {}".format(e)
                    )
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)
+
+
class K8sRepoLcm(LcmBase):
    """
    Handles the "create" and "delete" operations on entries of the "k8srepos" collection.
    """

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Set up the logger and the helm connector stored in self.k8srepo.

        :param msg: forwarded as is to LcmBase.__init__
        :param lcm_tasks: object providing lock_HA(), unlock_HA() and remove() for task bookkeeping
        :param config: configuration dictionary. Only config["VCA"] is read here, with the keys
            "kubectlpath" and "helmpath"
        :param loop: asyncio event loop; stored in self.loop
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8srepo")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        # self.db and self.fs are used right below; presumably LcmBase.__init__ sets them (verify)
        super().__init__(msg, self.logger)

        self.k8srepo = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

    async def create(self, k8srepo_content, order_id):
        """
        Mark a K8s repo as ENABLED once its entry can be read from "k8srepos".

        :param k8srepo_content: dictionary with the repo "_id" and, optionally, "op_id"
            (which is removed from the dictionary)
        :param order_id: forwarded to lcm_tasks.remove() when the operation finishes
        :return: None
        """

        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations

        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "create", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        try:
            step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            db_k8srepo_update["_admin.operationalState"] = "ENABLED"
        except Exception as e:
            self.logger.error(
                logging_text + "Exit Exception {}".format(e),
                exc_info=not isinstance(
                    e,
                    (
                        LcmException,
                        DbException,
                        K8sException,
                        N2VCException,
                        asyncio.CancelledError,
                    ),
                ),
            )
            exc = e
        finally:
            if exc:
                # Mark the K8srepo 'create' HA task as erroneous. This must not depend on
                # db_k8srepo: a failed read leaves it as None and is precisely the error
                # that has to be reported
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8srepo:
                    db_k8srepo_update["_admin.operationalState"] = "ERROR"
                    db_k8srepo_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)

    async def delete(self, k8srepo_content, order_id):
        """
        Delete a K8s repo entry from the "k8srepos" collection.

        :param k8srepo_content: dictionary with the repo "_id" and, optionally, "op_id"
            (which is removed from the dictionary)
        :param order_id: forwarded to lcm_tasks.remove() when the operation finishes
        :return: None
        """

        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "delete", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}

        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        try:
            step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})

        except Exception as e:
            self.logger.error(
                logging_text + "Exit Exception {}".format(e),
                exc_info=not isinstance(
                    e,
                    (
                        LcmException,
                        DbException,
                        K8sException,
                        N2VCException,
                        asyncio.CancelledError,
                    ),
                ),
            )
            exc = e
        finally:
            if exc:
                # Mark the K8srepo 'delete' HA task as erroneous. This must not depend on
                # db_k8srepo: a failed read leaves it as None and is precisely the error
                # that has to be reported
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8srepo:
                    db_k8srepo_update["_admin.operationalState"] = "ERROR"
                    db_k8srepo_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
                # NOTE(review): the deletion is attempted after the HA task has been
                # registered and regardless of the outcome above; a DbException raised
                # here is only logged. Confirm that this ordering is intended.
                self.db.del_one("k8srepos", {"_id": k8srepo_id})
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)