Features 11020,11022-11026: Advanced cluster management

Change-Id: I348e6149326e8ba7c2c79ea7ff2ea2223ed047ca
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py
index 6db88eb..77c3d82 100644
--- a/osm_lcm/lcm.py
+++ b/osm_lcm/lcm.py
@@ -46,6 +46,7 @@
 from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
 from osm_lcm.data_utils.lcm_config import LcmCfg
+from osm_lcm.data_utils.list_utils import find_in_list
 from osm_lcm.lcm_hc import get_health_check_file
 from os import path, getenv
 from n2vc import version as n2vc_version
@@ -63,6 +64,13 @@
 
 
 class Lcm:
+    profile_collection_mapping = {
+        "infra_controller_profiles": "k8sinfra_controller",
+        "infra_config_profiles": "k8sinfra_config",
+        "resource_profiles": "k8sresource",
+        "app_profiles": "k8sapp",
+    }
+
     ping_interval_pace = (
         120  # how many time ping is send once is confirmed all is running
     )
@@ -315,6 +323,13 @@
                 wait_time = 2 if not first_start else 5
                 await asyncio.sleep(wait_time)
 
+    def get_operation_params(self, item, operation_id):
+        """Return operationParams of the matching history entry, or {} if none."""
+        operation = find_in_list(
+            item.get("operationHistory", []), lambda op: op["op_id"] == operation_id
+        )
+        return (operation or {}).get("operationParams", {})
+
     async def kafka_read_callback(self, topic, command, params):
         order_id = 1
         self.logger.info(
@@ -628,56 +643,56 @@
                 return
         elif topic == "vim_account":
             vim_id = params["_id"]
-            op_id = params["_id"]
+            op_id = vim_id
+            op_params = params
             db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
-            vim_config = db_vim["config"]
+            vim_config = db_vim.get("config", {})
+            self.logger.debug("Db Vim: {}".format(db_vim))
             if command in ("create", "created"):
-                self.logger.info("Command : {}".format(command))
+                self.logger.debug("Main config: {}".format(self.main_config.to_dict()))
+                if "credentials" in vim_config:
+                    self.logger.info("Vim add cloud credentials")
+                    task = asyncio.ensure_future(
+                        self.cloud_credentials.add(op_id, op_params, db_vim)
+                    )
+                    self.lcm_tasks.register(
+                        "vim_account", vim_id, op_id, "cloud_credentials_add", task
+                    )
                 if not self.main_config.RO.ng:
-                    self.logger.info("Vim create")
+                    self.logger.info("Calling RO to create VIM (no NG-RO)")
                     task = asyncio.ensure_future(self.vim.create(params, order_id))
                     self.lcm_tasks.register(
                         "vim_account", vim_id, order_id, "vim_create", task
                     )
-                    return
-                elif "credentials" in vim_config.keys():
-                    task = asyncio.ensure_future(
-                        self.cloud_credentials.add(params, order_id)
-                    )
-                    self.lcm_tasks.register(
-                        "vim-account", vim_id, op_id, "cloud_credentials_add", task
-                    )
-                    return
+                return
             elif command == "delete" or command == "deleted":
-                if "credentials" in vim_config.keys():
+                self.lcm_tasks.cancel(topic, vim_id)
+                if "credentials" in vim_config:
+                    self.logger.info("Vim remove cloud credentials")
                     task = asyncio.ensure_future(
-                        self.cloud_credentials.remove(params, order_id)
+                        self.cloud_credentials.remove(op_id, op_params, db_vim)
                     )
                     self.lcm_tasks.register(
-                        "vim-account", vim_id, op_id, "cloud_credentials_remove", task
+                        "vim_account", vim_id, op_id, "cloud_credentials_remove", task
                     )
-                    return
-                else:
-                    self.lcm_tasks.cancel(topic, vim_id)
-                    task = asyncio.ensure_future(self.vim.delete(params, order_id))
-                    self.lcm_tasks.register(
-                        "vim_account", vim_id, order_id, "vim_delete", task
-                    )
-                    return
+                task = asyncio.ensure_future(self.vim.delete(params, order_id))
+                self.lcm_tasks.register(
+                    "vim_account", vim_id, order_id, "vim_delete", task
+                )
+                return
             elif command == "show":
                 print("not implemented show with vim_account")
                 sys.stdout.flush()
                 return
             elif command in ("edit", "edited"):
-                if "credentials" in vim_config.keys():
-                    self.logger.info("Vim Edit")
+                if "credentials" in vim_config:
+                    self.logger.info("Vim update cloud credentials")
                     task = asyncio.ensure_future(
-                        self.cloud_credentials.edit(params, order_id)
+                        self.cloud_credentials.edit(op_id, op_params, db_vim)
                     )
                     self.lcm_tasks.register(
                         "vim_account", vim_id, op_id, "cloud_credentials_update", task
                     )
-                    return
                 if not self.main_config.RO.ng:
                     task = asyncio.ensure_future(self.vim.edit(params, order_id))
                     self.lcm_tasks.register(
@@ -735,213 +750,322 @@
             elif command == "deleted":
                 return  # TODO cleaning of task just in case should be done
         elif topic == "cluster":
-            cluster_id = params["_id"]
+            if command != "get_creds":
+                op_id = params["operation_id"]
+                cluster_id = params["cluster_id"]
+                db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+                op_params = self.get_operation_params(db_cluster, op_id)
+                db_content = {
+                    "cluster": db_cluster,
+                }
             if command == "create" or command == "created":
                 self.logger.debug("cluster_id = {}".format(cluster_id))
-                task = asyncio.ensure_future(self.cluster.create(params, order_id))
+                # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
+                db_vim = self.db.get_one(
+                    "vim_accounts", {"name": db_cluster["vim_account"]}
+                )
+                db_content["vim_account"] = db_vim
+                task = asyncio.ensure_future(
+                    self.cluster.create(op_id, op_params, db_content)
+                )
                 self.lcm_tasks.register(
-                    "cluster", cluster_id, order_id, "cluster_create", task
+                    "cluster", cluster_id, op_id, "cluster_create", task
                 )
                 return
             elif command == "delete" or command == "deleted":
-                task = asyncio.ensure_future(self.cluster.delete(params, order_id))
+                task = asyncio.ensure_future(
+                    self.cluster.delete(op_id, op_params, db_content)
+                )
                 self.lcm_tasks.register(
-                    "cluster", cluster_id, order_id, "cluster_delete", task
+                    "cluster", cluster_id, op_id, "cluster_delete", task
                 )
                 return
             elif command == "add" or command == "added":
-                add_id = params.get("_id")
-                task = asyncio.ensure_future(self.cluster.add(params, order_id))
+                profile_type = params["profile_type"]
+                profile_collection = self.profile_collection_mapping[profile_type]
+                db_profile = self.db.get_one(
+                    profile_collection, {"_id": params["profile_id"]}
+                )
+                db_profile["profile_type"] = profile_type
+                db_content["profile"] = db_profile
+                task = asyncio.ensure_future(
+                    self.cluster.attach_profile(op_id, op_params, db_content)
+                )
                 self.lcm_tasks.register(
-                    "cluster", add_id, order_id, "profile_add", task
+                    "cluster", cluster_id, op_id, "profile_add", task
                 )
                 return
             elif command == "remove" or command == "removed":
-                remove_id = params.get("_id")
-                task = asyncio.ensure_future(self.cluster.remove(params, order_id))
+                profile_type = params["profile_type"]
+                profile_collection = self.profile_collection_mapping[profile_type]
+                db_profile = self.db.get_one(
+                    profile_collection, {"_id": params["profile_id"]}
+                )
+                db_profile["profile_type"] = profile_type
+                db_content["profile"] = db_profile
+                task = asyncio.ensure_future(
+                    self.cluster.detach_profile(op_id, op_params, db_content)
+                )
                 self.lcm_tasks.register(
-                    "cluster", remove_id, order_id, "profile_remove", task
+                    "cluster", cluster_id, op_id, "profile_remove", task
                 )
                 return
             elif command == "register" or command == "registered":
-                cluster_id = params.get("_id")
-                task = asyncio.ensure_future(self.cluster.register(params, order_id))
+                task = asyncio.ensure_future(
+                    self.cluster.register(op_id, op_params, db_content)
+                )
                 self.lcm_tasks.register(
-                    "cluster", cluster_id, order_id, "cluster_register", task
+                    "cluster", cluster_id, op_id, "cluster_register", task
                 )
                 return
             elif command == "deregister" or command == "deregistered":
-                cluster_id = params.get("_id")
-                task = asyncio.ensure_future(self.cluster.deregister(params, order_id))
+                task = asyncio.ensure_future(
+                    self.cluster.deregister(op_id, op_params, db_content)
+                )
                 self.lcm_tasks.register(
-                    "cluster", cluster_id, order_id, "cluster_deregister", task
+                    "cluster", cluster_id, op_id, "cluster_deregister", task
                 )
                 return
             elif command == "get_creds":
-                task = asyncio.ensure_future(self.cluster.get_creds(params, order_id))
-                # self.logger.info("task: {}".format(task))
+                cluster_id = params["_id"]
+                db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+                task = asyncio.ensure_future(self.cluster.get_creds(db_cluster))
                 self.lcm_tasks.register(
-                    "k8sclus", cluster_id, order_id, "k8sclus_get_creds", task
+                    "cluster", cluster_id, cluster_id, "cluster_get_credentials", task
                 )
                 return
             elif command == "upgrade" or command == "scale":
-                task = asyncio.ensure_future(self.cluster.update(params, order_id))
-                # self.logger.info("task: {}".format(task))
-                if command == "upgrade":
-                    self.lcm_tasks.register(
-                        "k8sclus", cluster_id, order_id, "k8sclus_upgrade", task
-                    )
+                # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
+                db_vim = self.db.get_one(
+                    "vim_accounts", {"name": db_cluster["vim_account"]}
+                )
+                db_content["vim_account"] = db_vim
+                task = asyncio.ensure_future(
+                    self.cluster.update(op_id, op_params, db_content)
+                )
+                self.lcm_tasks.register(
+                    "cluster", cluster_id, op_id, "cluster_update", task
+                )
                 return
         elif topic == "k8s_app":
+            op_id = params["operation_id"]
+            profile_id = params["profile_id"]
+            db_profile = self.db.get_one("k8sapp", {"_id": profile_id})
+            db_profile["profile_type"] = "applications"
+            op_params = self.get_operation_params(db_profile, op_id)
             if command == "profile_create" or command == "profile_created":
-                k8s_app_id = params.get("_id")
-                self.logger.debug("k8s_app_id = {}".format(k8s_app_id))
-                task = asyncio.ensure_future(self.k8s_app.create(params, order_id))
+                self.logger.debug("Create k8s_app_id = {}".format(profile_id))
+                task = asyncio.ensure_future(
+                    self.k8s_app.create(op_id, op_params, db_profile)
+                )
                 self.lcm_tasks.register(
-                    "k8s_app", k8s_app_id, order_id, "k8s_app_create", task
+                    "k8s_app", profile_id, op_id, "k8s_app_create", task
                 )
                 return
             elif command == "delete" or command == "deleted":
-                k8s_app_id = params.get("_id")
-                task = asyncio.ensure_future(self.k8s_app.delete(params, order_id))
+                self.logger.debug("Delete k8s_app_id = {}".format(profile_id))
+                task = asyncio.ensure_future(
+                    self.k8s_app.delete(op_id, op_params, db_profile)
+                )
                 self.lcm_tasks.register(
-                    "k8s_app", k8s_app_id, order_id, "k8s_app_delete", task
+                    "k8s_app", profile_id, op_id, "k8s_app_delete", task
                 )
                 return
         elif topic == "k8s_resource":
+            op_id = params["operation_id"]
+            profile_id = params["profile_id"]
+            db_profile = self.db.get_one("k8sresource", {"_id": profile_id})
+            db_profile["profile_type"] = "managed-resources"
+            op_params = self.get_operation_params(db_profile, op_id)
             if command == "profile_create" or command == "profile_created":
-                k8s_resource_id = params.get("_id")
-                self.logger.debug("k8s_resource_id = {}".format(k8s_resource_id))
-                task = asyncio.ensure_future(self.k8s_resource.create(params, order_id))
+                self.logger.debug("Create k8s_resource_id = {}".format(profile_id))
+                task = asyncio.ensure_future(
+                    self.k8s_resource.create(op_id, op_params, db_profile)
+                )
                 self.lcm_tasks.register(
                     "k8s_resource",
-                    k8s_resource_id,
-                    order_id,
+                    profile_id,
+                    op_id,
                     "k8s_resource_create",
                     task,
                 )
                 return
             elif command == "delete" or command == "deleted":
-                k8s_resource_id = params.get("_id")
-                task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id))
+                self.logger.debug("Delete k8s_resource_id = {}".format(profile_id))
+                task = asyncio.ensure_future(
+                    self.k8s_resource.delete(op_id, op_params, db_profile)
+                )
                 self.lcm_tasks.register(
                     "k8s_resource",
-                    k8s_resource_id,
-                    order_id,
+                    profile_id,
+                    op_id,
                     "k8s_resource_delete",
                     task,
                 )
                 return
 
         elif topic == "k8s_infra_controller":
