From: Patricia Reinoso Date: Wed, 5 Apr 2023 15:35:48 +0000 (+0000) Subject: NS Instantiate Workflow and Activities X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F20%2F13120%2F13;p=osm%2FLCM.git NS Instantiate Workflow and Activities Change-Id: I446e9ec5a98724e23aad273f3d1fafa98c8272c7 Signed-off-by: Patricia Reinoso Signed-off-by: Mark Beierl --- diff --git a/osm_lcm/nglcm.py b/osm_lcm/nglcm.py index 41f7671..4c21d6e 100644 --- a/osm_lcm/nglcm.py +++ b/osm_lcm/nglcm.py @@ -42,6 +42,8 @@ from osm_lcm.temporal.vim_workflows import ( from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations +from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow +from osm_lcm.temporal.ns_activities import NsOperations, NsDbActivity from temporalio.client import Client from temporalio.worker import Worker @@ -131,13 +133,17 @@ class NGLcm: f"{self.main_config.temporal.host}:{str(self.main_config.temporal.port)}" ) client = await Client.connect(temporal_api) - vim_data_activity_instance = VimDbActivity(self.db) - paas_connector_instance = JujuPaasConnector(self.db) + + ns_operation_instance = NsOperations(self.db) + ns_data_activity_instance = NsDbActivity(self.db) nslcm_activity_instance = NsLcmActivity(self.db) - vnf_operation_instance = VnfOperations(self.db) + paas_connector_instance = JujuPaasConnector(self.db) + vim_data_activity_instance = VimDbActivity(self.db) vnf_data_activity_instance = VnfDbActivity(self.db) + vnf_operation_instance = VnfOperations(self.db) workflows = [ + NsInstantiateWorkflow, NsNoOpWorkflow, VimCreateWorkflow, VimDeleteWorkflow, @@ -146,15 +152,19 @@ class NGLcm: VnfInstantiateWorkflow, ] activities = [ - vim_data_activity_instance.update_vim_operation_state, - vim_data_activity_instance.update_vim_state, - vim_data_activity_instance.delete_vim_record, + ns_data_activity_instance.prepare_vnf_records, + ns_data_activity_instance.update_ns_state, + ns_operation_instance.check_ns_instantiate_finished, + ns_operation_instance.deploy_ns, nslcm_activity_instance.update_ns_lcm_operation_state, nslcm_activity_instance.no_op, - paas_connector_instance.test_vim_connectivity, paas_connector_instance.create_model_if_doesnt_exist, paas_connector_instance.deploy_charm, paas_connector_instance.check_charm_status, + paas_connector_instance.test_vim_connectivity, + vim_data_activity_instance.update_vim_operation_state, + vim_data_activity_instance.update_vim_state, + vim_data_activity_instance.delete_vim_record, vnf_operation_instance.get_task_queue, vnf_data_activity_instance.change_nf_state, vnf_data_activity_instance.change_nf_instantiation_state, diff --git a/osm_lcm/temporal/juju_paas_activities.py b/osm_lcm/temporal/juju_paas_activities.py index 7591883..ecd59e6 100644 --- a/osm_lcm/temporal/juju_paas_activities.py +++ b/osm_lcm/temporal/juju_paas_activities.py @@ -120,6 +120,7 @@ class JujuPaasConnector: async def create_model_if_doesnt_exist( self, create_model_input: CreateModelInput ) -> None: + # TODO: OSM-991 """Connects to Juju Controller. Create a new model if model_name does not exist Collaborators: @@ -202,6 +203,7 @@ class JujuPaasConnector: back-off strategy for this activity, as it will naturally block and wait on each connection attempt. """ + # TODO: Implement OSM-986 pass diff --git a/osm_lcm/temporal/lcm_activities.py b/osm_lcm/temporal/lcm_activities.py index 6a2e8bc..32ebc29 100644 --- a/osm_lcm/temporal/lcm_activities.py +++ b/osm_lcm/temporal/lcm_activities.py @@ -36,7 +36,6 @@ class NsLcmActivity: def __init__(self, db): self.db = db self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") - self.count = 0 @activity.defn(name=ACTIVITY_NSLCM_NO_OP) async def no_op(self, input: NsLcmOperationInput) -> None: diff --git a/osm_lcm/temporal/lcm_workflows.py b/osm_lcm/temporal/lcm_workflows.py index 0af6324..837e586 100644 --- a/osm_lcm/temporal/lcm_workflows.py +++ b/osm_lcm/temporal/lcm_workflows.py @@ -84,6 +84,7 @@ class LcmOperationWorkflow(ABC): self.op_state.op_state = LcmOperationState.COMPLETED except ActivityError as e: + # TODO: Deteremine the best content for Activity Errors OSM-994 self.logger.exception(e) self.op_state.op_state = LcmOperationState.FAILED self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__)) diff --git a/osm_lcm/temporal/ns_activities.py b/osm_lcm/temporal/ns_activities.py new file mode 100644 index 0000000..1a2925c --- /dev/null +++ b/osm_lcm/temporal/ns_activities.py @@ -0,0 +1,147 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from time import time +from temporalio import activity +from osm_common.temporal_constants import ( + ACTIVITY_UPDATE_NS_STATE, + ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED, + ACTIVITY_PREPARE_VNF_RECORDS, + ACTIVITY_DEPLOY_NS, +) +from osm_common.dataclasses.temporal_dataclasses import ( + NsInstantiateInput, + UpdateNsStateInput, + VduInstantiateInput, +) +from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils + + +class NsOperations: + def __init__(self, db): + self.db = db + self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") + + # TODO: Change to a workflow OSM-990 + @activity.defn(name=ACTIVITY_DEPLOY_NS) + async def deploy_ns(self, ns_instantiate_input: NsInstantiateInput) -> None: + vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid}) + for vnfr in vnfrs: + await self.deploy_vnf(vnfr) + + async def deploy_vnf(self, vnfr: dict): + vim_id = vnfr.get("vim-account-id") + model_name = vnfr.get("namespace") + vnfd_id = vnfr["vnfd-id"] + vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + sw_image_descs = vnfd.get("sw-image-desc") + for vdu in vnfd.get("vdu"): + await self.deploy_vdu(vdu, sw_image_descs, vim_id, model_name) + + async def deploy_vdu( + self, vdu: dict, sw_image_descs: str, vim_id: str, model_name: str + ) -> None: + vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs) + vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info) + workflow_id = ( + vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name + ) + self.logger.info(f"TODO: Start VDU Workflow {workflow_id}") + + @activity.defn(name=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED) + async def check_ns_instantiate_finished( + self, ns_instantiate: NsInstantiateInput + ) -> None: + # TODO: Implement OSM-993 + pass + + +class NsDbActivity: + + """Perform Database operations for NS accounts. + + Args: + db (object): Data Access Object + """ + + def __init__(self, db): + self.db = db + self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") + + @activity.defn(name=ACTIVITY_PREPARE_VNF_RECORDS) + async def prepare_vnf_records( + self, ns_instantiate_input: NsInstantiateInput + ) -> None: + """Prepare VNFs to be deployed: Add namespace to the VNFr. + + Collaborators: + DB Write: vnfrs + + Raises (Retryable): + DbException If the target DB record does not exist or DB is not reachable. + + Activity Lifecycle: + This activity will not report a heartbeat due to its + short-running nature. + + As this is a direct DB update, it is not recommended to have + any specific retry policy + + """ + vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid}) + for vnfr in vnfrs: + self._prepare_vnf_record(vnfr) + + def _get_namespace(self, ns_id: str, vim_id: str) -> str: + """The NS namespace is the combination if the NS ID and the VIM ID.""" + return ns_id[-12:] + "-" + vim_id[-12:] + + def _prepare_vnf_record(self, vnfr: dict) -> None: + """Add namespace to the VNFr.""" + ns_id = vnfr["nsr-id-ref"] + vim_id = vnfr["vim-account-id"] + namespace = self._get_namespace(ns_id, vim_id) + update_namespace = {"namespace": namespace} + self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, update_namespace) + + @activity.defn(name=ACTIVITY_UPDATE_NS_STATE) + async def update_ns_state(self, data: UpdateNsStateInput) -> None: + """ + Changes the state of the NS itself. + + Collaborators: + DB Write: nsrs + + Raises (Retryable): + DbException If the target DB record does not exist or DB is not reachable. + + Activity Lifecycle: + This activity will not report a heartbeat due to its + short-running nature. + + As this is a direct DB update, it is not recommended to have + any specific retry policy + """ + update_ns_state = { + "nsState": data.state.name, + # "errorDescription" : data.message, + "_admin.nsState": data.state.name, + "_admin.detailed-status": data.message, + "_admin.modified": time(), + } + self.db.set_one("nsrs", {"_id": data.ns_uuid}, update_ns_state) + self.logger.debug(f"Updated NS {data.ns_uuid} to {data.state.name}") diff --git a/osm_lcm/temporal/ns_workflows.py b/osm_lcm/temporal/ns_workflows.py new file mode 100644 index 0000000..bf0be65 --- /dev/null +++ b/osm_lcm/temporal/ns_workflows.py @@ -0,0 +1,95 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from temporalio import workflow +from temporalio.exceptions import ActivityError + +from osm_common.dataclasses.temporal_dataclasses import ( + NsInstantiateInput, + NsLcmOperationInput, + NsState, + UpdateNsStateInput, +) +from osm_common.temporal_constants import ( + WORKFLOW_NS_INSTANTIATE, + ACTIVITY_UPDATE_NS_STATE, + ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED, + ACTIVITY_DEPLOY_NS, + LCM_TASK_QUEUE, +) + +from osm_lcm.temporal.lcm_workflows import LcmOperationWorkflow + + +@workflow.defn(name=WORKFLOW_NS_INSTANTIATE, sandboxed=LcmOperationWorkflow._SANDBOXED) +class NsInstantiateWorkflow(LcmOperationWorkflow): + """Instantiate a NS""" + + @workflow.run + async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None: + await super().wrap_nslcmop(input=input) + + async def workflow(self, input: NsLcmOperationInput) -> None: + # Temporary until we align on what is really needed where. For now, this + # reduced set looks to be all that is needed. + input = NsInstantiateInput( + ns_uuid=input.nslcmop["nsInstanceId"], op_id=input.nslcmop["_id"] + ) + + ns_state = UpdateNsStateInput(input.ns_uuid, NsState.INSTANTIATED, "Done") + try: + # TODO: Create the model here OSM-991 + + # TODO: Change this to a workflow OSM-990 + await workflow.execute_activity( + activity=ACTIVITY_DEPLOY_NS, + arg=input, + activity_id=f"{ACTIVITY_DEPLOY_NS}-{input.ns_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, + retry_policy=LcmOperationWorkflow.no_retry_policy, + ) + + await workflow.execute_activity( + activity=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED, + arg=input, + activity_id=f"{ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED}-{input.ns_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, + retry_policy=LcmOperationWorkflow.retry_policy, + ) + + except ActivityError as e: + ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, e.cause.message) + self.logger.error( + f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e.cause.message)}" + ) + raise e + + except Exception as e: + ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, str(e)) + self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e)}") + raise e + + finally: + await workflow.execute_activity( + activity=ACTIVITY_UPDATE_NS_STATE, + arg=ns_state, + activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{input.ns_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, + retry_policy=LcmOperationWorkflow.retry_policy, + ) diff --git a/osm_lcm/temporal/tests/test_charm_info_utils.py b/osm_lcm/temporal/tests/test_charm_info_utils.py new file mode 100644 index 0000000..fd1cd9b --- /dev/null +++ b/osm_lcm/temporal/tests/test_charm_info_utils.py @@ -0,0 +1,158 @@ +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase +from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils +from osm_common.dataclasses.temporal_dataclasses import CharmInfo +import yaml + +nsr_id = "ea958ba5-4e58-4405-bf42-6e3be15d4c3a" +vim_id = "70b47595-fafa-4f63-904b-fc3ada60eebb" +expected_default_ns_model = "6e3be15d4c3a-fc3ada60eebb" + +vdu_nominal = """ +--- +vdu: + - id: test-vdu-id + name: test-vdu-name + int-cpd: + - id: internal + int-virtual-link-desc: network1 + - id: mgmt + virtual-compute-desc: compute-id + virtual-storage-desc: + - storage-id + sw-image-desc: image-test2 + configurable-properties: + - key: "track" + value: "latest" + - key: "channel" + value: "edge" +""" + +vdu_no_channel = """ +--- +vdu: + - id: test-vdu-id + name: test-vdu-name + int-cpd: + - id: internal + int-virtual-link-desc: network1 + - id: mgmt + virtual-compute-desc: compute-id + virtual-storage-desc: + - storage-id + sw-image-desc: image-test2 + configurable-properties: + - key: "track" + value: "latest" + - key: "key" + value: "edge" +""" + +vdu_invalid_image = """ +--- +vdu: + - id: test-vdu-id + name: test-vdu-name + int-cpd: + - id: internal + int-virtual-link-desc: network1 + - id: mgmt + virtual-compute-desc: compute-id + virtual-storage-desc: + - storage-id + sw-image-desc: invalid_image + configurable-properties: + - key: "track" + value: "latest" + - key: "key" + value: "edge" +""" + +vdu_no_sw_image_desc = """ +--- +vdu: + - id: test-vdu-id + name: test-vdu-name + int-cpd: + - id: internal + int-virtual-link-desc: network1 + - id: mgmt + virtual-compute-desc: compute-id + virtual-storage-desc: + - storage-id + sw-image-desc: invalid_image + configurable-properties: + - key: "track" + value: "latest" + - key: "key" + value: "edge" +""" + +sw_image_desc = """ +--- +sw-image-desc: + - id: image-test1 + name: charm-name1 + image: ch:mysql + version: "1.0" + - id: image-test2 + name: charm-name2 + image: ch:my-charm + version: "1.0" +""" + + +class TestCharmInfoUtils(TestCase): + def setUp(self): + self.charm_info_utils = CharmInfoUtils() + + def get_loaded_descriptor(self, descriptor): + return yaml.load(descriptor, Loader=yaml.Loader) + + def test_get_charm_info_nominal_case(self): + vdu_descriptor = self.get_loaded_descriptor(vdu_nominal).get("vdu") + sw_image_descs = self.get_loaded_descriptor(sw_image_desc).get("sw-image-desc") + result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs) + expected = CharmInfo("test-vdu-id", "edge", "ch:my-charm") + self.assertEqual(result, expected) + + def test_get_charm_info_no_channel(self): + vdu_descriptor = self.get_loaded_descriptor(vdu_no_channel).get("vdu") + sw_image_descs = self.get_loaded_descriptor(sw_image_desc).get("sw-image-desc") + result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs) + expected = CharmInfo("test-vdu-id", None, "ch:my-charm") + self.assertEqual(result, expected) + + def test_get_charm_info_invalid_image(self): + vdu_descriptor = self.get_loaded_descriptor(vdu_invalid_image).get("vdu") + sw_image_descs = self.get_loaded_descriptor(sw_image_desc).get("sw-image-desc") + result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs) + expected = CharmInfo("test-vdu-id", None, None) + self.assertEqual(result, expected) + + def test_get_charm_info_no_sw_image_desc(self): + vdu_descriptor = self.get_loaded_descriptor(vdu_no_sw_image_desc).get("vdu") + sw_image_descs = self.get_loaded_descriptor(sw_image_desc).get("sw-image-desc") + result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs) + expected = CharmInfo("test-vdu-id", None, None) + self.assertEqual(result, expected) + + def test_get_charm_info_empty_sw_image_descs(self): + vdu_descriptor = self.get_loaded_descriptor(vdu_nominal).get("vdu") + sw_image_descs = [] + result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs) + expected = CharmInfo("test-vdu-id", "edge", None) + self.assertEqual(result, expected) diff --git a/osm_lcm/temporal/vnf_activities.py b/osm_lcm/temporal/vnf_activities.py index 3eaeb44..2602eeb 100644 --- a/osm_lcm/temporal/vnf_activities.py +++ b/osm_lcm/temporal/vnf_activities.py @@ -21,7 +21,7 @@ from osm_common.temporal_constants import ( ACTIVITY_CHANGE_NF_INSTANTIATION_STATE, ACTIVITY_CHANGE_NF_NOTIFICATION_STATE, ACTIVITY_GET_TASK_QUEUE, - vim_type_task_queue_mappings, + VIM_TYPE_TASK_QUEUE_MAPPINGS, ) from osm_common.dataclasses.temporal_dataclasses import ( ChangeNFInstantiationStateInput, @@ -62,7 +62,7 @@ class VnfOperations: """ vnfrs = self.db.get_list("vnfrs", {"_id": get_task_queue_input.vnfr_uuid}) vim_record = self.db.get_list("vim_accounts", {"_id": vnfrs["vim-account-id"]}) - task_queue = vim_type_task_queue_mappings[vim_record["vim-type"]] + task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim-type"]] self.logger.debug(f"Got the task queue {task_queue} for VNF operations.") return GetTaskQueueOutput(task_queue) diff --git a/osm_lcm/temporal/vnf_workflows.py b/osm_lcm/temporal/vnf_workflows.py index 4f1f376..dc2023c 100644 --- a/osm_lcm/temporal/vnf_workflows.py +++ b/osm_lcm/temporal/vnf_workflows.py @@ -176,4 +176,5 @@ class PrepareVnfWorkflow: @workflow.run async def run(self, input: PrepareVnfInput) -> None: + # TODO: Set the model here OSM-991 pass