Features 11020,11022-11026: Advanced cluster management
Change-Id: I348e6149326e8ba7c2c79ea7ff2ea2223ed047ca
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py
index 6db88eb..77c3d82 100644
--- a/osm_lcm/lcm.py
+++ b/osm_lcm/lcm.py
@@ -46,6 +46,7 @@
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.data_utils.lcm_config import LcmCfg
+from osm_lcm.data_utils.list_utils import find_in_list
from osm_lcm.lcm_hc import get_health_check_file
from os import path, getenv
from n2vc import version as n2vc_version
@@ -63,6 +64,13 @@
class Lcm:
+ profile_collection_mapping = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "resource_profiles": "k8sresource",
+ "app_profiles": "k8sapp",
+ }
+
ping_interval_pace = (
120 # how many time ping is send once is confirmed all is running
)
@@ -315,6 +323,13 @@
wait_time = 2 if not first_start else 5
await asyncio.sleep(wait_time)
+ def get_operation_params(self, item, operation_id):
+ operation_history = item.get("operationHistory", [])
+ operation = find_in_list(
+ operation_history, lambda op: op["op_id"] == operation_id
+ )
+ return operation.get("operationParams", {})
+
async def kafka_read_callback(self, topic, command, params):
order_id = 1
self.logger.info(
@@ -628,56 +643,56 @@
return
elif topic == "vim_account":
vim_id = params["_id"]
- op_id = params["_id"]
+ op_id = vim_id
+ op_params = params
db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
- vim_config = db_vim["config"]
+ vim_config = db_vim.get("config", {})
+ self.logger.debug("Db Vim: {}".format(db_vim))
if command in ("create", "created"):
- self.logger.info("Command : {}".format(command))
+ self.logger.debug("Main config: {}".format(self.main_config.to_dict()))
+ if "credentials" in vim_config:
+ self.logger.info("Vim add cloud credentials")
+ task = asyncio.ensure_future(
+ self.cloud_credentials.add(op_id, op_params, db_vim)
+ )
+ self.lcm_tasks.register(
+ "vim_account", vim_id, op_id, "cloud_credentials_add", task
+ )
if not self.main_config.RO.ng:
- self.logger.info("Vim create")
+ self.logger.info("Calling RO to create VIM (no NG-RO)")
task = asyncio.ensure_future(self.vim.create(params, order_id))
self.lcm_tasks.register(
"vim_account", vim_id, order_id, "vim_create", task
)
- return
- elif "credentials" in vim_config.keys():
- task = asyncio.ensure_future(
- self.cloud_credentials.add(params, order_id)
- )
- self.lcm_tasks.register(
- "vim-account", vim_id, op_id, "cloud_credentials_add", task
- )
- return
+ return
elif command == "delete" or command == "deleted":
- if "credentials" in vim_config.keys():
+ self.lcm_tasks.cancel(topic, vim_id)
+ if "credentials" in vim_config:
+ self.logger.info("Vim remove cloud credentials")
task = asyncio.ensure_future(
- self.cloud_credentials.remove(params, order_id)
+ self.cloud_credentials.remove(op_id, op_params, db_vim)
)
self.lcm_tasks.register(
- "vim-account", vim_id, op_id, "cloud_credentials_remove", task
+ "vim_account", vim_id, op_id, "cloud_credentials_remove", task
)
- return
- else:
- self.lcm_tasks.cancel(topic, vim_id)
- task = asyncio.ensure_future(self.vim.delete(params, order_id))
- self.lcm_tasks.register(
- "vim_account", vim_id, order_id, "vim_delete", task
- )
- return
+ task = asyncio.ensure_future(self.vim.delete(params, order_id))
+ self.lcm_tasks.register(
+ "vim_account", vim_id, order_id, "vim_delete", task
+ )
+ return
elif command == "show":
print("not implemented show with vim_account")
sys.stdout.flush()
return
elif command in ("edit", "edited"):
- if "credentials" in vim_config.keys():
- self.logger.info("Vim Edit")
+ if "credentials" in vim_config:
+ self.logger.info("Vim update cloud credentials")
task = asyncio.ensure_future(
- self.cloud_credentials.edit(params, order_id)
+ self.cloud_credentials.edit(op_id, op_params, db_vim)
)
self.lcm_tasks.register(
"vim_account", vim_id, op_id, "cloud_credentials_update", task
)
- return
if not self.main_config.RO.ng:
task = asyncio.ensure_future(self.vim.edit(params, order_id))
self.lcm_tasks.register(
@@ -735,213 +750,322 @@
elif command == "deleted":
return # TODO cleaning of task just in case should be done
elif topic == "cluster":
- cluster_id = params["_id"]
+ if command != "get_creds":
+ op_id = params["operation_id"]
+ cluster_id = params["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ op_params = self.get_operation_params(db_cluster, op_id)
+ db_content = {
+ "cluster": db_cluster,
+ }
if command == "create" or command == "created":
self.logger.debug("cluster_id = {}".format(cluster_id))
- task = asyncio.ensure_future(self.cluster.create(params, order_id))
+ # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
+ db_vim = self.db.get_one(
+ "vim_accounts", {"name": db_cluster["vim_account"]}
+ )
+ db_content["vim_account"] = db_vim
+ task = asyncio.ensure_future(
+ self.cluster.create(op_id, op_params, db_content)
+ )
self.lcm_tasks.register(
- "cluster", cluster_id, order_id, "cluster_create", task
+ "cluster", cluster_id, op_id, "cluster_create", task
)
return
elif command == "delete" or command == "deleted":
- task = asyncio.ensure_future(self.cluster.delete(params, order_id))
+ task = asyncio.ensure_future(
+ self.cluster.delete(op_id, op_params, db_content)
+ )
self.lcm_tasks.register(
- "cluster", cluster_id, order_id, "cluster_delete", task
+ "cluster", cluster_id, op_id, "cluster_delete", task
)
return
elif command == "add" or command == "added":
- add_id = params.get("_id")
- task = asyncio.ensure_future(self.cluster.add(params, order_id))
+ profile_type = params["profile_type"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(
+ profile_collection, {"_id": params["profile_id"]}
+ )
+ db_profile["profile_type"] = profile_type
+ db_content["profile"] = db_profile
+ task = asyncio.ensure_future(
+ self.cluster.attach_profile(op_id, op_params, db_content)
+ )
self.lcm_tasks.register(
- "cluster", add_id, order_id, "profile_add", task
+ "cluster", cluster_id, op_id, "profile_add", task
)
return
elif command == "remove" or command == "removed":
- remove_id = params.get("_id")
- task = asyncio.ensure_future(self.cluster.remove(params, order_id))
+ profile_type = params["profile_type"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(
+ profile_collection, {"_id": params["profile_id"]}
+ )
+ db_profile["profile_type"] = profile_type
+ db_content["profile"] = db_profile
+ task = asyncio.ensure_future(
+ self.cluster.detach_profile(op_id, op_params, db_content)
+ )
self.lcm_tasks.register(
- "cluster", remove_id, order_id, "profile_remove", task
+ "cluster", cluster_id, op_id, "profile_remove", task
)
return
elif command == "register" or command == "registered":
- cluster_id = params.get("_id")
- task = asyncio.ensure_future(self.cluster.register(params, order_id))
+ task = asyncio.ensure_future(
+ self.cluster.register(op_id, op_params, db_content)
+ )
self.lcm_tasks.register(
- "cluster", cluster_id, order_id, "cluster_register", task
+ "cluster", cluster_id, op_id, "cluster_register", task
)
return
elif command == "deregister" or command == "deregistered":
- cluster_id = params.get("_id")
- task = asyncio.ensure_future(self.cluster.deregister(params, order_id))
+ task = asyncio.ensure_future(
+ self.cluster.deregister(op_id, op_params, db_content)
+ )
self.lcm_tasks.register(
- "cluster", cluster_id, order_id, "cluster_deregister", task
+ "cluster", cluster_id, op_id, "cluster_deregister", task
)
return
elif command == "get_creds":
- task = asyncio.ensure_future(self.cluster.get_creds(params, order_id))
- # self.logger.info("task: {}".format(task))
+ cluster_id = params["_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ task = asyncio.ensure_future(self.cluster.get_creds(db_cluster))
self.lcm_tasks.register(
- "k8sclus", cluster_id, order_id, "k8sclus_get_creds", task
+ "cluster", cluster_id, cluster_id, "cluster_get_credentials", task
)
return
elif command == "upgrade" or command == "scale":
- task = asyncio.ensure_future(self.cluster.update(params, order_id))
- # self.logger.info("task: {}".format(task))
- if command == "upgrade":
- self.lcm_tasks.register(
- "k8sclus", cluster_id, order_id, "k8sclus_upgrade", task
- )
+ # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
+ db_vim = self.db.get_one(
+ "vim_accounts", {"name": db_cluster["vim_account"]}
+ )
+ db_content["vim_account"] = db_vim
+ task = asyncio.ensure_future(
+ self.cluster.update(op_id, op_params, db_content)
+ )
+ self.lcm_tasks.register(
+ "cluster", cluster_id, op_id, "cluster_update", task
+ )
return
elif topic == "k8s_app":
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+ db_profile = self.db.get_one("k8sapp", {"_id": profile_id})
+ db_profile["profile_type"] = "applications"
+ op_params = self.get_operation_params(db_profile, op_id)
if command == "profile_create" or command == "profile_created":
- k8s_app_id = params.get("_id")
- self.logger.debug("k8s_app_id = {}".format(k8s_app_id))
- task = asyncio.ensure_future(self.k8s_app.create(params, order_id))
+ self.logger.debug("Create k8s_app_id = {}".format(profile_id))
+ task = asyncio.ensure_future(
+ self.k8s_app.create(op_id, op_params, db_profile)
+ )
self.lcm_tasks.register(
- "k8s_app", k8s_app_id, order_id, "k8s_app_create", task
+ "k8s_app", profile_id, op_id, "k8s_app_create", task
)
return
elif command == "delete" or command == "deleted":
- k8s_app_id = params.get("_id")
- task = asyncio.ensure_future(self.k8s_app.delete(params, order_id))
+ self.logger.debug("Delete k8s_app_id = {}".format(profile_id))
+ task = asyncio.ensure_future(
+ self.k8s_app.delete(op_id, op_params, db_profile)
+ )
self.lcm_tasks.register(
- "k8s_app", k8s_app_id, order_id, "k8s_app_delete", task
+ "k8s_app", profile_id, op_id, "k8s_app_delete", task
)
return
elif topic == "k8s_resource":
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+ db_profile = self.db.get_one("k8sresource", {"_id": profile_id})
+ db_profile["profile_type"] = "managed-resources"
+ op_params = self.get_operation_params(db_profile, op_id)
if command == "profile_create" or command == "profile_created":
- k8s_resource_id = params.get("_id")
- self.logger.debug("k8s_resource_id = {}".format(k8s_resource_id))
- task = asyncio.ensure_future(self.k8s_resource.create(params, order_id))
+ self.logger.debug("Create k8s_resource_id = {}".format(profile_id))
+ task = asyncio.ensure_future(
+ self.k8s_resource.create(op_id, op_params, db_profile)
+ )
self.lcm_tasks.register(
"k8s_resource",
- k8s_resource_id,
- order_id,
+ profile_id,
+ op_id,
"k8s_resource_create",
task,
)
return
elif command == "delete" or command == "deleted":
- k8s_resource_id = params.get("_id")
- task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id))
+ self.logger.debug("Delete k8s_resource_id = {}".format(profile_id))
+ task = asyncio.ensure_future(
+ self.k8s_resource.delete(op_id, op_params, db_profile)
+ )
self.lcm_tasks.register(
"k8s_resource",
- k8s_resource_id,
- order_id,
+ profile_id,
+ op_id,
"k8s_resource_delete",
task,
)
return
elif topic == "k8s_infra_controller":
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+ db_profile = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
+ db_profile["profile_type"] = "infra-controllers"
+ op_params = self.get_operation_params(db_profile, op_id)
if command == "profile_create" or command == "profile_created":
- k8s_infra_controller_id = params.get("_id")
self.logger.debug(
- "k8s_infra_controller_id = {}".format(k8s_infra_controller_id)
+ "Create k8s_infra_controller_id = {}".format(profile_id)
)
task = asyncio.ensure_future(
- self.k8s_infra_controller.create(params, order_id)
+ self.k8s_infra_controller.create(op_id, op_params, db_profile)
)
self.lcm_tasks.register(
"k8s_infra_controller",
- k8s_infra_controller_id,
- order_id,
+ profile_id,
+ op_id,
"k8s_infra_controller_create",
task,
)
return
elif command == "delete" or command == "deleted":
- k8s_infra_controller_id = params.get("_id")
+ self.logger.debug(
+ "Delete k8s_infra_controller_id = {}".format(profile_id)
+ )
task = asyncio.ensure_future(
- self.k8s_infra_controller.delete(params, order_id)
+ self.k8s_infra_controller.delete(op_id, op_params, db_profile)
)
self.lcm_tasks.register(
"k8s_infra_controller",
- k8s_infra_controller_id,
- order_id,
+ profile_id,
+ op_id,
"k8s_infra_controller_delete",
task,
)
return
elif topic == "k8s_infra_config":
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+ db_profile = self.db.get_one("k8sinfra_config", {"_id": profile_id})
+ db_profile["profile_type"] = "infra-configs"
+ op_params = self.get_operation_params(db_profile, op_id)
if command == "profile_create" or command == "profile_created":
- k8s_infra_config_id = params.get("_id")
- self.logger.debug(
- "k8s_infra_config_id = {}".format(k8s_infra_config_id)
- )
+ self.logger.debug("Create k8s_infra_config_id = {}".format(profile_id))
task = asyncio.ensure_future(
- self.k8s_infra_config.create(params, order_id)
+ self.k8s_infra_config.create(op_id, op_params, db_profile)
)
self.lcm_tasks.register(
"k8s_infra_config",
- k8s_infra_config_id,
- order_id,
+ profile_id,
+ op_id,
"k8s_infra_config_create",
task,
)
return
elif command == "delete" or command == "deleted":
- k8s_infra_config_id = params.get("_id")
+ self.logger.debug("Delete k8s_infra_config_id = {}".format(profile_id))
task = asyncio.ensure_future(
- self.k8s_infra_config.delete(params, order_id)
+ self.k8s_infra_config.delete(op_id, op_params, db_profile)
)
self.lcm_tasks.register(
"k8s_infra_config",
- k8s_infra_config_id,
- order_id,
+ profile_id,
+ op_id,
"k8s_infra_config_delete",
task,
)
return
elif topic == "oka":
- # self.logger.info("Oka Elif")
- oka_id = params["_id"]
- # self.logger.info("Command: {}".format(command))
+ op_id = params["operation_id"]
+ oka_id = params["oka_id"]
+ db_oka = self.db.get_one("okas", {"_id": oka_id})
+ op_params = self.get_operation_params(db_oka, op_id)
if command == "create":
- task = asyncio.ensure_future(self.oka.create(params, order_id))
- # self.logger.info("Task: {}".format(task))
- self.lcm_tasks.register("oka", oka_id, order_id, "oka_create", task)
+ task = asyncio.ensure_future(self.oka.create(op_id, op_params, db_oka))
+ self.lcm_tasks.register("oka", oka_id, op_id, "oka_create", task)
return
elif command == "edit":
- task = asyncio.ensure_future(self.oka.edit(params, order_id))
- # self.logger.info("Task: {}".format(task))
- self.lcm_tasks.register("oka", oka_id, order_id, "oka_edit", task)
+ task = asyncio.ensure_future(self.oka.edit(op_id, op_params, db_oka))
+ self.lcm_tasks.register("oka", oka_id, op_id, "oka_edit", task)
return
elif command == "delete":
- task = asyncio.ensure_future(self.oka.delete(params, order_id))
- # self.logger.info("Task: {}".format(task))
- self.lcm_tasks.register("oka", oka_id, order_id, "oka_delete", task)
+ task = asyncio.ensure_future(self.oka.delete(op_id, op_params, db_oka))
+ self.lcm_tasks.register("oka", oka_id, op_id, "oka_delete", task)
return
elif topic == "ksu":
- # self.logger.info("Ksu Elif")
- ksu_id = params["_id"]
+ op_id = params["operation_id"]
+ op_params = None
+ db_content = None
+ if not (command == "clone" or command == "move"):
+ # op_params is a list
+ # db_content is a list of KSU
+ db_content = []
+ op_params = []
+ for ksu_id in params["ksus_list"]:
+ db_ksu = self.db.get_one("ksus", {"_id": ksu_id})
+ db_content.append(db_ksu)
+ ksu_params = {}
+ if command == "delete":
+ ksu_params["profile"] = {}
+ ksu_params["profile"]["profile_type"] = db_ksu["profile"][
+ "profile_type"
+ ]
+ ksu_params["profile"]["_id"] = db_ksu["profile"]["_id"]
+ else:
+ ksu_params = self.get_operation_params(db_ksu, op_id)
+ # Update ksu_params["profile"] with profile name and age-pubkey
+ profile_type = ksu_params["profile"]["profile_type"]
+ profile_id = ksu_params["profile"]["_id"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(
+ profile_collection, {"_id": profile_id}
+ )
+ ksu_params["profile"]["name"] = db_profile["name"]
+ ksu_params["profile"]["age_pubkey"] = db_profile.get(
+ "age_pubkey", ""
+ )
+ if command == "create" or command == "edit" or command == "edited":
+ # Update ksu_params["oka"] with sw_catalog_path (when missing)
+ for oka in ksu_params["oka"]:
+ if "sw_catalog_path" not in oka:
+ oka_id = oka["_id"]
+ db_oka = self.db.get_one("okas", {"_id": oka_id})
+ oka[
+ "sw_catalog_path"
+ ] = f"infra-controllers/{db_oka['git_name']}"
+ op_params.append(ksu_params)
+ else:
+ # db_content and op_params are single items
+ db_content = self.db.get_one("ksus", {"_id": params["_id"]})
+ db_content = db_ksu
+ op_params = self.get_operation_params(db_ksu, op_id)
if command == "create":
- task = asyncio.ensure_future(self.ksu.create(params, order_id))
- # self.logger.info("task: {}".format(task))
- self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_create", task)
+ task = asyncio.ensure_future(
+ self.ksu.create(op_id, op_params, db_content)
+ )
+ self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_create", task)
return
elif command == "edit" or command == "edited":
- task = asyncio.ensure_future(self.ksu.edit(params, order_id))
- # self.logger.info("Task: {}".format(task))
- self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_edit", task)
+ task = asyncio.ensure_future(
+ self.ksu.edit(op_id, op_params, db_content)
+ )
+ self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_edit", task)
return
elif command == "delete":
- task = asyncio.ensure_future(self.ksu.delete(params, order_id))
- # self.logger.info("Task: {}".format(task))
- self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_delete", task)
+ task = asyncio.ensure_future(
+ self.ksu.delete(op_id, op_params, db_content)
+ )
+ self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_delete", task)
return
elif command == "clone":
- # self.logger.info("KSU clone")
- task = asyncio.ensure_future(self.ksu.edit(params, order_id))
- # self.logger.info("Task: {}".format(task))
- self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_clone", task)
+ task = asyncio.ensure_future(
+ self.ksu.clone(op_id, op_params, db_content)
+ )
+ self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_clone", task)
return
elif command == "move":
- # self.logger.info("KSU move")
- task = asyncio.ensure_future(self.ksu.edit(params, order_id))
- # self.logger.info("Task: {}".format(task))
- self.lcm_tasks.register("ksu", ksu_id, order_id, "ksu_move", task)
+ task = asyncio.ensure_future(
+ self.ksu.move(op_id, op_params, db_content)
+ )
+ self.lcm_tasks.register("ksu", ksu_id, op_id, "ksu_move", task)
return
self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
@@ -983,11 +1107,6 @@
self.consecutive_errors, self.first_start
)
)
- # self.logger.info(
- # "Consecutive errors: {} first start: {}".format(
- # self.consecutive_errors, self.first_start
- # )
- # )
topics_admin = ("admin",)
await asyncio.gather(
self.msg.aioread(