+            op_id = params["operation_id"]
+            profile_id = params["profile_id"]
+            db_profile = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
+            db_profile["profile_type"] = "infra-controllers"
+            op_params = self.get_operation_params(db_profile, op_id)
             if command == "profile_create" or command == "profile_created":
-                k8s_infra_controller_id = params.get("_id")
                 self.logger.debug(
-                    "k8s_infra_controller_id = {}".format(k8s_infra_controller_id)
+                    "Create k8s_infra_controller_id = {}".format(profile_id)
                 )
                 task = asyncio.ensure_future(
-                    self.k8s_infra_controller.create(params, order_id)
+                    self.k8s_infra_controller.create(op_id, op_params, db_profile)
                 )
                 self.lcm_tasks.register(
                     "k8s_infra_controller",
-                    k8s_infra_controller_id,
-                    order_id,
+                    profile_id,
+                    op_id,
                     "k8s_infra_controller_create",
                     task,
                 )
                 return
             elif command == "delete" or command == "deleted":
-                k8s_infra_controller_id = params.get("_id")
+                self.logger.debug(
+                    "Delete k8s_infra_controller_id = {}".format(profile_id)
+                )
                 task = asyncio.ensure_future(
-                    self.k8s_infra_controller.delete(params, order_id)
+                    self.k8s_infra_controller.delete(op_id, op_params, db_profile)
                 )
                 self.lcm_tasks.register(
                     "k8s_infra_controller",
-                    k8s_infra_controller_id,
-                    order_id,
+                    profile_id,
+                    op_id,
                     "k8s_infra_controller_delete",
                     task,
                 )
                 return
 
         elif topic == "k8s_infra_config":
