- print(db_k8scluster.get("credentials"))
- print("\n\n\n FIN CREDENTIALS")
- print(yaml.safe_dump(db_k8scluster.get("credentials")))
- print("\n\n\n FIN OUTPUT")
- cluster_uuid, uninstall_sw = await self.k8scluster.init_env(yaml.safe_dump(db_k8scluster.
- get("credentials")))
- db_k8scluster_update["cluster-uuid"] = cluster_uuid
- if uninstall_sw:
- db_k8scluster_update["uninstall-sw"] = uninstall_sw
- step = "Getting the list of repos"
- self.logger.debug(logging_text + step)
- task_list = []
- db_k8srepo_list = self.db.get_list("k8srepos", {})
- for repo in db_k8srepo_list:
- step = "Adding repo {} to cluster: {}".format(repo["name"], cluster_uuid)
- self.logger.debug(logging_text + step)
- task = asyncio.ensure_future(self.k8scluster.repo_add(cluster_uuid=cluster_uuid,
- name=repo["name"], url=repo["url"],
- repo_type="chart"))
- task_list.append(task)
- if not repo["_admin"].get("cluster-inserted"):
- repo["_admin"]["cluster-inserted"] = []
- repo["_admin"]["cluster-inserted"].append(cluster_uuid)
- self.update_db_2("k8srepos", repo["_id"], repo)
-
- done = None
- pending = None
- if len(task_list) > 0:
- self.logger.debug('Waiting for terminate pending tasks...')
- done, pending = await asyncio.wait(task_list, timeout=3600)
- if not pending:
- self.logger.debug('All tasks finished...')
- else:
- self.logger.info('There are pending tasks: {}'.format(pending))
- db_k8scluster_update["_admin.operationalState"] = "ENABLED"
+ k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
+ pending_tasks = []
+ task2name = {}
+ init_target = deep_get(db_k8scluster, ("_admin", "init"))
+ step = "Launching k8scluster init tasks"
+ for task_name in ("helm-chart", "juju-bundle", "helm-chart-v3"):
+ if init_target and task_name not in init_target:
+ continue
+ task = asyncio.ensure_future(self.k8s_map[task_name].init_env(k8s_credentials,
+ reuse_cluster_uuid=k8scluster_id))
+ pending_tasks.append(task)
+ task2name[task] = task_name
+
+ error_text_list = []
+ tasks_name_ok = []
+ reached_timeout = False
+ now = time()
+
+ while pending_tasks:
+ _timeout = max(1, self.timeout_create - (time() - now)) # ensure not negative with max
+ step = "Waiting for k8scluster init tasks"
+ done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
+ return_when=asyncio.FIRST_COMPLETED)
+ if not done:
+ # timeout. Set timeout is reached and process pending as if they hase been finished
+ done = pending_tasks
+ pending_tasks = None
+ reached_timeout = True
+ for task in done:
+ task_name = task2name[task]
+ if reached_timeout:
+ exc = "Timeout"
+ elif task.cancelled():
+ exc = "Cancelled"
+ else:
+ exc = task.exception()
+
+ if exc:
+ error_text_list.append("Failing init {}: {}".format(task_name, exc))
+ db_k8scluster_update["_admin.{}.error_msg".format(task_name)] = str(exc)
+ db_k8scluster_update["_admin.{}.id".format(task_name)] = None
+ db_k8scluster_update["_admin.{}.operationalState".format(task_name)] = "ERROR"
+ self.logger.error(logging_text + "{} init fail: {}".format(task_name, exc),
+ exc_info=not isinstance(exc, (N2VCException, str)))
+ else:
+ k8s_id, uninstall_sw = task.result()
+ tasks_name_ok.append(task_name)
+ self.logger.debug(logging_text + "{} init success. id={} created={}".format(
+ task_name, k8s_id, uninstall_sw))
+ db_k8scluster_update["_admin.{}.error_msg".format(task_name)] = None
+ db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id
+ db_k8scluster_update["_admin.{}.created".format(task_name)] = uninstall_sw
+ db_k8scluster_update["_admin.{}.operationalState".format(task_name)] = "ENABLED"
+ # update database
+ step = "Updating database for " + task_name
+ self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
+ if tasks_name_ok:
+ operation_details = "ready for " + ", ".join(tasks_name_ok)
+ operation_state = "COMPLETED"
+ db_k8scluster_update["_admin.operationalState"] = "ENABLED" if not error_text_list else "DEGRADED"
+ operation_details += "; " + ";".join(error_text_list)
+ else:
+ db_k8scluster_update["_admin.operationalState"] = "ERROR"
+ operation_state = "FAILED"
+ operation_details = ";".join(error_text_list)
+ db_k8scluster_update["_admin.detailed-status"] = operation_details
+ self.logger.debug(logging_text + "Done. Result: " + operation_state)
+ exc = None