From: Mark Beierl Date: Thu, 6 Apr 2023 15:12:46 +0000 (+0000) Subject: VNF/VDU workflow X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F68%2F13168%2F1;p=osm%2FLCM.git VNF/VDU workflow Change-Id: I9c7d538236098105c1af036a56e73959688fc797 Signed-off-by: Mark Beierl --- diff --git a/osm_lcm/nglcm.py b/osm_lcm/nglcm.py index d69121c7..41f76718 100644 --- a/osm_lcm/nglcm.py +++ b/osm_lcm/nglcm.py @@ -40,6 +40,8 @@ from osm_lcm.temporal.vim_workflows import ( VimUpdateWorkflow, ) from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow +from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow +from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations from temporalio.client import Client from temporalio.worker import Worker @@ -132,6 +134,8 @@ class NGLcm: vim_data_activity_instance = VimDbActivity(self.db) paas_connector_instance = JujuPaasConnector(self.db) nslcm_activity_instance = NsLcmActivity(self.db) + vnf_operation_instance = VnfOperations(self.db) + vnf_data_activity_instance = VnfDbActivity(self.db) workflows = [ NsNoOpWorkflow, @@ -139,6 +143,7 @@ class NGLcm: VimDeleteWorkflow, VimUpdateWorkflow, VduInstantiateWorkflow, + VnfInstantiateWorkflow, ] activities = [ vim_data_activity_instance.update_vim_operation_state, @@ -150,6 +155,10 @@ class NGLcm: paas_connector_instance.create_model_if_doesnt_exist, paas_connector_instance.deploy_charm, paas_connector_instance.check_charm_status, + vnf_operation_instance.get_task_queue, + vnf_data_activity_instance.change_nf_state, + vnf_data_activity_instance.change_nf_instantiation_state, + vnf_data_activity_instance.change_nf_notification_state, ] # Check if we are running under a debugger diff --git a/osm_lcm/temporal/juju_paas_activities.py b/osm_lcm/temporal/juju_paas_activities.py index 74d47630..75918834 100644 --- a/osm_lcm/temporal/juju_paas_activities.py +++ b/osm_lcm/temporal/juju_paas_activities.py @@ -26,6 +26,7 @@ from osm_common.dataclasses.temporal_dataclasses import ( TestVimConnectivityInput, CreateModelInput, VduInstantiateInput, + CharmInfo, ) @@ -202,3 +203,61 @@ class JujuPaasConnector: and wait on each connection attempt. """ pass + + +class CharmInfoUtils: + @staticmethod + def get_charm_info(vdu: dict, sw_image_descs: list) -> CharmInfo: + """Extract the charm info of a VDU. + Args: + vdu (dict): contains the charm information. + sw_image_descs (list): list of images in the VNF. + + Returns: + CharmInfo (object) + """ + app_name = vdu.get("id") + channel = CharmInfoUtils._get_property_value( + "channel", vdu.get("configurable-properties") + ) + entity_url = CharmInfoUtils._get_entity_url( + vdu.get("sw-image-desc"), sw_image_descs + ) + return CharmInfo(app_name, channel, entity_url) + + @staticmethod + def _get_property_value(wanted_property: str, configurable_properties: list) -> str: + """Extract the value corresponding to a given key-value pairs in + vdu.configurable-properties + + Args: + wanted_property (str): property used as key. + configurable_properties (list): list of configurable-properties in VDU. + + Returns: + value of the wanted property (str) + """ + filtered_property = next( + filter( + lambda property: property.get("key") == wanted_property, + configurable_properties, + ), + None, + ) + return filtered_property.get("value") if filtered_property else None + + @staticmethod + def _get_entity_url(sw_image_desc_id: str, sw_image_descs: list) -> str: + """Extract the image field for a given image_id + Args: + sw_image_desc_id (str): ID of the image used by a VDU. + sw_image_descs (list): information of available images in the VNF. + + Returns: + image of the sw_image_desc_id (str) + """ + filtered_image = next( + filter(lambda image: image.get("id") == sw_image_desc_id, sw_image_descs), + None, + ) + return filtered_image.get("image") if filtered_image else None diff --git a/osm_lcm/temporal/vdu_workflows.py b/osm_lcm/temporal/vdu_workflows.py index ceb19c30..09cd93d0 100644 --- a/osm_lcm/temporal/vdu_workflows.py +++ b/osm_lcm/temporal/vdu_workflows.py @@ -20,11 +20,12 @@ from temporalio import workflow from temporalio.common import RetryPolicy from osm_common.dataclasses.temporal_dataclasses import VduInstantiateInput + from osm_common.temporal_constants import ( - WORKFLOW_VDU_INSTANTIATE, ACTIVITY_DEPLOY_CHARM, ACTIVITY_CHECK_CHARM_STATUS, LCM_TASK_QUEUE, + WORKFLOW_VDU_INSTANTIATE, ) _SANDBOXED = False diff --git a/osm_lcm/temporal/vnf_activities.py b/osm_lcm/temporal/vnf_activities.py new file mode 100644 index 00000000..3eaeb445 --- /dev/null +++ b/osm_lcm/temporal/vnf_activities.py @@ -0,0 +1,153 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from temporalio import activity +from osm_common.temporal_constants import ( + ACTIVITY_CHANGE_NF_STATE, + ACTIVITY_CHANGE_NF_INSTANTIATION_STATE, + ACTIVITY_CHANGE_NF_NOTIFICATION_STATE, + ACTIVITY_GET_TASK_QUEUE, + vim_type_task_queue_mappings, +) +from osm_common.dataclasses.temporal_dataclasses import ( + ChangeNFInstantiationStateInput, + ChangeNFStateInput, + GetTaskQueueInput, + GetTaskQueueOutput, +) + + +class VnfOperations: + def __init__(self, db): + self.db = db + self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") + + @activity.defn(name=ACTIVITY_GET_TASK_QUEUE) + async def get_task_queue( + self, get_task_queue_input: GetTaskQueueInput + ) -> GetTaskQueueOutput: + """Finds the appropriate task queue according to VIM type of VNF. + + Collaborators: + DB Access Object + + Raises (retryable): + DbException: If DB read operations fail + + Activity Lifecycle: + This activity should complete relatively quickly (less than a + second). However, it would be reasonable to wait up to 10 + seconds. + + This activity will not report a heartbeat due to its + short-running nature. + + It is not necessary to implement a back-off strategy for this + activity, the operation is idempotent. + + """ + vnfrs = self.db.get_list("vnfrs", {"_id": get_task_queue_input.vnfr_uuid}) + vim_record = self.db.get_list("vim_accounts", {"_id": vnfrs["vim-account-id"]}) + task_queue = vim_type_task_queue_mappings[vim_record["vim-type"]] + self.logger.debug(f"Got the task queue {task_queue} for VNF operations.") + return GetTaskQueueOutput(task_queue) + + +class VnfDbActivity: + + """Perform Database operations for NS accounts. + + Args: + db (object): Data Access Object + """ + + def __init__(self, db): + self.db = db + self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") + + @activity.defn(name=ACTIVITY_CHANGE_NF_STATE) + async def change_nf_state(self, nf_state_input: ChangeNFStateInput) -> None: + """Updates the VNF State in VNFR. + + Collaborators: + DB Access Object + + Raises (retryable): + DbException: If DB access/update fails, the collection or DB record ID does not exist. + + Activity Lifecycle: + This activity should complete relatively quickly (less than a + second). However, it would be reasonable to wait up to 10 + seconds. + + This activity will not report a heartbeat due to its + short-running nature. + + It is not necessary to implement a back-off strategy for this + activity, the operation is idempotent. + + """ + update_nf_state = {"vnfState": nf_state_input.nf_state} + self.db.set_one("vnfrs", {"_id": nf_state_input.vnfr_uuid}, update_nf_state) + self.logger.debug( + f"VNF {nf_state_input.vnfr_uuid} state is updated to {nf_state_input.nf_state}." + ) + + @activity.defn(name=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE) + async def change_nf_instantiation_state( + self, nf_instantiation_state_input: ChangeNFInstantiationStateInput + ) -> None: + """Updates the VNF Instantiation State in VNFR. + + Collaborators: + DB Access Object + + Raises (retryable): + DbException: If DB access or update fails + + Activity Lifecycle: + This activity should complete relatively quickly (less than a + second). However, it would be reasonable to wait up to 10 + seconds. + + This activity will not report a heartbeat due to its + short-running nature. + + It is not necessary to implement a back-off strategy for this + activity, the operation is idempotent. + + """ + update_nf_instantiation_state = { + "vnfState": nf_instantiation_state_input.nf_instantiation_state + } + self.db.set_one( + "vnfrs", + {"_id": nf_instantiation_state_input.vnfr_uuid}, + update_nf_instantiation_state, + ) + self.logger.debug( + f"VNF {nf_instantiation_state_input.vnfr_uuid} state is updated to {nf_instantiation_state_input.nf_instantiation_state}." + ) + + @activity.defn(name=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE) + async def change_nf_notification_state(self) -> None: + """If VNF LCM operation state changes, send notification updates. + + This activity does nothing. + + """ + pass diff --git a/osm_lcm/temporal/vnf_workflows.py b/osm_lcm/temporal/vnf_workflows.py new file mode 100644 index 00000000..4f1f376e --- /dev/null +++ b/osm_lcm/temporal/vnf_workflows.py @@ -0,0 +1,179 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timedelta +import logging +from temporalio import workflow +from temporalio.common import RetryPolicy + +from osm_common.dataclasses.temporal_dataclasses import ( + VnfInstantiationState, + VnfState, + VnfInstantiateInput, + VduInstantiateInput, + ChangeNFInstantiationStateInput, + ChangeNFStateInput, + GetTaskQueueInput, + PrepareVnfInput, +) + +from osm_common.temporal_constants import ( + ACTIVITY_CHANGE_NF_INSTANTIATION_STATE, + ACTIVITY_CHANGE_NF_NOTIFICATION_STATE, + ACTIVITY_CHANGE_NF_STATE, + ACTIVITY_GET_TASK_QUEUE, + LCM_TASK_QUEUE, + WORKFLOW_VDU_INSTANTIATE, + WORKFLOW_VNF_INSTANTIATE, + WORKFLOW_VNF_PREPARE, +) +from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils + +_SANDBOXED = False +retry_policy = RetryPolicy(maximum_attempts=3) +default_schedule_to_close_timeout = timedelta(minutes=10) + + +@workflow.defn(name=WORKFLOW_VNF_INSTANTIATE, sandboxed=_SANDBOXED) +class VnfInstantiateWorkflow: + """Instantiate a VNF. + + Workflow Identifier: + It is recommended that the ID for the VNF is referred as a workflow + ID when invoking this workflow. + """ + + def __init__(self): + self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}") + self.nf_instantiation_state = ChangeNFInstantiationStateInput( + None, VnfInstantiationState.NOT_INSTANTIATED + ) + self.nf_state = ChangeNFStateInput(None, VnfState.STOPPED) + + @workflow.run + async def run(self, input: VnfInstantiateInput) -> None: + try: + self.nf_state.vnfr_uuid = ( + self.nf_instantiation_state.vnfr_uuid + ) = input.vnfr_uuid + + await self.update_nf_instantiation_state() + await self.update_nf_state() + vnf_task_queue = await workflow.execute_activity( + activity=ACTIVITY_GET_TASK_QUEUE, + arg=GetTaskQueueInput(input.vnf_uuid), + activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnf_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + await workflow.execute_child_workflow( + workflow=WORKFLOW_VNF_PREPARE, + arg=PrepareVnfInput(input.vnf_uuid), + task_queue=vnf_task_queue, + id=f"{WORKFLOW_VNF_PREPARE}-{input.vnf_uuid}", + ) + + await self.instantiate_vdus(input.vnfr_uuid, vnf_task_queue) + + self.nf_instantiation_state.state = VnfInstantiationState.INSTANTIATED + self.nf_state.state = VnfState.STARTED + + except Exception as e: + workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}") + self.nf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED + self.nf_state.state = VnfState.STOPPED + raise e + + finally: + await self.update_nf_instantiation_state() + await self.update_nf_state() + await self.update_nf_notification_state() + + async def update_nf_state(self): + await workflow.execute_activity( + activity=ACTIVITY_CHANGE_NF_STATE, + arg=self.nf_state, + activity_id=f"{ACTIVITY_CHANGE_NF_STATE}-{self.nf_state.vnfr_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + async def update_nf_instantiation_state(self): + await workflow.execute_activity( + activity=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE, + arg=self.nf_instantiation_state, + activity_id=f"{ACTIVITY_CHANGE_NF_INSTANTIATION_STATE}-{self.nf_instantiation_state.vnfr_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + @staticmethod + async def update_nf_notification_state(): + await workflow.execute_activity( + activity=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE, + arg=None, + activity_id=f"{ACTIVITY_CHANGE_NF_NOTIFICATION_STATE}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + async def instantiate_vdus(self, vnfr_uuid, vnf_task_queue): + # TODO: see OSMENG-989 + + vnfr = {"uuid", vnfr_uuid} # self.db.get_one("vnfrs", {"_id": vnfr_uuid}) + vnfd = {} # self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}) + for vdu in vnfd.get("vdu"): + ( + vdu_instantiate_input, + vdu_instantiate_workflow_id, + ) = VnfInstantiateWorkflow.get_vdu_instantiate_input(vnfr, vnfd, vdu) + await workflow.execute_child_workflow( + workflow=WORKFLOW_VDU_INSTANTIATE, + arg=vdu_instantiate_input, + task_queue=vnf_task_queue, + id=vdu_instantiate_workflow_id, + ) + + @staticmethod + def get_vdu_instantiate_input(vnfr: dict, vnfd: dict, vdu: dict): + model_name = vnfr.get("namespace") + vim_id = vnfr.get("vim-account-id") + sw_image_descs = vnfd.get("sw-image-desc") + vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs) + vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info) + vdu_instantiate_workflow_id = ( + vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name + ) + return vdu_instantiate_input, vdu_instantiate_workflow_id + + +@workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=_SANDBOXED) +class PrepareVnfWorkflow: + """Prepare a VNF. + + Workflow Identifier: + It is recommended that the ID for the VNF is referred as a workflow + ID when invoking this workflow. + """ + + @workflow.run + async def run(self, input: PrepareVnfInput) -> None: + pass