+            op_id = params["operation_id"]
+            profile_id = params["profile_id"]
+            db_profile = self.db.get_one("k8sinfra_config", {"_id": profile_id})
+            db_profile["profile_type"] = "infra-configs"
+            op_params = self.get_operation_params(db_profile, op_id)
             if command == "profile_create" or command == "profile_created":
-                k8s_infra_config_id = params.get("_id")
-                self.logger.debug(
-                    "k8s_infra_config_id = {}".format(k8s_infra_config_id)
-                )
+                self.logger.debug("Create k8s_infra_config_id = {}".format(profile_id))
                 task = asyncio.ensure_future(
-                    self.k8s_infra_config.create(params, order_id)
+                    self.k8s_infra_config.create(op_id, op_params, db_profile)
                 )
                 self.lcm_tasks.register(
                     "k8s_infra_config",
-                    k8s_infra_config_id,
-                    order_id,
+                    profile_id,
+                    op_id,
                     "k8s_infra_config_create",
                     task,
                 )
                 return
             elif command == "delete" or command == "deleted":
-                k8s_infra_config_id = params.get("_id")
+                self.logger.debug("Delete k8s_infra_config_id = {}".format(profile_id))
                 task = asyncio.ensure_future(
-                    self.k8s_infra_config.delete(params, order_id)
+                    self.k8s_infra_config.delete(op_id, op_params, db_profile)
                 )
                 self.lcm_tasks.register(
                     "k8s_infra_config",
-                    k8s_infra_config_id,
-                    order_id,
+                    profile_id,
+                    op_id,
                     "k8s_infra_config_delete",
                     task,
                 )
                 return
         elif topic == "oka":
