From 771dea8c0efc561290a5f332055520fcfa294e4b Mon Sep 17 00:00:00 2001 From: yshah Date: Fri, 5 Jul 2024 15:11:49 +0000 Subject: [PATCH] Feature 11022,11025: Advanced Cluster Management Change-Id: I3b0e3e86f6cce0f68330704ff50677ccd5d0ad22 Signed-off-by: yshah --- osm_lcm/k8s.py | 583 +++++++++++++++++++++++++++++++++++++++++++ osm_lcm/lcm.py | 133 ++++++++-- osm_lcm/lcm_utils.py | 6 + 3 files changed, 702 insertions(+), 20 deletions(-) diff --git a/osm_lcm/k8s.py b/osm_lcm/k8s.py index 4272cfd0..33f8a569 100644 --- a/osm_lcm/k8s.py +++ b/osm_lcm/k8s.py @@ -437,6 +437,163 @@ class ClusterLcm(LcmBase): self.db.del_one("clusters", {"_id": items["_id"]}) return + async def get_creds(self, content, order_id): + # self.logger.info("Cluster get creds Enter") + # self.logger.info("Content: {} order_id: {}".format(content, order_id)) + db_content = self.db.get_one(self.db_topic, content) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("get_creds_cluster", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + return + + async def update(self, content, order_id): + # self.logger.info("Cluster update Enter") + _id = content["_id"] + # self.logger.info("Content: {} order_id: {}".format(content, order_id)) + # self.logger.info("Cluster ID: {}".format(_id)) + db_content = self.db.get_one(self.db_topic, {"_id": _id}) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("update_cluster", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_topic, {"_id": _id}, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "update_cluster", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + if db_content["operatingState"] == "IDLE": + # self.logger.info("Content: {}".format(db_content)) + if "k8s_version" in content: + db_content["k8s_version"] = content["k8s_version"] + elif "node_count" in content: + db_content["node_count"] = content["node_count"] + self.db.set_one(self.db_topic, {"_id": _id}, db_content) + return + + +class CloudCredentialsLcm(LcmBase): + db_collection = "vim_accounts" + + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.cloud_credentials") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + + super().__init__(msg, self.logger) + + async def add(self, content, order_id): + self.logger.info("Cloud Credentials create") + workflow_name = await self.odu.launch_workflow( + "create_cloud_credentials", content + ) + + workflow_status, workflow_msg = await self.odu.check_workflow_status( + workflow_name + ) + + self.logger.info( + "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg) + ) + + if workflow_status: + resource_status, resource_msg = await self.odu.check_resource_status( + "create_cloud_credentials", content + ) + self.logger.info( + "Resource Status: {} Resource Message: {}".format( + resource_status, resource_msg + ) + ) + return + + async def edit(self, content, order_id): + workflow_name = await self.odu.launch_workflow( + "update_cloud_credentials", content + ) + workflow_status, workflow_msg = await self.odu.check_workflow_status( + workflow_name + ) + self.logger.info( + "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg) + ) + + if workflow_status: + resource_status, resource_msg = await self.odu.check_resource_status( + "update_cloud_credentials", content + ) + self.logger.info( + "Resource Status: {} Resource Message: {}".format( + resource_status, resource_msg + ) + ) + return + + async def remove(self, content, order_id): + self.logger.info("Cloud Credentials delete") + workflow_name = await self.odu.launch_workflow( + "delete_cloud_credentials", content + ) + workflow_status, workflow_msg = await self.odu.check_workflow_status( + workflow_name + ) + self.logger.info( + "Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg) + ) + + if workflow_status: + resource_status, resource_msg = await self.odu.check_resource_status( + "delete_cloud_credentials", content + ) + self.logger.info( + "Resource Status: {} Resource Message: {}".format( + resource_status, resource_msg + ) + ) + self.db.del_one(self.db_collection, {"_id": content["_id"]}) + return + class K8sAppLcm(LcmBase): def __init__(self, msg, lcm_tasks, config): @@ -855,3 +1012,429 @@ class K8sInfraConfigLcm(LcmBase): if items["state"] == "DELETED": self.db.del_one("k8sinfra_config", {"_id": content["_id"]}) return + + +class OkaLcm(LcmBase): + db_collection = "okas" + + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.oka") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + + super().__init__(msg, self.logger) + + async def create(self, content, order_id): + # self.logger.info("OKA Create Enter") + # self.logger.info("Content: {}".format(content)) + + db_content = self.db.get_one(self.db_collection, content) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("create_oka", db_content) + # self.logger.info("ODU workflow: {}".format(odu_workflow)) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["state"] = "CREATED" + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["state"] = "FAILED_CREATION" + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "create_oka", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + # self.logger.info("Db content: {}".format(db_content)) + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + # self.logger.info("Content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + return + + async def edit(self, content, order_id): + # self.logger.info("OKA Edit Enter") + db_content = self.db.get_one(self.db_collection, content) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("update_oka", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "update_oka", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + # self.logger.info("Content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + return + + async def delete(self, content, order_id): + # self.logger.info("OKA delete Enter") + + db_content = self.db.get_one(self.db_collection, content) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("delete_oka", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["state"] = "DELETED" + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["state"] = "FAILED_DELETION" + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "delete_oka", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + # self.logger.info("Content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if db_content["state"] == "DELETED": + self.db.del_one(self.db_collection, content) + return + + +class KsuLcm(LcmBase): + db_collection = "ksus" + + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.ksu") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + + super().__init__(msg, self.logger) + + async def create(self, content, order_id): + # self.logger.info("ksu Create Enter") + # self.logger.info("Content: {}".format(content)) + + db_content = self.db.get_one(self.db_collection, content) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("create_ksus", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["state"] = "CREATED" + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["state"] = "FAILED_CREATION" + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "create_ksus", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + # self.logger.info("Content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + return + + async def edit(self, content, order_id): + # self.logger.info("ksu edit Enter") + + db_content = self.db.get_one(self.db_collection, {"_id": content["_id"]}) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("update_ksus", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_collection, {"_id": content["_id"]}, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "update_ksus", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + # self.logger.info("Content: {}".format(db_content)) + self.db.set_one(self.db_collection, {"_id": content["_id"]}, db_content) + return + + async def delete(self, content, order_id): + # self.logger.info("ksu delete Enter") + + db_content = self.db.get_one(self.db_collection, content) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("delete_ksus", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["state"] = "DELETED" + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["state"] = "FAILED_DELETION" + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "delete_ksus", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + # self.logger.info("Content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if db_content["state"] == "DELETED": + self.db.del_one(self.db_collection, content) + return + + async def clone(self, content, order_id): + # self.logger.info("ksu clone Enter") + db_content = self.db.get_one(self.db_collection, content) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("clone_ksus", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "clone_ksus", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + # self.logger.info("Content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + return + + async def move(self, content, order_id): + # self.logger.info("ksu move Enter") + db_content = self.db.get_one(self.db_collection, content) + # self.logger.info("Content: {}".format(db_content)) + + odu_workflow = self.odu.launch_workflow("move_ksus", db_content) + workflow_status, workflow_msg = self.odu.check_workflow_status(odu_workflow) + # self.logger.info( + # "Workflow Status: {} Workflow Message: {}".format( + # workflow_status, workflow_msg + # ) + # ) + + if workflow_status: + db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history(db_content, workflow_status, None) + # self.logger.info("Db content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "move_ksus", db_content + ) + # self.logger.info( + # "Resource Status: {} Resource Message: {}".format( + # resource_status, resource_msg + # ) + # ) + if resource_status: + db_content["resourceState"] = "READY" + else: + db_content["resourceState"] = "ERROR" + + db_content = self.update_operation_history( + db_content, workflow_status, resource_status + ) + + db_content["operatingState"] = "IDLE" + # self.logger.info("Content: {}".format(db_content)) + self.db.set_one(self.db_collection, content, db_content) + return diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 9d711ea5..6db88eb1 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -628,30 +628,56 @@ class Lcm: return elif topic == "vim_account": vim_id = params["_id"] - self.logger.info("Vim_ID: {}".format(vim_id)) + op_id = params["_id"] + db_vim = self.db.get_one("vim_accounts", {"_id": vim_id}) + vim_config = db_vim["config"] if command in ("create", "created"): self.logger.info("Command : {}".format(command)) - # "self.logger.info("Main config: {}".format(main_config))" - # "self.logger.info("Main config RO: {}".format(main_config.RO.ng))"" if not self.main_config.RO.ng: self.logger.info("Vim create") task = asyncio.ensure_future(self.vim.create(params, order_id)) self.lcm_tasks.register( "vim_account", vim_id, order_id, "vim_create", task ) - return + return + elif "credentials" in vim_config.keys(): + task = asyncio.ensure_future( + self.cloud_credentials.add(params, order_id) + ) + self.lcm_tasks.register( + "vim-account", vim_id, op_id, "cloud_credentials_add", task + ) + return elif command == "delete" or command == "deleted": - self.lcm_tasks.cancel(topic, vim_id) - task = asyncio.ensure_future(self.vim.delete(params, order_id)) - self.lcm_tasks.register( - "vim_account", vim_id, order_id, "vim_delete", task - ) - return + if "credentials" in vim_config.keys(): + task = asyncio.ensure_future( + self.cloud_credentials.remove(params, order_id) + ) + self.lcm_tasks.register( + "vim-account", vim_id, op_id, "cloud_credentials_remove", task + ) + return + else: + self.lcm_tasks.cancel(topic, vim_id) + task = asyncio.ensure_future(self.vim.delete(params, order_id)) + self.lcm_tasks.register( + "vim_account", vim_id, order_id, "vim_delete", task + ) + return elif command == "show": print("not implemented show with vim_account") sys.stdout.flush() return elif command in ("edit", "edited"): + if "credentials" in vim_config.keys(): + self.logger.info("Vim Edit") + task = asyncio.ensure_future( + self.cloud_credentials.edit(params, order_id) + ) + self.lcm_tasks.register( + "vim_account", vim_id, op_id, "cloud_credentials_update", task + ) + return if not self.main_config.RO.ng: task = asyncio.ensure_future(self.vim.edit(params, order_id)) self.lcm_tasks.register( @@ -709,24 +735,15 @@ class Lcm: elif command == "deleted": return # TODO cleaning of task just in case should be done elif topic == "cluster": + cluster_id = params["_id"] if command == "create" or command == "created": - operation_id = params["operation_id"] - cluster_id = params["_id"] - db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) - # cluster_id = params.get("_id") - self.logger.debug("operation_id = {}".format(operation_id)) self.logger.debug("cluster_id = {}".format(cluster_id)) - self.logger.debug("cluster_db = {}".format(db_cluster)) task = asyncio.ensure_future(self.cluster.create(params, order_id)) self.lcm_tasks.register( "cluster", cluster_id, order_id, "cluster_create", task ) return elif command == "delete" or command == "deleted": - # cluster_id = params.get("_id") - operation_id = params["operation_id"] - cluster_id = params["_id"] - db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) task = asyncio.ensure_future(self.cluster.delete(params, order_id)) self.lcm_tasks.register( "cluster", cluster_id, order_id, "cluster_delete", task @@ -760,6 +777,21 @@ class Lcm: "cluster", cluster_id, order_id, "cluster_deregister", task ) return + elif command == "get_creds": + task = asyncio.ensure_future(self.cluster.get_creds(params, order_id)) + # self.logger.info("task: {}".format(task)) + self.lcm_tasks.register( + "k8sclus", cluster_id, order_id, "k8sclus_get_creds", task + ) + return + elif command == "upgrade" or command == "scale": + task = asyncio.ensure_future(self.cluster.update(params, order_id)) + # self.logger.info("task: {}".format(task)) + if command == "upgrade": + self.lcm_tasks.register( + "k8sclus", cluster_id, order_id, "k8sclus_upgrade", task + ) + return elif topic == "k8s_app": if command == "profile_create" or command == "profile_created": k8s_app_id = params.get("_id") @@ -862,6 +894,55 @@ class Lcm: task, ) return + elif topic == "oka": + # self.logger.info("Oka Elif") + oka_id = params["_id"] + # self.logger.info("Command: {}".format(command)) + if command == "create": + task = asyncio.ensure_future(self.oka.create(params, order_id)) + # self.logger.info("Task: {}".format(task)) + self.lcm_tasks.register("oka", oka_id, order_id, "oka_create", task) + return + elif command == "edit": + task = asyncio.ensure_future(self.oka.edit(params, order_id)) + # self.logger.info("Task: {}".format(task)) + self.lcm_tasks.register("oka", oka_id, order_id, "oka_edit", task) + return + elif command == "delete": + task = asyncio.ensure_future(self.oka.delete(params, order_id)) + # self.logger.info("Task: {}".format(task)) + self.lcm_tasks.register("oka", oka_id, order_id, "oka_delete", task) + return + elif topic == "ksu": + # self.logger.info("Ksu Elif") + ksu_id = params["_id"] + if command == "create": + task = asyncio.ensure_future(self.ksu.create(params, order_id)) + # self.logger.info("task: {}".format(task)) + self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_create", task) + return + elif command == "edit" or command == "edited": + task = asyncio.ensure_future(self.ksu.edit(params, order_id)) + # self.logger.info("Task: {}".format(task)) + self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_edit", task) + return + elif command == "delete": + task = asyncio.ensure_future(self.ksu.delete(params, order_id)) + # self.logger.info("Task: {}".format(task)) + self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_delete", task) + return + elif command == "clone": + # self.logger.info("KSU clone") + task = asyncio.ensure_future(self.ksu.edit(params, order_id)) + # self.logger.info("Task: {}".format(task)) + self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_clone", task) + return + elif command == "move": + # self.logger.info("KSU move") + task = asyncio.ensure_future(self.ksu.edit(params, order_id)) + # self.logger.info("Task: {}".format(task)) + self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_move", task) + return self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) @@ -894,12 +975,19 @@ class Lcm: "k8s_resource", "k8s_infra_controller", "k8s_infra_config", + "oka", + "ksu", ) self.logger.info( "Consecutive errors: {} first start: {}".format( self.consecutive_errors, self.first_start ) ) + # self.logger.info( + # "Consecutive errors: {} first start: {}".format( + # self.consecutive_errors, self.first_start + # ) + # ) topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( @@ -974,6 +1062,11 @@ class Lcm: self.k8s_infra_config = k8s.K8sInfraConfigLcm( self.msg, self.lcm_tasks, self.main_config.to_dict() ) + self.cloud_credentials = k8s.CloudCredentialsLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.oka = k8s.OkaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) + self.ksu = k8s.KsuLcm(self.msg, self.lcm_tasks, self.main_config.to_dict()) self.logger.info( "Msg: {} lcm tasks: {} main config: {}".format( diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index d1b310bb..47da8a9a 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -537,6 +537,8 @@ class TaskRegistry(LcmBase): "k8s_resource", "k8s_infra_controller", "k8s_infra_config", + "oka", + "ksu", ] # Map topic to InstanceID @@ -557,6 +559,8 @@ class TaskRegistry(LcmBase): "k8s_resource": "k8sresource", "k8s_infra_controller": "k8sinfra_controller", "k8s_infra_config": "k8sinfra_config", + "oka": "oka", + "ksu": "ksus", } def __init__(self, worker_id=None, logger=None): @@ -574,6 +578,8 @@ class TaskRegistry(LcmBase): "k8s_resource": {}, "k8s_infra_controller": {}, "k8s_infra_config": {}, + "oka": {}, + "ksu": {}, "odu": {}, } self.worker_id = worker_id -- 2.25.1