from copy import deepcopy
from osm_lcm import odu_workflows
from osm_lcm import vim_sdn
+from osm_lcm.data_utils.list_utils import find_in_list
class GitOpsLcm(LcmBase):
db_collection = "gitops"
+ profile_collection_mapping = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "resource_profiles": "k8sresource",
+ "app_profiles": "k8sapp",
+ }
+
def __init__(self, msg, lcm_tasks, config):
self.logger = logging.getLogger("lcm.gitops")
self.lcm_tasks = lcm_tasks
}
return db_cluster_copy
+ def get_operation_params(self, item, operation_id):
+ operation_history = item.get("operationHistory", [])
+ operation = find_in_list(
+ operation_history, lambda op: op["op_id"] == operation_id
+ )
+ return operation.get("operationParams", {})
+
class ClusterLcm(GitOpsLcm):
db_collection = "clusters"
}
self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("cluster Create Enter")
- db_cluster = content["cluster"]
+ # To get the cluster details
+ cluster_id = params["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+
+ # To get the operation params details
+ op_id = params["operation_id"]
+ op_params = self.get_operation_params(db_cluster, op_id)
+
+ # To initialize the operation states
+ self.initialize_operation(cluster_id, op_id)
+
+ # To copy the cluster content and decrypting the key to use in workflows
db_cluster_copy = self.decrypting_key(db_cluster)
- # vim account details
+ # To get the vim account details
db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
db_cluster_copy["vim_account"] = db_vim
db_cluster["current_operation"] = None
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
self.update_profile_state(db_cluster, workflow_status, resource_status)
+
+ db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]})
+
+ if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED":
+ result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
+ # To call the lcm.py for registering the cluster in k8scluster lcm.
+ db_register["credentials"] = cluster_creds
+ self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
+ register = await self.regist.create(db_register, order_id)
+ self.logger.debug(f"Register is : {register}")
+ else:
+ db_register["_admin"]["operationalState"] = "ERROR"
+ result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
+ # To call the lcm.py for registering the cluster in k8scluster lcm.
+ db_register["credentials"] = cluster_creds
+ self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
+
return
async def check_create_cluster(self, op_id, op_params, content):
"app_profiles",
"resource_profiles",
]
+ """
profiles_collection = {
"infra_controller_profiles": "k8sinfra_controller",
"infra_config_profiles": "k8sinfra_config",
"app_profiles": "k8sapp",
"resource_profiles": "k8sresource",
}
+ """
self.logger.info("the db_cluster is :{}".format(db_cluster))
for profile_type in profiles:
profile_id = db_cluster[profile_type]
- self.logger.info("profile id is : {}".format(profile_id))
- db_collection = profiles_collection[profile_type]
- self.logger.info("the db_collection is :{}".format(db_collection))
+ # db_collection = profiles_collection[profile_type]
+ db_collection = self.profile_collection_mapping[profile_type]
db_profile = self.db.get_one(db_collection, {"_id": profile_id})
- self.logger.info("the db_profile is :{}".format(db_profile))
op_id = db_profile["operationHistory"][-1].get("op_id")
db_profile["state"] = db_cluster["state"]
db_profile["resourceState"] = db_cluster["resourceState"]
db_profile = self.update_operation_history(
db_profile, op_id, workflow_status, resource_status
)
- self.logger.info("the db_profile is :{}".format(db_profile))
self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
- async def delete(self, op_id, op_params, content):
+ async def delete(self, params, order_id):
self.logger.info("cluster delete Enter")
- db_cluster = content["cluster"]
+
+ # To get the cluster details
+ cluster_id = params["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+
+ # To get the operation params details
+ op_id = params["operation_id"]
+ op_params = self.get_operation_params(db_cluster, op_id)
+
+ # To initialize the operation states
+ self.initialize_operation(cluster_id, op_id)
+
+ # To copy the cluster content and decrypting the key to use in workflows
+ db_cluster_copy = self.decrypting_key(db_cluster)
+
if db_cluster["created"] == "false":
- return await self.deregister(op_id, op_params, content)
+ return await self.deregister(params, order_id)
_, workflow_name = await self.odu.launch_workflow(
- "delete_cluster", op_id, op_params, content
+ "delete_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info("workflow_name is :{}".format(workflow_name))
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "delete_cluster", op_id, op_params, content
+ "delete_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
# To delete it from DB
if db_cluster["state"] == "DELETED":
self.delete_cluster(db_cluster)
+
+ # To delete it from k8scluster collection
+ self.db.del_one("k8sclusters", {"name": db_cluster["name"]})
+
return
def delete_cluster(self, db_cluster):
"app_profiles",
"resource_profiles",
]
+ """
profiles_collection = {
"infra_controller_profiles": "k8sinfra_controller",
"infra_config_profiles": "k8sinfra_config",
"app_profiles": "k8sapp",
"resource_profiles": "k8sresource",
}
+ """
for profile_type in profiles_to_detach:
if db_cluster.get(profile_type):
- self.logger.debug("the profile_type is :{}".format(profile_type))
profile_ids = db_cluster[profile_type]
- self.logger.debug("the profile_ids is :{}".format(profile_ids))
profile_ids_copy = deepcopy(profile_ids)
- self.logger.debug(
- "the profile_ids_copy is :{}".format(profile_ids_copy)
- )
for profile_id in profile_ids_copy:
- self.logger.debug("the profile_id is :{}".format(profile_id))
- db_collection = profiles_collection[profile_type]
- self.logger.debug("the db_collection is :{}".format(db_collection))
+ # db_collection = profiles_collection[profile_type]
+ db_collection = self.profile_collection_mapping[profile_type]
db_profile = self.db.get_one(db_collection, {"_id": profile_id})
self.logger.debug("the db_profile is :{}".format(db_profile))
self.logger.debug(
"the db_profile name is :{}".format(db_profile["name"])
)
if db_cluster["name"] == db_profile["name"]:
- self.logger.debug("it is getting into if default")
self.db.del_one(db_collection, {"_id": profile_id})
else:
- self.logger.debug("it is getting into else non default")
profile_ids.remove(profile_id)
update_dict = {profile_type: profile_ids}
- self.logger.debug(f"the update dict is :{update_dict}")
self.db.set_one(
"clusters", {"_id": db_cluster["_id"]}, update_dict
)
self.db.del_one("clusters", {"_id": db_cluster["_id"]})
- self.logger.debug("the id is :{}".format(db_cluster["_id"]))
- async def attach_profile(self, op_id, op_params, content):
+ async def attach_profile(self, params, order_id):
self.logger.info("profile attach Enter")
- db_cluster = content["cluster"]
- db_profile = content["profile"]
- profile_type = db_profile["profile_type"]
- profile_id = db_profile["_id"]
- self.logger.info("profile type is : {}".format(profile_type))
- self.logger.info("profile id is : {}".format(profile_id))
+
+ # To get the cluster details
+ cluster_id = params["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ # content = {
+ # "cluster": db_cluster,
+ # }
+
+ # To get the operation params details
+ op_id = params["operation_id"]
+ op_params = self.get_operation_params(db_cluster, op_id)
+
+ # To initialize the operation states
+ self.initialize_operation(cluster_id, op_id)
+
+ # To copy the cluster content and decrypting the key to use in workflows
+ db_cluster_copy = self.decrypting_key(db_cluster)
+
+ # To get the profile details
+ profile_id = params["profile_id"]
+ profile_type = params["profile_type"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+ db_profile["profile_type"] = profile_type
+ # content["profile"] = db_profile
+ db_cluster_copy["profile"] = db_profile
_, workflow_name = await self.odu.launch_workflow(
- "attach_profile_to_cluster", op_id, op_params, content
+ "attach_profile_to_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info("workflow_name is :{}".format(workflow_name))
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "attach_profile_to_cluster", op_id, op_params, content
+ "attach_profile_to_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
db_cluster, op_id, workflow_status, resource_status
)
profile_list = db_cluster[profile_type]
- self.logger.info("profile list is : {}".format(profile_list))
if resource_status:
- self.logger.info("it is getting into resource status true")
profile_list.append(profile_id)
- self.logger.info("profile list is : {}".format(profile_list))
db_cluster[profile_type] = profile_list
- self.logger.info("db cluster is : {}".format(db_cluster))
- # update_dict = {item: profile_list}
- # self.logger.info("the update_dict is :{}".format(update_dict))
- # self.db.set_one(self.topic, filter_q, update_dict)
db_cluster["current_operation"] = None
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
return
- async def detach_profile(self, op_id, op_params, content):
+ async def detach_profile(self, params, order_id):
self.logger.info("profile dettach Enter")
- db_cluster = content["cluster"]
- db_profile = content["profile"]
- profile_type = db_profile["profile_type"]
- profile_id = db_profile["_id"]
- self.logger.info("profile type is : {}".format(profile_type))
- self.logger.info("profile id is : {}".format(profile_id))
+
+ # To get the cluster details
+ cluster_id = params["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ # content = {
+ # "cluster": db_cluster,
+ # }
+
+ # To get the operation params details
+ op_id = params["operation_id"]
+ op_params = self.get_operation_params(db_cluster, op_id)
+
+ # To initialize the operation states
+ self.initialize_operation(cluster_id, op_id)
+
+ # To copy the cluster content and decrypting the key to use in workflows
+ db_cluster_copy = self.decrypting_key(db_cluster)
+
+ # To get the profile details
+ profile_id = params["profile_id"]
+ profile_type = params["profile_type"]
+ profile_collection = self.profile_collection_mapping[profile_type]
+ db_profile = self.db.get_one(profile_collection, {"_id": profile_id})
+ db_profile["profile_type"] = profile_type
+ # content["profile"] = db_profile
+ db_cluster_copy["profile"] = db_profile
_, workflow_name = await self.odu.launch_workflow(
- "detach_profile_from_cluster", op_id, op_params, content
+ "detach_profile_from_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info("workflow_name is :{}".format(workflow_name))
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "detach_profile_from_cluster", op_id, op_params, content
+ "detach_profile_from_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
profile_list = db_cluster[profile_type]
self.logger.info("profile list is : {}".format(profile_list))
if resource_status:
- self.logger.info("it is getting into resource status true")
profile_list.remove(profile_id)
- self.logger.info("profile list is : {}".format(profile_list))
db_cluster[profile_type] = profile_list
- self.logger.info("db cluster is : {}".format(db_cluster))
- # update_dict = {item: profile_list}
- # self.logger.info("the update_dict is :{}".format(update_dict))
- # self.db.set_one(self.topic, filter_q, update_dict)
db_cluster["current_operation"] = None
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
return
- async def register(self, op_id, op_params, content):
+ async def register(self, params, order_id):
self.logger.info("cluster register enter")
- db_cluster = content["cluster"]
+ # To get the cluster details
+ cluster_id = params["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ # content = {
+ # "cluster": db_cluster,
+ # }
+
+ # To get the operation params details
+ op_id = params["operation_id"]
+ op_params = self.get_operation_params(db_cluster, op_id)
+
+ # To initialize the operation states
+ self.initialize_operation(cluster_id, op_id)
+
+ # To copy the cluster content and decrypting the key to use in workflows
db_cluster_copy = self.decrypting_key(db_cluster)
_, workflow_name = await self.odu.launch_workflow(
)
db_cluster["current_operation"] = None
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+ db_register = self.db.get_one("k8sclusters", {"name": db_cluster["name"]})
+ db_register["credentials"] = db_cluster["credentials"]
+ self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
+
+ if db_cluster["resourceState"] == "READY" and db_cluster["state"] == "CREATED":
+ # To call the lcm.py for registering the cluster in k8scluster lcm.
+ register = await self.regist.create(db_register, order_id)
+ self.logger.debug(f"Register is : {register}")
+ else:
+ db_register["_admin"]["operationalState"] = "ERROR"
+ self.db.set_one("k8sclusters", {"_id": db_register["_id"]}, db_register)
+
return
- async def deregister(self, op_id, op_params, content):
+ async def deregister(self, params, order_id):
self.logger.info("cluster deregister enter")
- db_cluster = content["cluster"]
- self.logger.info("db_cluster is : {}".format(db_cluster))
+ # To get the cluster details
+ cluster_id = params["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ # content = {
+ # "cluster": db_cluster,
+ # }
+
+ # To get the operation params details
+ op_id = params["operation_id"]
+ op_params = self.get_operation_params(db_cluster, op_id)
+
+ # To initialize the operation states
+ self.initialize_operation(cluster_id, op_id)
+
+ # To copy the cluster content and decrypting the key to use in workflows
+ db_cluster_copy = self.decrypting_key(db_cluster)
_, workflow_name = await self.odu.launch_workflow(
- "deregister_cluster", op_id, op_params, content
+ "deregister_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info("workflow_name is :{}".format(workflow_name))
# Clean items used in the workflow or in the cluster, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
- "deregister_cluster", op_id, op_params, content
+ "deregister_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
- "deregister_cluster", op_id, op_params, content
+ "deregister_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
# To delete it from DB
if db_cluster["state"] == "DELETED":
self.db.del_one("clusters", {"_id": db_cluster["_id"]})
+
+ # To delete it from k8scluster collection
+ self.db.del_one("k8sclusters", {"name": db_cluster["name"]})
+
return
- async def get_creds(self, op_id, db_cluster):
+ async def get_creds(self, params, order_id):
self.logger.info("Cluster get creds Enter")
+ cluster_id = params["cluster_id"]
+ op_id = params["operation_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
if result:
db_cluster["credentials"] = cluster_creds
op_len += 1
db_cluster["current_operation"] = None
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+ self.logger.info("Cluster Get Creds Exit")
return
- async def update(self, op_id, op_params, content):
+ async def update(self, params, order_id):
self.logger.info("Cluster update Enter")
- db_cluster = content["cluster"]
+ # To get the cluster details
+ cluster_id = params["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+
+ # To get the operation params details
+ op_id = params["operation_id"]
+ op_params = self.get_operation_params(db_cluster, op_id)
db_cluster_copy = self.decrypting_key(db_cluster)
class K8sAppLcm(GitOpsLcm):
+ db_collection = "k8sapp"
+
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
"""
super().__init__(msg, lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("App Create Enter")
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+
+ # To initialize the operation states
+ self.initialize_operation(profile_id, op_id)
+
+ content = self.db.get_one("k8sapp", {"_id": profile_id})
+ content["profile_type"] = "applications"
+ op_params = self.get_operation_params(content, op_id)
+ self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+
_, workflow_name = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
return
- async def delete(self, op_id, op_params, content):
+ async def delete(self, params, order_id):
self.logger.info("App delete Enter")
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+
+ # To initialize the operation states
+ self.initialize_operation(profile_id, op_id)
+
+ content = self.db.get_one("k8sapp", {"_id": profile_id})
+ op_params = self.get_operation_params(content, op_id)
+
_, workflow_name = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
class K8sResourceLcm(GitOpsLcm):
+ db_collection = "k8sresource"
+
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
"""
super().__init__(msg, lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("Resource Create Enter")
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+
+ # To initialize the operation states
+ self.initialize_operation(profile_id, op_id)
+
+ content = self.db.get_one("k8sresource", {"_id": profile_id})
+ content["profile_type"] = "managed-resources"
+ op_params = self.get_operation_params(content, op_id)
+ self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+
_, workflow_name = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
return
- async def delete(self, op_id, op_params, content):
+ async def delete(self, params, order_id):
self.logger.info("Resource delete Enter")
- content = self.db.get_one("k8sresource", {"_id": content["_id"]})
+
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+
+ # To initialize the operation states
+ self.initialize_operation(profile_id, op_id)
+
+ content = self.db.get_one("k8sresource", {"_id": profile_id})
+ op_params = self.get_operation_params(content, op_id)
_, workflow_name = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
class K8sInfraControllerLcm(GitOpsLcm):
+ db_collection = "k8sinfra_controller"
+
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
"""
super().__init__(msg, lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("Infra controller Create Enter")
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+
+ # To initialize the operation states
+ self.initialize_operation(profile_id, op_id)
+
+ content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
+ content["profile_type"] = "infra-controllers"
+ op_params = self.get_operation_params(content, op_id)
+ self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+
_, workflow_name = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
return
- async def delete(self, op_id, op_params, content):
+ async def delete(self, params, order_id):
self.logger.info("Infra controller delete Enter")
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+
+ # To initialize the operation states
+ self.initialize_operation(profile_id, op_id)
+
+ content = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
+ op_params = self.get_operation_params(content, op_id)
+
_, workflow_name = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
class K8sInfraConfigLcm(GitOpsLcm):
+ db_collection = "k8sinfra_config"
+
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
"""
super().__init__(msg, lcm_tasks, config)
- async def create(self, op_id, op_params, content):
+ async def create(self, params, order_id):
self.logger.info("Infra config Create Enter")
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+
+ # To initialize the operation states
+ self.initialize_operation(profile_id, op_id)
+
+ content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
+ content["profile_type"] = "infra-configs"
+ op_params = self.get_operation_params(content, op_id)
+ self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+
_, workflow_name = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
return
- async def delete(self, op_id, op_params, content):
+ async def delete(self, params, order_id):
self.logger.info("Infra config delete Enter")
+ op_id = params["operation_id"]
+ profile_id = params["profile_id"]
+
+ # To initialize the operation states
+ self.initialize_operation(profile_id, op_id)
+
+ content = self.db.get_one("k8sinfra_config", {"_id": profile_id})
+ op_params = self.get_operation_params(content, op_id)
+
_, workflow_name = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
elif command == "deleted":
return # TODO cleaning of task just in case should be done
elif topic == "cluster":
- if command != "get_creds":
- op_id = params["operation_id"]
- cluster_id = params["cluster_id"]
- self.cluster.initialize_operation(cluster_id, op_id)
- db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
- """
- if command in (
- "create",
- "created",
- "register",
- "registered",
- "upgrade",
- "scale",
- ):
- self.db.encrypt_decrypt_fields(
- db_cluster,
- "decrypt",
- ["age_pubkey", "age_privkey"],
- schema_version="1.11",
- salt=cluster_id,
- )
- """
- op_params = self.get_operation_params(db_cluster, op_id)
- db_content = {
- "cluster": db_cluster,
- }
+ cluster_id = params["cluster_id"]
+ op_id = params["operation_id"]
if command == "create" or command == "created":
self.logger.debug("cluster_id = {}".format(cluster_id))
- # db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
- """
- db_vim = self.db.get_one(
- "vim_accounts", {"name": db_cluster["vim_account"]}
- )
- db_content["vim_account"] = db_vim
- """
- task = asyncio.ensure_future(
- self.cluster.create(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.cluster.create(params, order_id))
self.lcm_tasks.register(
"cluster", cluster_id, op_id, "cluster_create", task
)
return
elif command == "delete" or command == "deleted":
- task = asyncio.ensure_future(
- self.cluster.delete(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.cluster.delete(params, order_id))
self.lcm_tasks.register(
"cluster", cluster_id, op_id, "cluster_delete", task
)
return
elif command == "add" or command == "added":
- profile_type = params["profile_type"]
- profile_collection = self.profile_collection_mapping[profile_type]
- db_profile = self.db.get_one(
- profile_collection, {"_id": params["profile_id"]}
- )
- db_profile["profile_type"] = profile_type
- db_content["profile"] = db_profile
task = asyncio.ensure_future(
- self.cluster.attach_profile(op_id, op_params, db_content)
+ self.cluster.attach_profile(params, order_id)
)
self.lcm_tasks.register(
"cluster", cluster_id, op_id, "profile_add", task
)
return
elif command == "remove" or command == "removed":
- profile_type = params["profile_type"]
- profile_collection = self.profile_collection_mapping[profile_type]
- db_profile = self.db.get_one(
- profile_collection, {"_id": params["profile_id"]}
- )
- db_profile["profile_type"] = profile_type
- db_content["profile"] = db_profile
task = asyncio.ensure_future(
- self.cluster.detach_profile(op_id, op_params, db_content)
+ self.cluster.detach_profile(params, order_id)
)
self.lcm_tasks.register(
"cluster", cluster_id, op_id, "profile_remove", task
)
return
elif command == "register" or command == "registered":
- task = asyncio.ensure_future(
- self.cluster.register(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.cluster.register(params, order_id))
self.lcm_tasks.register(
"cluster", cluster_id, op_id, "cluster_register", task
)
return
elif command == "deregister" or command == "deregistered":
- task = asyncio.ensure_future(
- self.cluster.deregister(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.cluster.deregister(params, order_id))
self.lcm_tasks.register(
"cluster", cluster_id, op_id, "cluster_deregister", task
)
return
elif command == "get_creds":
- cluster_id = params["cluster_id"]
- op_id = params["operation_id"]
- db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
- task = asyncio.ensure_future(self.cluster.get_creds(op_id, db_cluster))
+ task = asyncio.ensure_future(self.cluster.get_creds(params, order_id))
self.lcm_tasks.register(
"cluster", cluster_id, cluster_id, "cluster_get_credentials", task
)
return
elif command == "upgrade" or command == "scale" or command == "update":
+ cluster_id = params["cluster_id"]
+ op_id = params["operation_id"]
# db_vim = self.db.get_one("vim_accounts", {"_id": db_cluster["vim_account"]})
"""
db_vim = self.db.get_one(
)
db_content["vim_account"] = db_vim
"""
- task = asyncio.ensure_future(
- self.cluster.update(op_id, op_params, db_content)
- )
+ task = asyncio.ensure_future(self.cluster.update(params, order_id))
self.lcm_tasks.register(
"cluster", cluster_id, op_id, "cluster_update", task
)
elif topic == "k8s_app":
op_id = params["operation_id"]
profile_id = params["profile_id"]
- self.k8s_app.initialize_operation(profile_id, op_id)
- db_profile = self.db.get_one("k8sapp", {"_id": profile_id})
- db_profile["profile_type"] = "applications"
- op_params = self.get_operation_params(db_profile, op_id)
if command == "profile_create" or command == "profile_created":
self.logger.debug("Create k8s_app_id = {}".format(profile_id))
- task = asyncio.ensure_future(
- self.k8s_app.create(op_id, op_params, db_profile)
- )
+ task = asyncio.ensure_future(self.k8s_app.create(params, order_id))
self.lcm_tasks.register(
"k8s_app", profile_id, op_id, "k8s_app_create", task
)
return
elif command == "delete" or command == "deleted":
self.logger.debug("Delete k8s_app_id = {}".format(profile_id))
- task = asyncio.ensure_future(
- self.k8s_app.delete(op_id, op_params, db_profile)
- )
+ task = asyncio.ensure_future(self.k8s_app.delete(params, order_id))
self.lcm_tasks.register(
"k8s_app", profile_id, op_id, "k8s_app_delete", task
)
elif topic == "k8s_resource":
op_id = params["operation_id"]
profile_id = params["profile_id"]
- self.k8s_resource.initialize_operation(profile_id, op_id)
- db_profile = self.db.get_one("k8sresource", {"_id": profile_id})
- db_profile["profile_type"] = "managed-resources"
- op_params = self.get_operation_params(db_profile, op_id)
if command == "profile_create" or command == "profile_created":
self.logger.debug("Create k8s_resource_id = {}".format(profile_id))
- task = asyncio.ensure_future(
- self.k8s_resource.create(op_id, op_params, db_profile)
- )
+ task = asyncio.ensure_future(self.k8s_resource.create(params, order_id))
self.lcm_tasks.register(
"k8s_resource",
profile_id,
return
elif command == "delete" or command == "deleted":
self.logger.debug("Delete k8s_resource_id = {}".format(profile_id))
- task = asyncio.ensure_future(
- self.k8s_resource.delete(op_id, op_params, db_profile)
- )
+ task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id))
self.lcm_tasks.register(
"k8s_resource",
profile_id,
elif topic == "k8s_infra_controller":
op_id = params["operation_id"]
profile_id = params["profile_id"]
- self.k8s_infra_controller.initialize_operation(profile_id, op_id)
- db_profile = self.db.get_one("k8sinfra_controller", {"_id": profile_id})
- db_profile["profile_type"] = "infra-controllers"
- op_params = self.get_operation_params(db_profile, op_id)
if command == "profile_create" or command == "profile_created":
self.logger.debug(
"Create k8s_infra_controller_id = {}".format(profile_id)
)
task = asyncio.ensure_future(
- self.k8s_infra_controller.create(op_id, op_params, db_profile)
+ self.k8s_infra_controller.create(params, order_id)
)
self.lcm_tasks.register(
"k8s_infra_controller",
"Delete k8s_infra_controller_id = {}".format(profile_id)
)
task = asyncio.ensure_future(
- self.k8s_infra_controller.delete(op_id, op_params, db_profile)
+ self.k8s_infra_controller.delete(params, order_id)
)
self.lcm_tasks.register(
"k8s_infra_controller",
elif topic == "k8s_infra_config":
op_id = params["operation_id"]
profile_id = params["profile_id"]
- self.k8s_infra_config.initialize_operation(profile_id, op_id)
- db_profile = self.db.get_one("k8sinfra_config", {"_id": profile_id})
- db_profile["profile_type"] = "infra-configs"
- op_params = self.get_operation_params(db_profile, op_id)
if command == "profile_create" or command == "profile_created":
self.logger.debug("Create k8s_infra_config_id = {}".format(profile_id))
task = asyncio.ensure_future(
- self.k8s_infra_config.create(op_id, op_params, db_profile)
+ self.k8s_infra_config.create(params, order_id)
)
self.lcm_tasks.register(
"k8s_infra_config",
elif command == "delete" or command == "deleted":
self.logger.debug("Delete k8s_infra_config_id = {}".format(profile_id))
task = asyncio.ensure_future(
- self.k8s_infra_config.delete(op_id, op_params, db_profile)
+ self.k8s_infra_config.delete(params, order_id)
)
self.lcm_tasks.register(
"k8s_infra_config",