-            # self.logger.info("Oka Elif")
-            oka_id = params["_id"]
-            # self.logger.info("Command: {}".format(command))
+            op_id = params["operation_id"]
+            oka_id = params["oka_id"]
+            db_oka = self.db.get_one("okas", {"_id": oka_id})
+            op_params = self.get_operation_params(db_oka, op_id)
             if command == "create":
-                task = asyncio.ensure_future(self.oka.create(params, order_id))
-                # self.logger.info("Task: {}".format(task))
-                self.lcm_tasks.register("oka", oka_id, order_id, "oka_create", task)
+                task = asyncio.ensure_future(self.oka.create(op_id, op_params, db_oka))
+                self.lcm_tasks.register("oka", oka_id, op_id, "oka_create", task)
                 return
             elif command == "edit":
-                task = asyncio.ensure_future(self.oka.edit(params, order_id))
-                # self.logger.info("Task: {}".format(task))
-                self.lcm_tasks.register("oka", oka_id, order_id, "oka_edit", task)
+                task = asyncio.ensure_future(self.oka.edit(op_id, op_params, db_oka))
+                self.lcm_tasks.register("oka", oka_id, op_id, "oka_edit", task)
                 return
             elif command == "delete":
-                task = asyncio.ensure_future(self.oka.delete(params, order_id))
-                # self.logger.info("Task: {}".format(task))
-                self.lcm_tasks.register("oka", oka_id, order_id, "oka_delete", task)
+                task = asyncio.ensure_future(self.oka.delete(op_id, op_params, db_oka))
+                self.lcm_tasks.register("oka", oka_id, op_id, "oka_delete", task)
                 return
         elif topic == "ksu":
