blob: c48228e0290ca85cce48ab420588b04a5ca9d1c9 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Module authors, each given as "Name <email>".
__author__ = (
    "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
    "Shahithya Y <shahithya.y@tataelxsi.co.in>",
)
import logging
from osm_lcm.lcm_utils import LcmBase
from copy import deepcopy
from osm_lcm import odu_workflows
from osm_lcm import vim_sdn
class ClusterLcm(LcmBase):
    """Cluster lifecycle operations executed through ODU workflows.

    Every public coroutine receives ``op_id``, ``op_params`` and ``content``,
    hands them to ``self.odu`` under a fixed workflow name, records the outcome
    on the cluster document found in ``content["cluster"]`` and stores that
    document in the ``clusters`` collection.
    """

    db_collection = "clusters"

    # Cluster-document key -> collection storing the profiles listed under it.
    _PROFILE_COLLECTIONS = {
        "infra_controller_profiles": "k8sinfra_controller",
        "infra_config_profiles": "k8sinfra_config",
        "app_profiles": "k8sapp",
        "resource_profiles": "k8sresource",
    }

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger, the ODU workflow client, the K8s cluster registrar
        and the LcmBase part of the object.

        :param msg: passed to OduWorkflow, K8sClusterLcm and LcmBase.__init__
        :param lcm_tasks: stored on the instance and passed to the helpers
        :param config: two level dictionary with configuration. Top level
            should contain 'database', 'storage',
        :return: None
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
        super().__init__(msg, self.logger)

    async def _run_cluster_workflow(
        self,
        workflow,
        op_id,
        op_params,
        content,
        success_state=None,
        failure_state=None,
    ):
        """Run ``workflow`` and record its outcome on ``content["cluster"]``.

        The cluster document is stored once after the workflow check. The
        final store is left to the caller so that it can apply its own changes
        first.

        :param workflow: workflow name given to the ODU client
        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: forwarded to the ODU client; must hold a "cluster" key
        :param success_state: value for ``state`` when the workflow succeeds;
            ``None`` leaves ``state`` untouched
        :param failure_state: value for ``state`` when the workflow fails;
            ``None`` leaves ``state`` untouched
        :return: ``(db_cluster, workflow_status, resource_status)``;
            ``resource_status`` is ``None`` when the workflow failed and the
            resource check was therefore skipped
        """
        db_cluster = content["cluster"]

        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            if success_state is not None:
                db_cluster["state"] = success_state
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            if failure_state is not None:
                db_cluster["state"] = failure_state
            db_cluster["resourceState"] = "ERROR"
        # update_operation_history returns the document to keep working on.
        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Must be bound even when the resource check is skipped: it is read
        # below and by the callers on the failure path as well.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            db_cluster["resourceState"] = "READY" if resource_status else "ERROR"

        db_cluster["operatingState"] = "IDLE"
        db_cluster = self.update_operation_history(
            db_cluster, workflow_status, resource_status
        )
        return db_cluster, workflow_status, resource_status

    async def create(self, op_id, op_params, content):
        """Run ``create_cluster``, store the cluster and update its profiles.

        ``state`` becomes "CREATED" or "FAILED_CREATION" depending on the
        workflow result.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: dictionary holding the cluster document under "cluster"
        :return: None
        """
        self.logger.info("cluster Create Enter")
        (
            db_cluster,
            workflow_status,
            resource_status,
        ) = await self._run_cluster_workflow(
            "create_cluster",
            op_id,
            op_params,
            content,
            success_state="CREATED",
            failure_state="FAILED_CREATION",
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        self.update_profile_state(db_cluster, workflow_status, resource_status)

    def update_profile_state(self, db_cluster, workflow_status, resource_status):
        """Copy the cluster's state fields onto the profiles it references.

        For every profile key of the cluster, the matching profile document is
        read, given the cluster's ``state``, ``resourceState`` and
        ``operatingState``, passed through ``update_operation_history`` and
        stored again.

        :param db_cluster: cluster document; must hold all four profile keys
        :param workflow_status: forwarded to ``update_operation_history``
        :param resource_status: forwarded to ``update_operation_history``
        :return: None
        """
        for profile_type, db_collection in self._PROFILE_COLLECTIONS.items():
            # NOTE(review): delete_cluster and attach_profile treat this value
            # as a list of ids, yet it is used here directly as the "_id"
            # filter. Presumably the DB layer matches a list filter against a
            # single document - confirm.
            profile_id = db_cluster[profile_type]
            self.logger.info("profile id is : {}".format(profile_id))
            self.logger.info("the db_collection is :{}".format(db_collection))
            db_profile = self.db.get_one(db_collection, {"_id": profile_id})
            self.logger.info("the db_profile is :{}".format(db_profile))
            db_profile["state"] = db_cluster["state"]
            db_profile["resourceState"] = db_cluster["resourceState"]
            db_profile["operatingState"] = db_cluster["operatingState"]
            db_profile = self.update_operation_history(
                db_profile, workflow_status, resource_status
            )
            self.logger.info("the db_profile is :{}".format(db_profile))
            self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)

    async def delete(self, op_id, op_params, content):
        """Run ``delete_cluster`` and remove the cluster once it is "DELETED".

        ``state`` becomes "DELETED" or "FAILED_DELETION" depending on the
        workflow result. The document is stored first and, when the state is
        "DELETED", removed together with its profiles through
        ``delete_cluster``.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: dictionary holding the cluster document under "cluster"
        :return: None
        """
        self.logger.info("cluster delete Enter")
        db_cluster, _, _ = await self._run_cluster_workflow(
            "delete_cluster",
            op_id,
            op_params,
            content,
            success_state="DELETED",
            failure_state="FAILED_DELETION",
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # To delete it from DB
        if db_cluster["state"] == "DELETED":
            self.delete_cluster(db_cluster)

    def delete_cluster(self, db_cluster):
        """Detach the cluster's profiles, then delete the cluster document.

        For each profile id listed on the cluster: a profile whose ``name``
        equals the cluster's name (logged as the "default" case) is deleted
        from its collection; any other profile is only removed from the
        cluster's list, and the shortened list is written to ``clusters``.

        :param db_cluster: cluster document, including "_id" and "name"
        :return: None
        """
        for profile_type, db_collection in self._PROFILE_COLLECTIONS.items():
            if not db_cluster.get(profile_type):
                continue
            self.logger.info("the profile_type is :{}".format(profile_type))
            profile_ids = db_cluster[profile_type]
            self.logger.info("the profile_ids is :{}".format(profile_ids))
            # Iterate over a copy: profile_ids itself is shortened in the loop.
            profile_ids_copy = deepcopy(profile_ids)
            self.logger.info("the profile_ids_copy is :{}".format(profile_ids_copy))
            for profile_id in profile_ids_copy:
                self.logger.info("the profile_id is :{}".format(profile_id))
                self.logger.info("the db_collection is :{}".format(db_collection))
                db_profile = self.db.get_one(db_collection, {"_id": profile_id})
                self.logger.info("the db_profile is :{}".format(db_profile))
                self.logger.info(
                    "the item_content name is :{}".format(db_cluster["name"])
                )
                self.logger.info(
                    "the db_profile name is :{}".format(db_profile["name"])
                )
                if db_cluster["name"] == db_profile["name"]:
                    self.logger.info("it is getting into if default")
                    self.db.del_one(db_collection, {"_id": profile_id})
                else:
                    self.logger.info("it is getting into else non default")
                    profile_ids.remove(profile_id)
                    update_dict = {profile_type: profile_ids}
                    self.logger.info(f"the update dict is :{update_dict}")
                    self.db.set_one(
                        "clusters", {"_id": db_cluster["_id"]}, update_dict
                    )
        self.db.del_one("clusters", {"_id": db_cluster["_id"]})
        self.logger.info("the id is :{}".format(db_cluster["_id"]))

    async def attach_profile(self, op_id, op_params, content):
        """Run ``attach_profile_to_cluster`` and record the attached profile.

        The profile id is appended to the cluster's list for that profile type
        only when the resource check succeeded. ``state`` is left untouched.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: dictionary holding the cluster document under "cluster"
            and the profile document under "profile"
        :return: None
        """
        self.logger.info("profile attach Enter")
        db_profile = content["profile"]
        profile_type = db_profile["profile_type"]
        profile_id = db_profile["_id"]
        self.logger.info("profile type is : {}".format(profile_type))
        self.logger.info("profile id is : {}".format(profile_id))

        db_cluster, _, resource_status = await self._run_cluster_workflow(
            "attach_profile_to_cluster", op_id, op_params, content
        )

        profile_list = db_cluster[profile_type]
        self.logger.info("profile list is : {}".format(profile_list))
        if resource_status:
            self.logger.info("it is getting into resource status true")
            profile_list.append(profile_id)
        self.logger.info("profile list is : {}".format(profile_list))
        db_cluster[profile_type] = profile_list
        self.logger.info("db cluster is : {}".format(db_cluster))
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

    async def detach_profile(self, op_id, op_params, content):
        """Run ``detach_profile_from_cluster`` and drop the detached profile.

        The profile id is removed from the cluster's list for that profile
        type only when the resource check succeeded. ``state`` is left
        untouched.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: dictionary holding the cluster document under "cluster"
            and the profile document under "profile"
        :return: None
        :raises ValueError: from ``list.remove`` when the profile id is not in
            the cluster's list
        """
        self.logger.info("profile dettach Enter")
        db_profile = content["profile"]
        profile_type = db_profile["profile_type"]
        profile_id = db_profile["_id"]
        self.logger.info("profile type is : {}".format(profile_type))
        self.logger.info("profile id is : {}".format(profile_id))

        db_cluster, _, resource_status = await self._run_cluster_workflow(
            "detach_profile_from_cluster", op_id, op_params, content
        )

        profile_list = db_cluster[profile_type]
        self.logger.info("profile list is : {}".format(profile_list))
        if resource_status:
            self.logger.info("it is getting into resource status true")
            profile_list.remove(profile_id)
        self.logger.info("profile list is : {}".format(profile_list))
        db_cluster[profile_type] = profile_list
        self.logger.info("db cluster is : {}".format(db_cluster))
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

    async def register(self, op_id, op_params, content):
        """Run ``register_cluster``, store the cluster and update its profiles.

        ``state`` becomes "CREATED" or "FAILED_CREATION" depending on the
        workflow result.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: dictionary holding the cluster document under "cluster"
        :return: None
        """
        self.logger.info("cluster register enter")
        (
            db_cluster,
            workflow_status,
            resource_status,
        ) = await self._run_cluster_workflow(
            "register_cluster",
            op_id,
            op_params,
            content,
            success_state="CREATED",
            failure_state="FAILED_CREATION",
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        self.update_profile_state(db_cluster, workflow_status, resource_status)

    async def deregister(self, op_id, op_params, content):
        """Run ``deregister_cluster`` and delete the cluster once "DELETED".

        ``state`` becomes "DELETED" or "FAILED_DELETION" depending on the
        workflow result. Only the cluster document is deleted here; profiles
        are not touched.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: dictionary holding the cluster document under "cluster"
        :return: None
        """
        self.logger.info("cluster deregister enter")
        self.logger.info("db_cluster is : {}".format(content["cluster"]))
        db_cluster, _, _ = await self._run_cluster_workflow(
            "deregister_cluster",
            op_id,
            op_params,
            content,
            success_state="DELETED",
            failure_state="FAILED_DELETION",
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # To delete it from DB
        if db_cluster["state"] == "DELETED":
            self.db.del_one("clusters", {"_id": db_cluster["_id"]})

    async def get_creds(self, db_cluster):
        """Fetch the cluster credentials and store them on the cluster document.

        The credentials are written under "credentials" only when the ODU
        client reports success; the document is stored in either case.

        :param db_cluster: cluster document, including "_id"
        :return: None
        """
        self.logger.info("Cluster get creds Enter")
        result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
        if result:
            db_cluster["credentials"] = cluster_creds
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

    async def update(self, op_id, op_params, content):
        """Run ``update_cluster`` and store the updated cluster document.

        When the workflow succeeded, "k8s_version" (or, if that is absent,
        "node_count") is copied from ``op_params`` onto the cluster.
        ``state`` is left untouched.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client; may hold "k8s_version"
            or "node_count"
        :param content: dictionary holding the cluster document under "cluster"
        :return: None
        """
        self.logger.info("Cluster update Enter")
        db_cluster = content["cluster"]

        workflow_name = await self.odu.launch_workflow(
            "update_cluster", op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["resourceState"] = "ERROR"

        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Must be bound even when the resource check is skipped: it is read by
        # the history update below on the failure path as well.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                "update_cluster", op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )
            db_cluster["resourceState"] = "READY" if resource_status else "ERROR"

        db_cluster["operatingState"] = "IDLE"
        db_cluster = self.update_operation_history(
            db_cluster, workflow_status, resource_status
        )
        # TODO: verify this condition.
        # For the moment, if the workflow completed successfully, then we
        # update the db accordingly.
        if workflow_status:
            if "k8s_version" in op_params:
                db_cluster["k8s_version"] = op_params["k8s_version"]
            elif "node_count" in op_params:
                db_cluster["node_count"] = op_params["node_count"]
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
class CloudCredentialsLcm(LcmBase):
    """Cloud-credential operations executed through ODU workflows.

    Each coroutine launches one workflow, logs its status and, when the
    workflow succeeded, logs the resource status as well.
    """

    db_collection = "vim_accounts"

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger, the ODU workflow client and the LcmBase part of the
        object.

        :param msg: passed to OduWorkflow and to LcmBase.__init__
        :param lcm_tasks: stored on the instance and passed to OduWorkflow
        :param config: two level dictionary with configuration. Top level
            should contain 'database', 'storage',
        :return: None
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        super().__init__(msg, self.logger)

    async def add(self, op_id, op_params, content):
        """Run the ``create_cloud_credentials`` workflow and log its outcome.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: forwarded to the ODU client
        :return: None
        """
        self.logger.info("Cloud Credentials create")
        workflow = "create_cloud_credentials"

        launched = await self.odu.launch_workflow(workflow, op_id, op_params, content)
        wf_ok, wf_msg = await self.odu.check_workflow_status(launched)
        self.logger.info("Workflow Status: {} Workflow Msg: {}".format(wf_ok, wf_msg))

        # The resource check only makes sense after a successful workflow.
        if not wf_ok:
            return
        res_ok, res_msg = await self.odu.check_resource_status(
            workflow, op_id, op_params, content
        )
        self.logger.info(
            "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
        )

    async def edit(self, op_id, op_params, content):
        """Run the ``update_cloud_credentials`` workflow and log its outcome.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: forwarded to the ODU client
        :return: None
        """
        workflow = "update_cloud_credentials"

        launched = await self.odu.launch_workflow(workflow, op_id, op_params, content)
        wf_ok, wf_msg = await self.odu.check_workflow_status(launched)
        self.logger.info("Workflow Status: {} Workflow Msg: {}".format(wf_ok, wf_msg))

        # The resource check only makes sense after a successful workflow.
        if not wf_ok:
            return
        res_ok, res_msg = await self.odu.check_resource_status(
            workflow, op_id, op_params, content
        )
        self.logger.info(
            "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
        )

    async def remove(self, op_id, op_params, content):
        """Run ``delete_cloud_credentials``, then delete the stored record.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: forwarded to the ODU client; its "_id" selects the
            record deleted from ``self.db_collection``
        :return: None
        """
        self.logger.info("Cloud Credentials delete")
        workflow = "delete_cloud_credentials"

        launched = await self.odu.launch_workflow(workflow, op_id, op_params, content)
        wf_ok, wf_msg = await self.odu.check_workflow_status(launched)
        self.logger.info("Workflow Status: {} Workflow Msg: {}".format(wf_ok, wf_msg))

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )

        # NOTE(review): the record is deleted whatever the workflow outcome -
        # confirm this is intended.
        self.db.del_one(self.db_collection, {"_id": content["_id"]})
class K8sAppLcm(LcmBase):
    """App-profile operations executed through ODU workflows.

    Profile documents are stored in the ``k8sapp`` collection.
    """

    # Collection holding the profile documents handled by this class.
    _COLLECTION = "k8sapp"

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger, the ODU workflow client and the LcmBase part of the
        object.

        :param msg: passed to OduWorkflow and to LcmBase.__init__
        :param lcm_tasks: stored on the instance and passed to OduWorkflow
        :param config: two level dictionary with configuration. Top level
            should contain 'database', 'storage',
        :return: None
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        super().__init__(msg, self.logger)

    async def _run_profile_workflow(
        self, workflow, op_id, op_params, content, success_state, failure_state
    ):
        """Run ``workflow`` and record its outcome on the profile document.

        The document is stored once after the workflow check; the final store
        is left to the caller.

        :param workflow: workflow name given to the ODU client
        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :param success_state: ``state`` value when the workflow succeeds
        :param failure_state: ``state`` value when the workflow fails
        :return: the profile document returned by the last
            ``update_operation_history`` call
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            content["state"] = success_state
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = failure_state
            content["resourceState"] = "ERROR"
        # update_operation_history returns the document to keep working on.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # Must be bound even when the resource check is skipped: it is read by
        # the history update below on the failure path as well.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            content["resourceState"] = "READY" if resource_status else "ERROR"

        content["operatingState"] = "IDLE"
        return self.update_operation_history(
            content, workflow_status, resource_status
        )

    async def create(self, op_id, op_params, content):
        """Run ``create_profile`` and store the resulting profile document.

        ``state`` becomes "CREATED" or "FAILED_CREATION" depending on the
        workflow result.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :return: None
        """
        self.logger.info("App Create Enter")
        content = await self._run_profile_workflow(
            "create_profile", op_id, op_params, content, "CREATED", "FAILED_CREATION"
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

    async def delete(self, op_id, op_params, content):
        """Run ``delete_profile`` and delete the profile once it is "DELETED".

        ``state`` becomes "DELETED" or "FAILED_DELETION" depending on the
        workflow result; the document is stored and then, when "DELETED",
        removed from the collection.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :return: None
        """
        self.logger.info("App delete Enter")
        content = await self._run_profile_workflow(
            "delete_profile", op_id, op_params, content, "DELETED", "FAILED_DELETION"
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # To delete it from DB
        if content["state"] == "DELETED":
            self.db.del_one(self._COLLECTION, {"_id": content["_id"]})
class K8sResourceLcm(LcmBase):
    """Resource-profile operations executed through ODU workflows.

    Profile documents are stored in the ``k8sresource`` collection.
    """

    # Collection holding the profile documents handled by this class.
    _COLLECTION = "k8sresource"

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger, the ODU workflow client and the LcmBase part of the
        object.

        :param msg: passed to OduWorkflow and to LcmBase.__init__
        :param lcm_tasks: stored on the instance and passed to OduWorkflow
        :param config: two level dictionary with configuration. Top level
            should contain 'database', 'storage',
        :return: None
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        super().__init__(msg, self.logger)

    async def _run_profile_workflow(
        self, workflow, op_id, op_params, content, success_state, failure_state
    ):
        """Run ``workflow`` and record its outcome on the profile document.

        The document is stored once after the workflow check; the final store
        is left to the caller.

        :param workflow: workflow name given to the ODU client
        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :param success_state: ``state`` value when the workflow succeeds
        :param failure_state: ``state`` value when the workflow fails
        :return: the profile document returned by the last
            ``update_operation_history`` call
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            content["state"] = success_state
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = failure_state
            content["resourceState"] = "ERROR"
        # update_operation_history returns the document to keep working on.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # Must be bound even when the resource check is skipped: it is read by
        # the history update below on the failure path as well.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            content["resourceState"] = "READY" if resource_status else "ERROR"

        content["operatingState"] = "IDLE"
        return self.update_operation_history(
            content, workflow_status, resource_status
        )

    async def create(self, op_id, op_params, content):
        """Run ``create_profile`` and store the resulting profile document.

        ``state`` becomes "CREATED" or "FAILED_CREATION" depending on the
        workflow result.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :return: None
        """
        self.logger.info("Resource Create Enter")
        content = await self._run_profile_workflow(
            "create_profile", op_id, op_params, content, "CREATED", "FAILED_CREATION"
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

    async def delete(self, op_id, op_params, content):
        """Run ``delete_profile`` and delete the profile once it is "DELETED".

        The profile document is first re-read from the database by "_id".
        ``state`` becomes "DELETED" or "FAILED_DELETION" depending on the
        workflow result; the document is stored and then, when "DELETED",
        removed from the collection.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: dictionary whose "_id" selects the profile document
        :return: None
        """
        self.logger.info("Resource delete Enter")
        # Work on the stored document rather than on the one handed in.
        content = self.db.get_one(self._COLLECTION, {"_id": content["_id"]})
        content = await self._run_profile_workflow(
            "delete_profile", op_id, op_params, content, "DELETED", "FAILED_DELETION"
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # To delete it from DB
        if content["state"] == "DELETED":
            self.db.del_one(self._COLLECTION, {"_id": content["_id"]})
class K8sInfraControllerLcm(LcmBase):
    """Infra-controller-profile operations executed through ODU workflows.

    Profile documents are stored in the ``k8sinfra_controller`` collection.
    """

    # Collection holding the profile documents handled by this class.
    _COLLECTION = "k8sinfra_controller"

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger, the ODU workflow client and the LcmBase part of the
        object.

        :param msg: passed to OduWorkflow and to LcmBase.__init__
        :param lcm_tasks: stored on the instance and passed to OduWorkflow
        :param config: two level dictionary with configuration. Top level
            should contain 'database', 'storage',
        :return: None
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        super().__init__(msg, self.logger)

    async def _run_profile_workflow(
        self, workflow, op_id, op_params, content, success_state, failure_state
    ):
        """Run ``workflow`` and record its outcome on the profile document.

        The document is stored once after the workflow check; the final store
        is left to the caller.

        :param workflow: workflow name given to the ODU client
        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :param success_state: ``state`` value when the workflow succeeds
        :param failure_state: ``state`` value when the workflow fails
        :return: the profile document returned by the last
            ``update_operation_history`` call
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            content["state"] = success_state
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = failure_state
            content["resourceState"] = "ERROR"
        # update_operation_history returns the document to keep working on.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # Must be bound even when the resource check is skipped: it is read by
        # the history update below on the failure path as well.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            content["resourceState"] = "READY" if resource_status else "ERROR"

        content["operatingState"] = "IDLE"
        return self.update_operation_history(
            content, workflow_status, resource_status
        )

    async def create(self, op_id, op_params, content):
        """Run ``create_profile`` and store the resulting profile document.

        ``state`` becomes "CREATED" or "FAILED_CREATION" depending on the
        workflow result.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :return: None
        """
        self.logger.info("Infra controller Create Enter")
        content = await self._run_profile_workflow(
            "create_profile", op_id, op_params, content, "CREATED", "FAILED_CREATION"
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

    async def delete(self, op_id, op_params, content):
        """Run ``delete_profile`` and delete the profile once it is "DELETED".

        ``state`` becomes "DELETED" or "FAILED_DELETION" depending on the
        workflow result; the document is stored and then, when "DELETED",
        removed from the collection.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :return: None
        """
        self.logger.info("Infra controller delete Enter")
        content = await self._run_profile_workflow(
            "delete_profile", op_id, op_params, content, "DELETED", "FAILED_DELETION"
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # To delete it from DB
        if content["state"] == "DELETED":
            self.db.del_one(self._COLLECTION, {"_id": content["_id"]})
class K8sInfraConfigLcm(LcmBase):
    """Infra-config-profile operations executed through ODU workflows.

    Profile documents are stored in the ``k8sinfra_config`` collection.
    """

    # Collection holding the profile documents handled by this class.
    _COLLECTION = "k8sinfra_config"

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger, the ODU workflow client and the LcmBase part of the
        object.

        :param msg: passed to OduWorkflow and to LcmBase.__init__
        :param lcm_tasks: stored on the instance and passed to OduWorkflow
        :param config: two level dictionary with configuration. Top level
            should contain 'database', 'storage',
        :return: None
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        super().__init__(msg, self.logger)

    async def _run_profile_workflow(
        self, workflow, op_id, op_params, content, success_state, failure_state
    ):
        """Run ``workflow`` and record its outcome on the profile document.

        The document is stored once after the workflow check; the final store
        is left to the caller. The resource check runs only after a successful
        workflow, for creation and deletion alike.

        :param workflow: workflow name given to the ODU client
        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :param success_state: ``state`` value when the workflow succeeds
        :param failure_state: ``state`` value when the workflow fails
        :return: the profile document returned by the last
            ``update_operation_history`` call
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            content["state"] = success_state
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = failure_state
            content["resourceState"] = "ERROR"
        # update_operation_history returns the document to keep working on.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # Must be bound even when the resource check is skipped: it is read by
        # the history update below on the failure path as well.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            content["resourceState"] = "READY" if resource_status else "ERROR"

        content["operatingState"] = "IDLE"
        return self.update_operation_history(
            content, workflow_status, resource_status
        )

    async def create(self, op_id, op_params, content):
        """Run ``create_profile`` and store the resulting profile document.

        ``state`` becomes "CREATED" or "FAILED_CREATION" depending on the
        workflow result.

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :return: None
        """
        self.logger.info("Infra config Create Enter")
        content = await self._run_profile_workflow(
            "create_profile", op_id, op_params, content, "CREATED", "FAILED_CREATION"
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

    async def delete(self, op_id, op_params, content):
        """Run ``delete_profile`` and delete the profile once it is "DELETED".

        ``state`` becomes "DELETED" or "FAILED_DELETION" depending on the
        workflow result; the document is stored and then, when "DELETED",
        removed from the collection. After a failed workflow the resource
        check is skipped, so ``resourceState`` stays "ERROR".

        :param op_id: forwarded to the ODU client
        :param op_params: forwarded to the ODU client
        :param content: profile document, including "_id"
        :return: None
        """
        self.logger.info("Infra config delete Enter")
        content = await self._run_profile_workflow(
            "delete_profile", op_id, op_params, content, "DELETED", "FAILED_DELETION"
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # To delete it from DB
        if content["state"] == "DELETED":
            self.db.del_one(self._COLLECTION, {"_id": content["_id"]})
class OkaLcm(LcmBase):
    """
    Lifecycle operations (create, edit, delete) for records of the "okas"
    collection. Each operation launches an ODU workflow, records its result,
    then (on success) records the resource status, persisting after each phase.
    """

    db_collection = "okas"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging handle, passed on to OduWorkflow and LcmBase
        :param lcm_tasks: task registry, stored and passed on to OduWorkflow
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        super().__init__(msg, self.logger)

    async def _run_oka_operation(
        self, workflow, op_id, op_params, content, states=None
    ):
        """
        Shared two-phase sequence behind create/edit/delete.
        :param workflow: workflow key used for both launch_workflow and check_resource_status
        :param op_id: operation identifier, forwarded unchanged to the ODU calls
        :param op_params: operation parameters, forwarded unchanged to the ODU calls
        :param content: OKA record (dict holding at least "_id"), updated in place
        :param states: optional (success, failure) pair written to content["state"]
            after the workflow phase; None leaves "state" untouched
        :return: the record as returned by the last update_operation_history call
        """
        db_content = content

        # Phase 1: run the workflow and record whether it succeeded.
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, db_content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )
        if states is not None:
            ok_state, failed_state = states
            db_content["state"] = ok_state if workflow_status else failed_state
        db_content["resourceState"] = (
            "IN_PROGRESS.GIT_SYNCED" if workflow_status else "ERROR"
        )
        db_content = self.update_operation_history(db_content, workflow_status, None)
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        # Phase 2: the resource status is only checked after a successful workflow.
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )
            db_content["resourceState"] = "READY" if resource_status else "ERROR"
            db_content = self.update_operation_history(
                db_content, workflow_status, resource_status
            )

        # The operation is over either way: mark the record idle and persist it.
        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
        return db_content

    async def create(self, op_id, op_params, content):
        """
        Run the "create_oka" workflow; state becomes "CREATED" or "FAILED_CREATION".
        :param op_id: operation identifier
        :param op_params: operation parameters
        :param content: OKA record, updated in place and persisted
        :return: None
        """
        self.logger.info("OKA Create Enter")
        await self._run_oka_operation(
            "create_oka", op_id, op_params, content, ("CREATED", "FAILED_CREATION")
        )
        return

    async def edit(self, op_id, op_params, content):
        """
        Run the "update_oka" workflow; only resourceState/operatingState change.
        :param op_id: operation identifier
        :param op_params: operation parameters
        :param content: OKA record, updated in place and persisted
        :return: None
        """
        self.logger.info("OKA Edit Enter")
        await self._run_oka_operation("update_oka", op_id, op_params, content)
        return

    async def delete(self, op_id, op_params, content):
        """
        Run the "delete_oka" workflow; state becomes "DELETED" or "FAILED_DELETION".
        A record left in state "DELETED" is removed from the collection.
        :param op_id: operation identifier
        :param op_params: operation parameters
        :param content: OKA record, updated in place and persisted
        :return: None
        """
        self.logger.info("OKA delete Enter")
        db_content = await self._run_oka_operation(
            "delete_oka", op_id, op_params, content, ("DELETED", "FAILED_DELETION")
        )
        if db_content["state"] == "DELETED":
            self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
        return
class KsuLcm(LcmBase):
    """
    Lifecycle operations for records of the "ksus" collection.

    create/edit/delete receive an iterable of KSU records and drive them with a
    single workflow; clone/move receive one KSU record. In both cases the
    workflow result is recorded first and the resource status is only checked
    when the workflow succeeded.
    """

    db_collection = "ksus"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging handle, passed on to OduWorkflow and LcmBase
        :param lcm_tasks: task registry, stored and passed on to OduWorkflow
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        super().__init__(msg, self.logger)

    async def _sync_ksus(self, workflow, op_id, op_params, content, states=None):
        """
        Workflow + resource-status phases shared by create/edit/delete.
        Does NOT set operatingState; each caller finishes the records itself.
        :param workflow: workflow key used for both launch_workflow and check_resource_status
        :param op_id: operation identifier, forwarded unchanged to the ODU calls
        :param op_params: operation parameters, forwarded unchanged to the ODU calls
        :param content: iterable of KSU records (dicts holding at least "_id"), updated in place
        :param states: optional (success, failure) pair written to each record's
            "state" after the workflow phase; None leaves "state" untouched
        :return: the workflow status reported by check_workflow_status
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        # Record the workflow outcome on every KSU and persist it right away.
        for db_ksu in content:
            if states is not None:
                db_ksu["state"] = states[0] if workflow_status else states[1]
            db_ksu["resourceState"] = (
                "IN_PROGRESS.GIT_SYNCED" if workflow_status else "ERROR"
            )
            db_ksu = self.update_operation_history(db_ksu, workflow_status, None)
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)

        # The resource status is only checked after a successful workflow.
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )
            for db_ksu in content:
                db_ksu["resourceState"] = "READY" if resource_status else "ERROR"
                # Return value unused: the caller persists the records held in
                # `content` in its final loop.
                self.update_operation_history(
                    db_ksu, workflow_status, resource_status
                )
        return workflow_status

    async def _run_single_ksu_operation(self, workflow, op_id, op_params, db_content):
        """
        Full sequence for operations acting on one KSU record (clone/move).
        :param workflow: workflow key used for both launch_workflow and check_resource_status
        :param op_id: operation identifier, forwarded unchanged to the ODU calls
        :param op_params: operation parameters, forwarded unchanged to the ODU calls
        :param db_content: KSU record (dict holding at least "_id"), updated in place
        :return: None
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, db_content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )
        db_content["resourceState"] = (
            "IN_PROGRESS.GIT_SYNCED" if workflow_status else "ERROR"
        )
        db_content = self.update_operation_history(db_content, workflow_status, None)
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )
            db_content["resourceState"] = "READY" if resource_status else "ERROR"
            db_content = self.update_operation_history(
                db_content, workflow_status, resource_status
            )

        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

    async def create(self, op_id, op_params, content):
        """
        Run the "create_ksus" workflow; each KSU's state becomes "CREATED" or
        "FAILED_CREATION".
        :param op_id: operation identifier
        :param op_params: operation parameters
        :param content: iterable of KSU records, updated in place and persisted
        :return: None
        """
        self.logger.info("ksu Create Enter")
        await self._sync_ksus(
            "create_ksus", op_id, op_params, content, ("CREATED", "FAILED_CREATION")
        )
        for db_ksu in content:
            db_ksu["operatingState"] = "IDLE"
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
        return

    async def edit(self, op_id, op_params, content):
        """
        Run the "update_ksus" workflow and, when it succeeded, copy the edited
        fields from the parameters into the matching KSU record.
        :param op_id: operation identifier
        :param op_params: sequence of per-KSU parameters, paired positionally with
            content; each entry is read for "name", "description", "oka" and
            "profile" ("profile_type", "_id")
        :param content: iterable of KSU records, updated in place and persisted
        :return: None
        """
        self.logger.info("ksu edit Enter")
        workflow_status = await self._sync_ksus(
            "update_ksus", op_id, op_params, content
        )
        # NOTE: zip() stops at the shorter input, so a KSU without a matching
        # op_params entry is neither set to IDLE nor persisted by this loop.
        for db_ksu, ksu_params in zip(content, op_params):
            db_ksu["operatingState"] = "IDLE"
            if workflow_status:
                new_profile = ksu_params["profile"]
                db_ksu["name"] = ksu_params["name"]
                db_ksu["description"] = ksu_params["description"]
                db_ksu["profile"]["profile_type"] = new_profile["profile_type"]
                db_ksu["profile"]["_id"] = new_profile["_id"]
                db_ksu["oka"] = ksu_params["oka"]
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
        return

    async def delete(self, op_id, op_params, content):
        """
        Run the "delete_ksus" workflow; each KSU's state becomes "DELETED" or
        "FAILED_DELETION". Records left in state "DELETED" are removed from the
        collection.
        :param op_id: operation identifier
        :param op_params: operation parameters
        :param content: iterable of KSU records, updated in place and persisted
        :return: None
        """
        self.logger.info("ksu delete Enter")
        await self._sync_ksus(
            "delete_ksus", op_id, op_params, content, ("DELETED", "FAILED_DELETION")
        )
        for db_ksu in content:
            db_ksu["operatingState"] = "IDLE"
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
            if db_ksu["state"] == "DELETED":
                self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]})
        return

    async def clone(self, op_id, op_params, db_content):
        """
        Run the "clone_ksus" workflow for a single KSU record.
        :param op_id: operation identifier
        :param op_params: operation parameters
        :param db_content: KSU record, updated in place and persisted
        :return: None
        """
        self.logger.info("ksu clone Enter")
        await self._run_single_ksu_operation(
            "clone_ksus", op_id, op_params, db_content
        )
        return

    async def move(self, op_id, op_params, db_content):
        """
        Run the "move_ksus" workflow for a single KSU record.
        :param op_id: operation identifier
        :param op_params: operation parameters
        :param db_content: KSU record, updated in place and persisted
        :return: None
        """
        self.logger.info("ksu move Enter")
        await self._run_single_ksu_operation(
            "move_ksus", op_id, op_params, db_content
        )
        return