blob: 845a396002ff0b198bd842041a7f925caf4e1b7e [file] [log] [blame]
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = (
"Shrinithi R <shrinithi.r@tataelxsi.co.in>",
"Shahithya Y <shahithya.y@tataelxsi.co.in>",
)
import copy
import logging
from time import time
import traceback
from osm_lcm.lcm_utils import LcmBase
from copy import deepcopy
from osm_lcm import odu_workflows
from osm_lcm import vim_sdn
class GitOpsLcm(LcmBase):
def __init__(self, msg, lcm_tasks, config):
self.logger = logging.getLogger("lcm.gitops")
self.lcm_tasks = lcm_tasks
self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
self._checkloop_kustomization_timeout = 900
self._checkloop_resource_timeout = 900
self._workflows = {}
super().__init__(msg, self.logger)
async def check_dummy_operation(self, op_id, op_params, content):
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return True, "OK"
def update_operation_history(
self, content, workflow_status=None, resource_status=None
):
self.logger.info(
f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
)
self.logger.debug(f"Content: {content}")
op_id = content["current_operation"]
self.logger.debug("OP_id: {}".format(op_id))
op_num = 0
for operation in content["operationHistory"]:
self.logger.debug("Operations: {}".format(operation))
if operation["op_id"] == op_id:
self.logger.debug("Found operation number: {}".format(op_num))
now = time()
if workflow_status:
content["operationHistory"][op_num]["workflowState"] = "COMPLETED"
content["operationHistory"][op_num]["result"] = True
else:
content["operationHistory"][op_num]["workflowState"] = "ERROR"
content["operationHistory"][op_num]["operationState"] = "FAILED"
content["operationHistory"][op_num]["result"] = False
if resource_status:
content["operationHistory"][op_num]["resourceState"] = "READY"
content["operationHistory"][op_num]["operationState"] = "COMPLETED"
content["operationHistory"][op_num]["result"] = True
else:
content["operationHistory"][op_num]["resourceState"] = "NOT_READY"
content["operationHistory"][op_num]["operationState"] = "FAILED"
content["operationHistory"][op_num]["result"] = False
content["operationHistory"][op_num]["endDate"] = now
break
op_num += 1
self.logger.debug("content: {}".format(content))
return content
async def common_check_list(self, checkings_list, db_collection, db_item):
try:
for checking in checkings_list:
if checking["enable"]:
status, message = await self.odu.readiness_loop(
item=checking["item"],
name=checking["name"],
namespace=checking["namespace"],
flag=checking["flag"],
timeout=checking["timeout"],
)
if not status:
return status, message
else:
db_item["resourceState"] = checking["resourceState"]
db_item = self.update_operation_history(
db_item, "COMPLETED", checking["resourceState"]
)
self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
except Exception as e:
self.logger.debug(traceback.format_exc())
self.logger.debug(f"Exception: {e}", exc_info=True)
return False, f"Unexpected exception: {e}"
return True, "OK"
async def check_resource_status(self, key, op_id, op_params, content):
self.logger.info(
f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
)
check_resource_function = self._workflows.get(key, {}).get(
"check_resource_function"
)
self.logger.info("check_resource function : {}".format(check_resource_function))
if check_resource_function:
return await check_resource_function(op_id, op_params, content)
else:
return await self.check_dummy_operation(op_id, op_params, content)
def decrypting_key(self, content):
# This deep copy is for to be passed to ODU workflows.
cluster_copy = copy.deepcopy(content)
# decrypting the key
self.db.encrypt_decrypt_fields(
cluster_copy,
"decrypt",
["age_pubkey", "age_privkey"],
schema_version="1.11",
salt=cluster_copy["_id"],
)
db_cluster_copy = {
"cluster": cluster_copy,
}
return db_cluster_copy
class ClusterLcm(GitOpsLcm):
db_collection = "clusters"
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
super().__init__(msg, lcm_tasks, config)
self._workflows = {
"create_cluster": {
"check_resource_function": self.check_create_cluster,
},
}
self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("cluster Create Enter")
db_cluster = content["cluster"]
db_cluster_copy = self.decrypting_key(db_cluster)
# vim account details
db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
db_cluster_copy["vim_account"] = db_vim
_, workflow_name = await self.odu.launch_workflow(
"create_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_cluster["state"] = "CREATED"
db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_cluster["state"] = "FAILED_CREATION"
db_cluster["resourceState"] = "ERROR"
# has to call update_operation_history return content
db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
"create_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"create_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
db_cluster["resourceState"] = "READY"
else:
db_cluster["resourceState"] = "ERROR"
db_cluster["operatingState"] = "IDLE"
db_cluster = self.update_operation_history(
db_cluster, workflow_status, resource_status
)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
self.update_profile_state(db_cluster, workflow_status, resource_status)
return
async def check_create_cluster(self, op_id, op_params, content):
self.logger.info(
f"check_create_cluster Operation {op_id}. Params: {op_params}."
)
# self.logger.debug(f"Content: {content}")
db_cluster = content["cluster"]
cluster_name = db_cluster["git_name"].lower()
cluster_kustomization_name = cluster_name
db_vim_account = content["vim_account"]
cloud_type = db_vim_account["vim_type"]
nodepool_name = ""
if cloud_type == "aws":
nodepool_name = f"{cluster_name}-nodegroup"
cluster_name = f"{cluster_name}-cluster"
elif cloud_type == "gcp":
nodepool_name = f"nodepool-{cluster_name}"
bootstrap = op_params.get("bootstrap", True)
if cloud_type in ("azure", "gcp", "aws"):
checkings_list = [
{
"item": "kustomization",
"name": cluster_kustomization_name,
"namespace": "managed-resources",
"flag": "Ready",
"timeout": self._checkloop_kustomization_timeout,
"enable": True,
"resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
},
{
"item": f"cluster_{cloud_type}",
"name": cluster_name,
"namespace": "",
"flag": "Synced",
"timeout": self._checkloop_resource_timeout,
"enable": True,
"resourceState": "IN_PROGRESS.RESOURCE_SYNCED.CLUSTER",
},
{
"item": f"cluster_{cloud_type}",
"name": cluster_name,
"namespace": "",
"flag": "Ready",
"timeout": self._checkloop_resource_timeout,
"enable": True,
"resourceState": "IN_PROGRESS.RESOURCE_READY.CLUSTER",
},
{
"item": "kustomization",
"name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
"namespace": "managed-resources",
"flag": "Ready",
"timeout": self._checkloop_kustomization_timeout,
"enable": bootstrap,
"resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
},
]
else:
return False, "Not suitable VIM account to check cluster status"
if nodepool_name:
nodepool_check = {
"item": f"nodepool_{cloud_type}",
"name": nodepool_name,
"namespace": "",
"flag": "Ready",
"timeout": self._checkloop_resource_timeout,
"enable": True,
"resourceState": "IN_PROGRESS.RESOURCE_READY.NODEPOOL",
}
checkings_list.insert(3, nodepool_check)
return await self.common_check_list(checkings_list, "clusters", db_cluster)
def update_profile_state(self, db_cluster, workflow_status, resource_status):
profiles = [
"infra_controller_profiles",
"infra_config_profiles",
"app_profiles",
"resource_profiles",
]
profiles_collection = {
"infra_controller_profiles": "k8sinfra_controller",
"infra_config_profiles": "k8sinfra_config",
"app_profiles": "k8sapp",
"resource_profiles": "k8sresource",
}
for profile_type in profiles:
profile_id = db_cluster[profile_type]
self.logger.info("profile id is : {}".format(profile_id))
db_collection = profiles_collection[profile_type]
self.logger.info("the db_collection is :{}".format(db_collection))
db_profile = self.db.get_one(db_collection, {"_id": profile_id})
self.logger.info("the db_profile is :{}".format(db_profile))
db_profile["state"] = db_cluster["state"]
db_profile["resourceState"] = db_cluster["resourceState"]
db_profile["operatingState"] = db_cluster["operatingState"]
db_profile["age_pubkey"] = db_cluster["age_pubkey"]
db_profile["age_privkey"] = db_profile["age_privkey"]
db_profile = self.update_operation_history(
db_profile, workflow_status, resource_status
)
self.logger.info("the db_profile is :{}".format(db_profile))
self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
async def delete(self, op_id, op_params, content):
self.logger.info("cluster delete Enter")
db_cluster = content["cluster"]
_, workflow_name = await self.odu.launch_workflow(
"delete_cluster", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_cluster["state"] = "DELETED"
db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_cluster["state"] = "FAILED_DELETION"
db_cluster["resourceState"] = "ERROR"
# has to call update_operation_history return content
db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"delete_cluster", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
db_cluster["resourceState"] = "READY"
else:
db_cluster["resourceState"] = "ERROR"
db_cluster["operatingState"] = "IDLE"
db_cluster = self.update_operation_history(
db_cluster, workflow_status, resource_status
)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
# To delete it from DB
if db_cluster["state"] == "DELETED":
self.delete_cluster(db_cluster)
return
def delete_cluster(self, db_cluster):
# Actually, item_content is equal to db_cluster
# item_content = self.db.get_one("clusters", {"_id": db_cluster["_id"]})
# self.logger.debug("item_content is : {}".format(item_content))
# detach profiles
update_dict = None
profiles_to_detach = [
"infra_controller_profiles",
"infra_config_profiles",
"app_profiles",
"resource_profiles",
]
profiles_collection = {
"infra_controller_profiles": "k8sinfra_controller",
"infra_config_profiles": "k8sinfra_config",
"app_profiles": "k8sapp",
"resource_profiles": "k8sresource",
}
for profile_type in profiles_to_detach:
if db_cluster.get(profile_type):
self.logger.debug("the profile_type is :{}".format(profile_type))
profile_ids = db_cluster[profile_type]
self.logger.debug("the profile_ids is :{}".format(profile_ids))
profile_ids_copy = deepcopy(profile_ids)
self.logger.debug(
"the profile_ids_copy is :{}".format(profile_ids_copy)
)
for profile_id in profile_ids_copy:
self.logger.debug("the profile_id is :{}".format(profile_id))
db_collection = profiles_collection[profile_type]
self.logger.debug("the db_collection is :{}".format(db_collection))
db_profile = self.db.get_one(db_collection, {"_id": profile_id})
self.logger.debug("the db_profile is :{}".format(db_profile))
self.logger.debug(
"the item_content name is :{}".format(db_cluster["name"])
)
self.logger.debug(
"the db_profile name is :{}".format(db_profile["name"])
)
if db_cluster["name"] == db_profile["name"]:
self.logger.debug("it is getting into if default")
self.db.del_one(db_collection, {"_id": profile_id})
else:
self.logger.debug("it is getting into else non default")
profile_ids.remove(profile_id)
update_dict = {profile_type: profile_ids}
self.logger.debug(f"the update dict is :{update_dict}")
self.db.set_one(
"clusters", {"_id": db_cluster["_id"]}, update_dict
)
self.db.del_one("clusters", {"_id": db_cluster["_id"]})
self.logger.debug("the id is :{}".format(db_cluster["_id"]))
async def attach_profile(self, op_id, op_params, content):
self.logger.info("profile attach Enter")
db_cluster = content["cluster"]
db_profile = content["profile"]
profile_type = db_profile["profile_type"]
profile_id = db_profile["_id"]
self.logger.info("profile type is : {}".format(profile_type))
self.logger.info("profile id is : {}".format(profile_id))
_, workflow_name = await self.odu.launch_workflow(
"attach_profile_to_cluster", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_cluster["resourceState"] = "ERROR"
# has to call update_operation_history return content
db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"attach_profile_to_cluster", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
db_cluster["resourceState"] = "READY"
else:
db_cluster["resourceState"] = "ERROR"
db_cluster["operatingState"] = "IDLE"
db_cluster = self.update_operation_history(
db_cluster, workflow_status, resource_status
)
profile_list = db_cluster[profile_type]
self.logger.info("profile list is : {}".format(profile_list))
if resource_status:
self.logger.info("it is getting into resource status true")
profile_list.append(profile_id)
self.logger.info("profile list is : {}".format(profile_list))
db_cluster[profile_type] = profile_list
self.logger.info("db cluster is : {}".format(db_cluster))
# update_dict = {item: profile_list}
# self.logger.info("the update_dict is :{}".format(update_dict))
# self.db.set_one(self.topic, filter_q, update_dict)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
return
async def detach_profile(self, op_id, op_params, content):
self.logger.info("profile dettach Enter")
db_cluster = content["cluster"]
db_profile = content["profile"]
profile_type = db_profile["profile_type"]
profile_id = db_profile["_id"]
self.logger.info("profile type is : {}".format(profile_type))
self.logger.info("profile id is : {}".format(profile_id))
_, workflow_name = await self.odu.launch_workflow(
"detach_profile_from_cluster", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_cluster["resourceState"] = "ERROR"
# has to call update_operation_history return content
db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"detach_profile_from_cluster", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
db_cluster["resourceState"] = "READY"
else:
db_cluster["resourceState"] = "ERROR"
db_cluster["operatingState"] = "IDLE"
db_cluster = self.update_operation_history(
db_cluster, workflow_status, resource_status
)
profile_list = db_cluster[profile_type]
self.logger.info("profile list is : {}".format(profile_list))
if resource_status:
self.logger.info("it is getting into resource status true")
profile_list.remove(profile_id)
self.logger.info("profile list is : {}".format(profile_list))
db_cluster[profile_type] = profile_list
self.logger.info("db cluster is : {}".format(db_cluster))
# update_dict = {item: profile_list}
# self.logger.info("the update_dict is :{}".format(update_dict))
# self.db.set_one(self.topic, filter_q, update_dict)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
return
async def register(self, op_id, op_params, content):
self.logger.info("cluster register enter")
db_cluster = content["cluster"]
db_cluster_copy = self.decrypting_key(db_cluster)
_, workflow_name = await self.odu.launch_workflow(
"register_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_cluster["state"] = "CREATED"
db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_cluster["state"] = "FAILED_CREATION"
db_cluster["resourceState"] = "ERROR"
# has to call update_operation_history return content
db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
"register_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"register_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
db_cluster["resourceState"] = "READY"
else:
db_cluster["resourceState"] = "ERROR"
db_cluster["operatingState"] = "IDLE"
db_cluster = self.update_operation_history(
db_cluster, workflow_status, resource_status
)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
return
async def deregister(self, op_id, op_params, content):
self.logger.info("cluster deregister enter")
db_cluster = content["cluster"]
self.logger.info("db_cluster is : {}".format(db_cluster))
_, workflow_name = await self.odu.launch_workflow(
"deregister_cluster", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_cluster["state"] = "DELETED"
db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_cluster["state"] = "FAILED_DELETION"
db_cluster["resourceState"] = "ERROR"
# has to call update_operation_history return content
db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
# Clean items used in the workflow or in the cluster, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
"deregister_cluster", op_id, op_params, content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"deregister_cluster", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
db_cluster["resourceState"] = "READY"
else:
db_cluster["resourceState"] = "ERROR"
db_cluster["operatingState"] = "IDLE"
db_cluster = self.update_operation_history(
db_cluster, workflow_status, resource_status
)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
# To delete it from DB
if db_cluster["state"] == "DELETED":
self.db.del_one("clusters", {"_id": db_cluster["_id"]})
return
async def get_creds(self, op_id, db_cluster):
self.logger.info("Cluster get creds Enter")
result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
if result:
db_cluster["credentials"] = cluster_creds
op_len = 0
for operations in db_cluster["operationHistory"]:
if operations["op_id"] == op_id:
db_cluster["operationHistory"][op_len]["result"] = result
db_cluster["operationHistory"][op_len]["endDate"] = time()
op_len += 1
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
return
async def update(self, op_id, op_params, content):
self.logger.info("Cluster update Enter")
db_cluster = content["cluster"]
db_cluster_copy = self.decrypting_key(db_cluster)
# vim account details
db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
db_cluster_copy["vim_account"] = db_vim
_, workflow_name = await self.odu.launch_workflow(
"update_cluster", op_id, op_params, db_cluster_copy
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_cluster["resourceState"] = "ERROR"
db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
# self.logger.info("Db content: {}".format(db_content))
# self.db.set_one(self.db_collection, {"_id": _id}, db_cluster)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
"update_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"update_cluster", op_id, op_params, db_cluster_copy
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
if resource_status:
db_cluster["resourceState"] = "READY"
else:
db_cluster["resourceState"] = "ERROR"
db_cluster["operatingState"] = "IDLE"
db_cluster = self.update_operation_history(
db_cluster, workflow_status, resource_status
)
# self.logger.info("db_cluster: {}".format(db_cluster))
# TODO: verify enxtcondition
# For the moment, if the workflow completed successfully, then we update the db accordingly.
if workflow_status:
if "k8s_version" in op_params:
db_cluster["k8s_version"] = op_params["k8s_version"]
elif "node_count" in op_params:
db_cluster["node_count"] = op_params["node_count"]
# self.db.set_one(self.db_collection, {"_id": _id}, db_content)
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
return
class CloudCredentialsLcm(GitOpsLcm):
db_collection = "vim_accounts"
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
super().__init__(msg, lcm_tasks, config)
async def add(self, op_id, op_params, content):
self.logger.info("Cloud Credentials create")
_, workflow_name = await self.odu.launch_workflow(
"create_cloud_credentials", op_id, op_params, content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
"create_cloud_credentials", op_id, op_params, content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"create_cloud_credentials", op_id, op_params, content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
content["_admin"]["operationalState"] = "ENABLED"
for operation in content["_admin"]["operations"]:
if operation["lcmOperationType"] == "create":
operation["operationState"] = "ENABLED"
self.logger.info("Content : {}".format(content))
self.db.set_one("vim_accounts", {"_id": content["_id"]}, content)
return
async def edit(self, op_id, op_params, content):
_, workflow_name = await self.odu.launch_workflow(
"update_cloud_credentials", op_id, op_params, content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
"update_cloud_credentials", op_id, op_params, content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"update_cloud_credentials", op_id, op_params, content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
return
async def remove(self, op_id, op_params, content):
self.logger.info("Cloud Credentials delete")
_, workflow_name = await self.odu.launch_workflow(
"delete_cloud_credentials", op_id, op_params, content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Msg: {}".format(workflow_status, workflow_msg)
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"delete_cloud_credentials", op_id, op_params, content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
self.db.del_one(self.db_collection, {"_id": content["_id"]})
return
class K8sAppLcm(GitOpsLcm):
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("App Create Enter")
_, workflow_name = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
content["state"] = "CREATED"
content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
content["state"] = "FAILED_CREATION"
content["resourceState"] = "ERROR"
# has to call update_operation_history return content
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"create_profile", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
content["resourceState"] = "READY"
else:
content["resourceState"] = "ERROR"
content["operatingState"] = "IDLE"
content = self.update_operation_history(
content, workflow_status, resource_status
)
self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
return
async def delete(self, op_id, op_params, content):
self.logger.info("App delete Enter")
_, workflow_name = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
content["state"] = "DELETED"
content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
content["state"] = "FAILED_DELETION"
content["resourceState"] = "ERROR"
# has to call update_operation_history return content
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"delete_profile", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
content["resourceState"] = "READY"
else:
content["resourceState"] = "ERROR"
content["operatingState"] = "IDLE"
content = self.update_operation_history(
content, workflow_status, resource_status
)
self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
# To delete it from DB
if content["state"] == "DELETED":
self.db.del_one("k8sapp", {"_id": content["_id"]})
return
class K8sResourceLcm(GitOpsLcm):
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("Resource Create Enter")
_, workflow_name = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
content["state"] = "CREATED"
content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
content["state"] = "FAILED_CREATION"
content["resourceState"] = "ERROR"
# has to call update_operation_history return content
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"create_profile", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
content["resourceState"] = "READY"
else:
content["resourceState"] = "ERROR"
content["operatingState"] = "IDLE"
content = self.update_operation_history(
content, workflow_status, resource_status
)
self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
return
async def delete(self, op_id, op_params, content):
self.logger.info("Resource delete Enter")
content = self.db.get_one("k8sresource", {"_id": content["_id"]})
_, workflow_name = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
content["state"] = "DELETED"
content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
content["state"] = "FAILED_DELETION"
content["resourceState"] = "ERROR"
# has to call update_operation_history return content
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"delete_profile", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
content["resourceState"] = "READY"
else:
content["resourceState"] = "ERROR"
content["operatingState"] = "IDLE"
content = self.update_operation_history(
content, workflow_status, resource_status
)
self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
# To delete it from DB
if content["state"] == "DELETED":
self.db.del_one("k8sresource", {"_id": content["_id"]})
return
class K8sInfraControllerLcm(GitOpsLcm):
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("Infra controller Create Enter")
_, workflow_name = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
content["state"] = "CREATED"
content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
content["state"] = "FAILED_CREATION"
content["resourceState"] = "ERROR"
# has to call update_operation_history return content
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"create_profile", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
content["resourceState"] = "READY"
else:
content["resourceState"] = "ERROR"
content["operatingState"] = "IDLE"
content = self.update_operation_history(
content, workflow_status, resource_status
)
self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
return
async def delete(self, op_id, op_params, content):
self.logger.info("Infra controller delete Enter")
_, workflow_name = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
content["state"] = "DELETED"
content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
content["state"] = "FAILED_DELETION"
content["resourceState"] = "ERROR"
# has to call update_operation_history return content
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"delete_profile", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
content["resourceState"] = "READY"
else:
content["resourceState"] = "ERROR"
content["operatingState"] = "IDLE"
content = self.update_operation_history(
content, workflow_status, resource_status
)
self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
# To delete it from DB
if content["state"] == "DELETED":
self.db.del_one("k8sinfra_controller", {"_id": content["_id"]})
return
class K8sInfraConfigLcm(GitOpsLcm):
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("Infra config Create Enter")
_, workflow_name = await self.odu.launch_workflow(
"create_profile", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
content["state"] = "CREATED"
content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
content["state"] = "FAILED_CREATION"
content["resourceState"] = "ERROR"
# has to call update_operation_history return content
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"create_profile", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
content["resourceState"] = "READY"
else:
content["resourceState"] = "ERROR"
content["operatingState"] = "IDLE"
content = self.update_operation_history(
content, workflow_status, resource_status
)
self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
return
async def delete(self, op_id, op_params, content):
self.logger.info("Infra config delete Enter")
_, workflow_name = await self.odu.launch_workflow(
"delete_profile", op_id, op_params, content
)
self.logger.info("workflow_name is :{}".format(workflow_name))
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"workflow_status is :{} and workflow_msg is :{}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
content["state"] = "DELETED"
content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
content["state"] = "FAILED_DELETION"
content["resourceState"] = "ERROR"
# has to call update_operation_history return content
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
resource_status, resource_msg = await self.check_resource_status(
"delete_profile", op_id, op_params, content
)
self.logger.info(
"resource_status is :{} and resource_msg is :{}".format(
resource_status, resource_msg
)
)
if resource_status:
content["resourceState"] = "READY"
else:
content["resourceState"] = "ERROR"
content["operatingState"] = "IDLE"
content = self.update_operation_history(
content, workflow_status, resource_status
)
self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
# To delete it from DB
if content["state"] == "DELETED":
self.db.del_one("k8sinfra_config", {"_id": content["_id"]})
return
class OkaLcm(GitOpsLcm):
db_collection = "okas"
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("OKA Create Enter")
db_content = content
_, workflow_name = await self.odu.launch_workflow(
"create_oka", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_content["state"] = "CREATED"
db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_content["state"] = "FAILED_CREATION"
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(db_content, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"create_oka", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
if resource_status:
db_content["resourceState"] = "READY"
else:
db_content["resourceState"] = "ERROR"
# self.logger.info("Db content: {}".format(db_content))
db_content = self.update_operation_history(
db_content, workflow_status, resource_status
)
db_content["operatingState"] = "IDLE"
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
return
async def edit(self, op_id, op_params, content):
self.logger.info("OKA Edit Enter")
db_content = content
_, workflow_name = await self.odu.launch_workflow(
"update_oka", op_id, op_params, content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(db_content, workflow_status, None)
# self.logger.info("Db content: {}".format(db_content))
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"update_oka", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
if resource_status:
db_content["resourceState"] = "READY"
else:
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(
db_content, workflow_status, resource_status
)
db_content["operatingState"] = "IDLE"
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
return
async def delete(self, op_id, op_params, content):
self.logger.info("OKA delete Enter")
db_content = content
_, workflow_name = await self.odu.launch_workflow(
"delete_oka", op_id, op_params, content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_content["state"] = "DELETED"
db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_content["state"] = "FAILED_DELETION"
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(db_content, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"delete_oka", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
if resource_status:
db_content["resourceState"] = "READY"
else:
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(
db_content, workflow_status, resource_status
)
db_content["operatingState"] = "IDLE"
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if db_content["state"] == "DELETED":
self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
return
class KsuLcm(GitOpsLcm):
db_collection = "ksus"
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("ksu Create Enter")
_, workflow_name = await self.odu.launch_workflow(
"create_ksus", op_id, op_params, content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
for db_ksu in content:
if workflow_status:
db_ksu["state"] = "CREATED"
db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_ksu["state"] = "FAILED_CREATION"
db_ksu["resourceState"] = "ERROR"
db_ksu = self.update_operation_history(db_ksu, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
"create_ksus", op_id, op_params, content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"create_ksus", op_id, op_params, content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
for db_ksu in content:
if resource_status:
db_ksu["resourceState"] = "READY"
else:
db_ksu["resourceState"] = "ERROR"
db_ksu = self.update_operation_history(
db_ksu, workflow_status, resource_status
)
for db_ksu in content:
db_ksu["operatingState"] = "IDLE"
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
return
async def edit(self, op_id, op_params, content):
self.logger.info("ksu edit Enter")
_, workflow_name = await self.odu.launch_workflow(
"update_ksus", op_id, op_params, content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
for db_ksu in content:
if workflow_status:
db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_ksu["resourceState"] = "ERROR"
db_ksu = self.update_operation_history(db_ksu, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
# Clean items used in the workflow, no matter if the workflow succeeded
clean_status, clean_msg = await self.odu.clean_items_workflow(
"create_ksus", op_id, op_params, content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"update_ksus", op_id, op_params, content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
for db_ksu in content:
if resource_status:
db_ksu["resourceState"] = "READY"
else:
db_ksu["resourceState"] = "ERROR"
db_ksu = self.update_operation_history(
db_ksu, workflow_status, resource_status
)
for db_ksu, ksu_params in zip(content, op_params):
db_ksu["operatingState"] = "IDLE"
if workflow_status:
db_ksu["name"] = ksu_params["name"]
db_ksu["description"] = ksu_params["description"]
db_ksu["profile"]["profile_type"] = ksu_params["profile"][
"profile_type"
]
db_ksu["profile"]["_id"] = ksu_params["profile"]["_id"]
db_ksu["oka"] = ksu_params["oka"]
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
return
async def delete(self, op_id, op_params, content):
self.logger.info("ksu delete Enter")
_, workflow_name = await self.odu.launch_workflow(
"delete_ksus", op_id, op_params, content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
for db_ksu in content:
if workflow_status:
db_ksu["state"] = "DELETED"
db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_ksu["state"] = "FAILED_DELETION"
db_ksu["resourceState"] = "ERROR"
db_ksu = self.update_operation_history(db_ksu, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"delete_ksus", op_id, op_params, content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
for db_ksu in content:
if resource_status:
db_ksu["resourceState"] = "READY"
else:
db_ksu["resourceState"] = "ERROR"
db_ksu = self.update_operation_history(
db_ksu, workflow_status, resource_status
)
for db_ksu in content:
db_ksu["operatingState"] = "IDLE"
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
if db_ksu["state"] == "DELETED":
self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]})
return
async def clone(self, op_id, op_params, db_content):
self.logger.info("ksu clone Enter")
_, workflow_name = await self.odu.launch_workflow(
"clone_ksus", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(db_content, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"clone_ksus", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
if resource_status:
db_content["resourceState"] = "READY"
else:
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(
db_content, workflow_status, resource_status
)
db_content["operatingState"] = "IDLE"
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
return
async def move(self, op_id, op_params, db_content):
self.logger.info("ksu move Enter")
_, workflow_name = await self.odu.launch_workflow(
"move_ksus", op_id, op_params, db_content
)
workflow_status, workflow_msg = await self.odu.check_workflow_status(
workflow_name
)
self.logger.info(
"Workflow Status: {} Workflow Message: {}".format(
workflow_status, workflow_msg
)
)
if workflow_status:
db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
else:
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(db_content, workflow_status, None)
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
resource_status, resource_msg = await self.check_resource_status(
"move_ksus", op_id, op_params, db_content
)
self.logger.info(
"Resource Status: {} Resource Message: {}".format(
resource_status, resource_msg
)
)
if resource_status:
db_content["resourceState"] = "READY"
else:
db_content["resourceState"] = "ERROR"
db_content = self.update_operation_history(
db_content, workflow_status, resource_status
)
db_content["operatingState"] = "IDLE"
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
return