-            # self.logger.info("Ksu Elif")
-            ksu_id = params["_id"]
+            op_id = params["operation_id"]
+            op_params = None
+            db_content = None
+            if not (command == "clone" or command == "move"):
+                # op_params is a list
+                # db_content is a list of KSU
+                db_content = []
+                op_params = []
+                for ksu_id in params["ksus_list"]:
+                    db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
+                    db_content.append(db_ksu)
+                    ksu_params = {}
+                    if command == "delete":
+                        ksu_params["profile"] = {}
+                        ksu_params["profile"]["profile_type"] = db_ksu["profile"][
+                            "profile_type"
+                        ]
+                        ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"]
+                    else:
+                        ksu_params = self.get_operation_params(db_ksu, op_id)
+                    # Update ksu_params["profile"] with profile name and age-pubkey
+                    profile_type = ksu_params["profile"]["profile_type"]
+                    profile_id = ksu_params["profile"]["_id"]
+                    profile_collection = self.profile_collection_mapping[profile_type]
+                    db_profile = self.db.get_one(
+                        profile_collection, {"_id": profile_id}
+                    )
+                    ksu_params["profile"]["name"] = db_profile["name"]
+                    ksu_params["profile"]["age_pubkey"] = db_profile.get(
+                        "age_pubkey", ""
+                    )
+                    if command == "create" or command == "edit" or command == "edited":
+                        # Update ksu_params["oka"] with sw_catalog_path (when missing)
+                        for oka in ksu_params["oka"]:
+                            if "sw_catalog_path" not in oka:
+                                oka_id = oka["_id"]
+                                db_oka = self.db.get_one("okas", {"_id": oka_id})
+                                oka[
+                                    "sw_catalog_path"
+                                ] = f"infra-controllers/{db_oka['git_name']}"
+                    op_params.append(ksu_params)
+            else:
+                # clone/move act on one KSU: db_content and op_params are single items
+                ksu_id = params["_id"]
+                db_content = self.db.get_one("ksus", {"_id": ksu_id})
+                op_params = self.get_operation_params(db_content, op_id)
             if command == "create":
-                task = asyncio.ensure_future(self.ksu.create(params, order_id))
-                # self.logger.info("task: {}".format(task))
-                self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_create", task)
+                task = asyncio.ensure_future(
+                    self.ksu.create(op_id, op_params, db_content)
+                )
+                self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_create", task)
                 return
             elif command == "edit" or command == "edited":
-                task = asyncio.ensure_future(self.ksu.edit(params, order_id))
-                # self.logger.info("Task: {}".format(task))
-                self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_edit", task)
+                task = asyncio.ensure_future(
+                    self.ksu.edit(op_id, op_params, db_content)
+                )
+                self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_edit", task)
                 return
             elif command == "delete":
-                task = asyncio.ensure_future(self.ksu.delete(params, order_id))
-                # self.logger.info("Task: {}".format(task))
-                self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_delete", task)
+                task = asyncio.ensure_future(
+                    self.ksu.delete(op_id, op_params, db_content)
+                )
+                self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_delete", task)
                 return
             elif command == "clone":
-                # self.logger.info("KSU clone")
-                task = asyncio.ensure_future(self.ksu.edit(params, order_id))
-                # self.logger.info("Task: {}".format(task))
-                self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_clone", task)
+                task = asyncio.ensure_future(
+                    self.ksu.clone(op_id, op_params, db_content)
+                )
+                self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_clone", task)
                 return
             elif command == "move":
-                # self.logger.info("KSU move")
-                task = asyncio.ensure_future(self.ksu.edit(params, order_id))
-                # self.logger.info("Task: {}".format(task))
-                self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_move", task)
+                task = asyncio.ensure_future(
+                    self.ksu.move(op_id, op_params, db_content)
+                )
+                self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task)
                 return
 
         self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
@@ -983,11 +1107,6 @@
                         self.consecutive_errors, self.first_start
                     )
                 )
-                # self.logger.info(
-                #     "Consecutive errors: {} first start: {}".format(
-                #         self.consecutive_errors, self.first_start
-                #     )
-                # )
                 topics_admin = ("admin",)
                 await asyncio.gather(
                     self.msg.aioread(