- self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'],
- schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"])
- # print(db_k8scluster.get("credentials"))
- # print("\n\n\n FIN CREDENTIALS")
- # print(yaml.safe_dump(db_k8scluster.get("credentials")))
- # print("\n\n\n FIN OUTPUT")
- k8s_hc_id, uninstall_sw = await self.k8scluster.init_env(yaml.safe_dump(db_k8scluster.get("credentials")))
- db_k8scluster_update["_admin.helm-chart.id"] = k8s_hc_id
- db_k8scluster_update["_admin.helm-chart.created"] = uninstall_sw
- step = "Getting the list of repos"
- self.logger.debug(logging_text + step)
- task_list = []
- db_k8srepo_list = self.db.get_list("k8srepos", {})
- for repo in db_k8srepo_list:
- step = "Adding repo {} to cluster: {}".format(repo["name"], k8s_hc_id)
- self.logger.debug(logging_text + step)
- task = asyncio.ensure_future(self.k8scluster.repo_add(cluster_uuid=k8s_hc_id,
- name=repo["name"], url=repo["url"],
- repo_type="chart"))
- task_list.append(task)
- if not repo["_admin"].get("cluster-inserted"):
- repo["_admin"]["cluster-inserted"] = []
- repo["_admin"]["cluster-inserted"].append(k8s_hc_id)
- self.update_db_2("k8srepos", repo["_id"], repo)
-
- done = None
- pending = None
- if len(task_list) > 0:
- self.logger.debug('Waiting for terminate pending tasks...')
- done, pending = await asyncio.wait(task_list, timeout=3600)
- if not pending:
- self.logger.debug('All tasks finished...')
- else:
- self.logger.info('There are pending tasks: {}'.format(pending))
- db_k8scluster_update["_admin.operationalState"] = "ENABLED"
+ self.db.encrypt_decrypt_fields(
+ db_k8scluster.get("credentials"),
+ "decrypt",
+ ["password", "secret"],
+ schema_version=db_k8scluster["schema_version"],
+ salt=db_k8scluster["_id"],
+ )
+ k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
+ pending_tasks = []
+ task2name = {}
+ init_target = deep_get(db_k8scluster, ("_admin", "init"))
+ step = "Launching k8scluster init tasks"
+
+ k8s_deploy_methods = db_k8scluster.get("deployment_methods", {})
+ # for backwards compatibility and all-false case
+ if not any(k8s_deploy_methods.values()):
+ k8s_deploy_methods = {"helm-chart": True, "juju-bundle": True, "helm-chart-v3": True}
+ deploy_methods = tuple(filter(k8s_deploy_methods.get, k8s_deploy_methods))
+
+ for task_name in deploy_methods:
+ if init_target and task_name not in init_target:
+ continue
+ task = asyncio.ensure_future(
+ self.k8s_map[task_name].init_env(
+ k8s_credentials,
+ reuse_cluster_uuid=k8scluster_id,
+ vca_id=db_k8scluster.get("vca_id"),
+ )
+ )
+ pending_tasks.append(task)
+ task2name[task] = task_name
+
+ error_text_list = []
+ tasks_name_ok = []
+ reached_timeout = False
+ now = time()
+
+ while pending_tasks:
+ _timeout = max(
+ 1, self.timeout_create - (time() - now)
+ ) # ensure not negative with max
+ step = "Waiting for k8scluster init tasks"
+ done, pending_tasks = await asyncio.wait(
+ pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
+ )
+ if not done:
+ # timeout. Set timeout is reached and process pending as if they hase been finished
+ done = pending_tasks
+ pending_tasks = None
+ reached_timeout = True
+ for task in done:
+ task_name = task2name[task]
+ if reached_timeout:
+ exc = "Timeout"
+ elif task.cancelled():
+ exc = "Cancelled"
+ else:
+ exc = task.exception()
+
+ if exc:
+ error_text_list.append(
+ "Failing init {}: {}".format(task_name, exc)
+ )
+ db_k8scluster_update[
+ "_admin.{}.error_msg".format(task_name)
+ ] = str(exc)
+ db_k8scluster_update["_admin.{}.id".format(task_name)] = None
+ db_k8scluster_update[
+ "_admin.{}.operationalState".format(task_name)
+ ] = "ERROR"
+ self.logger.error(
+ logging_text + "{} init fail: {}".format(task_name, exc),
+ exc_info=not isinstance(exc, (N2VCException, str)),
+ )
+ else:
+ k8s_id, uninstall_sw = task.result()
+ tasks_name_ok.append(task_name)
+ self.logger.debug(
+ logging_text
+ + "{} init success. id={} created={}".format(
+ task_name, k8s_id, uninstall_sw
+ )
+ )
+ db_k8scluster_update[
+ "_admin.{}.error_msg".format(task_name)
+ ] = None
+ db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id
+ db_k8scluster_update[
+ "_admin.{}.created".format(task_name)
+ ] = uninstall_sw
+ db_k8scluster_update[
+ "_admin.{}.operationalState".format(task_name)
+ ] = "ENABLED"
+ # update database
+ step = "Updating database for " + task_name
+ self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
+ if tasks_name_ok:
+ operation_details = "ready for " + ", ".join(tasks_name_ok)
+ operation_state = "COMPLETED"
+ db_k8scluster_update["_admin.operationalState"] = (
+ "ENABLED" if not error_text_list else "DEGRADED"
+ )
+ operation_details += "; " + ";".join(error_text_list)
+ else:
+ db_k8scluster_update["_admin.operationalState"] = "ERROR"
+ operation_state = "FAILED"
+ operation_details = ";".join(error_text_list)
+ db_k8scluster_update["_admin.detailed-status"] = operation_details
+ self.logger.debug(logging_text + "Done. Result: " + operation_state)
+ exc = None