VNF/VDU workflow 68/13168/1
authorMark Beierl <mark.beierl@canonical.com>
Thu, 6 Apr 2023 15:12:46 +0000 (15:12 +0000)
committerMark Beierl <mark.beierl@canonical.com>
Thu, 6 Apr 2023 15:12:46 +0000 (15:12 +0000)
Change-Id: I9c7d538236098105c1af036a56e73959688fc797
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
osm_lcm/nglcm.py
osm_lcm/temporal/juju_paas_activities.py
osm_lcm/temporal/vdu_workflows.py
osm_lcm/temporal/vnf_activities.py [new file with mode: 0644]
osm_lcm/temporal/vnf_workflows.py [new file with mode: 0644]

index d69121c..41f7671 100644 (file)
@@ -40,6 +40,8 @@ from osm_lcm.temporal.vim_workflows import (
     VimUpdateWorkflow,
 )
 from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
+from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow
+from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations
 from temporalio.client import Client
 from temporalio.worker import Worker
 
@@ -132,6 +134,8 @@ class NGLcm:
         vim_data_activity_instance = VimDbActivity(self.db)
         paas_connector_instance = JujuPaasConnector(self.db)
         nslcm_activity_instance = NsLcmActivity(self.db)
+        vnf_operation_instance = VnfOperations(self.db)
+        vnf_data_activity_instance = VnfDbActivity(self.db)
 
         workflows = [
             NsNoOpWorkflow,
@@ -139,6 +143,7 @@ class NGLcm:
             VimDeleteWorkflow,
             VimUpdateWorkflow,
             VduInstantiateWorkflow,
+            VnfInstantiateWorkflow,
         ]
         activities = [
             vim_data_activity_instance.update_vim_operation_state,
@@ -150,6 +155,10 @@ class NGLcm:
             paas_connector_instance.create_model_if_doesnt_exist,
             paas_connector_instance.deploy_charm,
             paas_connector_instance.check_charm_status,
+            vnf_operation_instance.get_task_queue,
+            vnf_data_activity_instance.change_nf_state,
+            vnf_data_activity_instance.change_nf_instantiation_state,
+            vnf_data_activity_instance.change_nf_notification_state,
         ]
 
         # Check if we are running under a debugger
index 74d4763..7591883 100644 (file)
@@ -26,6 +26,7 @@ from osm_common.dataclasses.temporal_dataclasses import (
     TestVimConnectivityInput,
     CreateModelInput,
     VduInstantiateInput,
+    CharmInfo,
 )
 
 
@@ -202,3 +203,61 @@ class JujuPaasConnector:
             and wait on each connection attempt.
         """
         pass
+
+
+class CharmInfoUtils:
+    @staticmethod
+    def get_charm_info(vdu: dict, sw_image_descs: list) -> CharmInfo:
+        """Extract the charm info of a VDU.
+        Args:
+            vdu (dict):    contains the charm information.
+            sw_image_descs (list): list of images in the VNF.
+
+        Returns:
+            CharmInfo  (object)
+        """
+        app_name = vdu.get("id")
+        channel = CharmInfoUtils._get_property_value(
+            "channel", vdu.get("configurable-properties")
+        )
+        entity_url = CharmInfoUtils._get_entity_url(
+            vdu.get("sw-image-desc"), sw_image_descs
+        )
+        return CharmInfo(app_name, channel, entity_url)
+
+    @staticmethod
+    def _get_property_value(wanted_property: str, configurable_properties: list) -> str:
+        """Extract the value corresponding to a given key-value pairs in
+            vdu.configurable-properties
+
+        Args:
+            wanted_property (str): property used as key.
+            configurable_properties (list): list of configurable-properties in VDU.
+
+        Returns:
+            value of the wanted property  (str)
+        """
+        filtered_property = next(
+            filter(
+                lambda property: property.get("key") == wanted_property,
+                configurable_properties,
+            ),
+            None,
+        )
+        return filtered_property.get("value") if filtered_property else None
+
+    @staticmethod
+    def _get_entity_url(sw_image_desc_id: str, sw_image_descs: list) -> str:
+        """Extract the image field for a given image_id
+        Args:
+            sw_image_desc_id (str): ID of the image used by a VDU.
+            sw_image_descs (list): information of available images in the VNF.
+
+        Returns:
+            image of the sw_image_desc_id (str)
+        """
+        filtered_image = next(
+            filter(lambda image: image.get("id") == sw_image_desc_id, sw_image_descs),
+            None,
+        )
+        return filtered_image.get("image") if filtered_image else None
index ceb19c3..09cd93d 100644 (file)
@@ -20,11 +20,12 @@ from temporalio import workflow
 from temporalio.common import RetryPolicy
 
 from osm_common.dataclasses.temporal_dataclasses import VduInstantiateInput
+
 from osm_common.temporal_constants import (
-    WORKFLOW_VDU_INSTANTIATE,
     ACTIVITY_DEPLOY_CHARM,
     ACTIVITY_CHECK_CHARM_STATUS,
     LCM_TASK_QUEUE,
+    WORKFLOW_VDU_INSTANTIATE,
 )
 
 _SANDBOXED = False
diff --git a/osm_lcm/temporal/vnf_activities.py b/osm_lcm/temporal/vnf_activities.py
new file mode 100644 (file)
index 0000000..3eaeb44
--- /dev/null
@@ -0,0 +1,153 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from temporalio import activity
+from osm_common.temporal_constants import (
+    ACTIVITY_CHANGE_NF_STATE,
+    ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
+    ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
+    ACTIVITY_GET_TASK_QUEUE,
+    vim_type_task_queue_mappings,
+)
+from osm_common.dataclasses.temporal_dataclasses import (
+    ChangeNFInstantiationStateInput,
+    ChangeNFStateInput,
+    GetTaskQueueInput,
+    GetTaskQueueOutput,
+)
+
+
+class VnfOperations:
+    def __init__(self, db):
+        self.db = db
+        self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+
+    @activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
+    async def get_task_queue(
+        self, get_task_queue_input: GetTaskQueueInput
+    ) -> GetTaskQueueOutput:
+        """Finds the appropriate task queue according to VIM type of VNF.
+
+        Collaborators:
+            DB Access Object
+
+        Raises (retryable):
+            DbException: If DB read operations fail
+
+        Activity Lifecycle:
+            This activity should complete relatively quickly (less than a
+            second). However, it would be reasonable to wait up to 10
+            seconds.
+
+            This activity will not report a heartbeat due to its
+            short-running nature.
+
+            It is not necessary to implement a back-off strategy for this
+            activity, the operation is idempotent.
+
+        """
+        vnfrs = self.db.get_list("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
+        vim_record = self.db.get_list("vim_accounts", {"_id": vnfrs["vim-account-id"]})
+        task_queue = vim_type_task_queue_mappings[vim_record["vim-type"]]
+        self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
+        return GetTaskQueueOutput(task_queue)
+
+
+class VnfDbActivity:
+
+    """Perform Database operations for NS accounts.
+
+    Args:
+        db  (object):       Data Access Object
+    """
+
+    def __init__(self, db):
+        self.db = db
+        self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+
+    @activity.defn(name=ACTIVITY_CHANGE_NF_STATE)
+    async def change_nf_state(self, nf_state_input: ChangeNFStateInput) -> None:
+        """Updates the VNF State in VNFR.
+
+        Collaborators:
+            DB Access Object
+
+        Raises (retryable):
+            DbException: If DB access/update fails, the collection or DB record ID does not exist.
+
+        Activity Lifecycle:
+            This activity should complete relatively quickly (less than a
+            second). However, it would be reasonable to wait up to 10
+            seconds.
+
+            This activity will not report a heartbeat due to its
+            short-running nature.
+
+            It is not necessary to implement a back-off strategy for this
+            activity, the operation is idempotent.
+
+        """
+        update_nf_state = {"vnfState": nf_state_input.nf_state}
+        self.db.set_one("vnfrs", {"_id": nf_state_input.vnfr_uuid}, update_nf_state)
+        self.logger.debug(
+            f"VNF {nf_state_input.vnfr_uuid} state is updated to {nf_state_input.nf_state}."
+        )
+
+    @activity.defn(name=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE)
+    async def change_nf_instantiation_state(
+        self, nf_instantiation_state_input: ChangeNFInstantiationStateInput
+    ) -> None:
+        """Updates the VNF Instantiation State in VNFR.
+
+        Collaborators:
+            DB Access Object
+
+        Raises (retryable):
+            DbException: If DB access or update fails
+
+        Activity Lifecycle:
+            This activity should complete relatively quickly (less than a
+            second). However, it would be reasonable to wait up to 10
+            seconds.
+
+            This activity will not report a heartbeat due to its
+            short-running nature.
+
+            It is not necessary to implement a back-off strategy for this
+            activity, the operation is idempotent.
+
+        """
+        update_nf_instantiation_state = {
+            "vnfState": nf_instantiation_state_input.nf_instantiation_state
+        }
+        self.db.set_one(
+            "vnfrs",
+            {"_id": nf_instantiation_state_input.vnfr_uuid},
+            update_nf_instantiation_state,
+        )
+        self.logger.debug(
+            f"VNF {nf_instantiation_state_input.vnfr_uuid} state is updated to {nf_instantiation_state_input.nf_instantiation_state}."
+        )
+
+    @activity.defn(name=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE)
+    async def change_nf_notification_state(self) -> None:
+        """If VNF LCM operation state changes, send notification updates.
+
+        This activity does nothing.
+
+        """
+        pass
diff --git a/osm_lcm/temporal/vnf_workflows.py b/osm_lcm/temporal/vnf_workflows.py
new file mode 100644 (file)
index 0000000..4f1f376
--- /dev/null
@@ -0,0 +1,179 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import timedelta
+import logging
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+
+from osm_common.dataclasses.temporal_dataclasses import (
+    VnfInstantiationState,
+    VnfState,
+    VnfInstantiateInput,
+    VduInstantiateInput,
+    ChangeNFInstantiationStateInput,
+    ChangeNFStateInput,
+    GetTaskQueueInput,
+    PrepareVnfInput,
+)
+
+from osm_common.temporal_constants import (
+    ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
+    ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
+    ACTIVITY_CHANGE_NF_STATE,
+    ACTIVITY_GET_TASK_QUEUE,
+    LCM_TASK_QUEUE,
+    WORKFLOW_VDU_INSTANTIATE,
+    WORKFLOW_VNF_INSTANTIATE,
+    WORKFLOW_VNF_PREPARE,
+)
+from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
+
+_SANDBOXED = False
+retry_policy = RetryPolicy(maximum_attempts=3)
+default_schedule_to_close_timeout = timedelta(minutes=10)
+
+
+@workflow.defn(name=WORKFLOW_VNF_INSTANTIATE, sandboxed=_SANDBOXED)
+class VnfInstantiateWorkflow:
+    """Instantiate a VNF.
+
+    Workflow Identifier:
+        It is recommended that the ID for the VNF is referred as a workflow
+        ID when invoking this workflow.
+    """
+
+    def __init__(self):
+        self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
+        self.nf_instantiation_state = ChangeNFInstantiationStateInput(
+            None, VnfInstantiationState.NOT_INSTANTIATED
+        )
+        self.nf_state = ChangeNFStateInput(None, VnfState.STOPPED)
+
+    @workflow.run
+    async def run(self, input: VnfInstantiateInput) -> None:
+        try:
+            self.nf_state.vnfr_uuid = (
+                self.nf_instantiation_state.vnfr_uuid
+            ) = input.vnfr_uuid
+
+            await self.update_nf_instantiation_state()
+            await self.update_nf_state()
+            vnf_task_queue = await workflow.execute_activity(
+                activity=ACTIVITY_GET_TASK_QUEUE,
+                arg=GetTaskQueueInput(input.vnf_uuid),
+                activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnf_uuid}",
+                task_queue=LCM_TASK_QUEUE,
+                schedule_to_close_timeout=default_schedule_to_close_timeout,
+                retry_policy=retry_policy,
+            )
+
+            await workflow.execute_child_workflow(
+                workflow=WORKFLOW_VNF_PREPARE,
+                arg=PrepareVnfInput(input.vnf_uuid),
+                task_queue=vnf_task_queue,
+                id=f"{WORKFLOW_VNF_PREPARE}-{input.vnf_uuid}",
+            )
+
+            await self.instantiate_vdus(input.vnfr_uuid, vnf_task_queue)
+
+            self.nf_instantiation_state.state = VnfInstantiationState.INSTANTIATED
+            self.nf_state.state = VnfState.STARTED
+
+        except Exception as e:
+            workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}")
+            self.nf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED
+            self.nf_state.state = VnfState.STOPPED
+            raise e
+
+        finally:
+            await self.update_nf_instantiation_state()
+            await self.update_nf_state()
+            await self.update_nf_notification_state()
+
+    async def update_nf_state(self):
+        await workflow.execute_activity(
+            activity=ACTIVITY_CHANGE_NF_STATE,
+            arg=self.nf_state,
+            activity_id=f"{ACTIVITY_CHANGE_NF_STATE}-{self.nf_state.vnfr_uuid}",
+            task_queue=LCM_TASK_QUEUE,
+            schedule_to_close_timeout=default_schedule_to_close_timeout,
+            retry_policy=retry_policy,
+        )
+
+    async def update_nf_instantiation_state(self):
+        await workflow.execute_activity(
+            activity=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
+            arg=self.nf_instantiation_state,
+            activity_id=f"{ACTIVITY_CHANGE_NF_INSTANTIATION_STATE}-{self.nf_instantiation_state.vnfr_uuid}",
+            task_queue=LCM_TASK_QUEUE,
+            schedule_to_close_timeout=default_schedule_to_close_timeout,
+            retry_policy=retry_policy,
+        )
+
+    @staticmethod
+    async def update_nf_notification_state():
+        await workflow.execute_activity(
+            activity=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
+            arg=None,
+            activity_id=f"{ACTIVITY_CHANGE_NF_NOTIFICATION_STATE}",
+            task_queue=LCM_TASK_QUEUE,
+            schedule_to_close_timeout=default_schedule_to_close_timeout,
+            retry_policy=retry_policy,
+        )
+
+    async def instantiate_vdus(self, vnfr_uuid, vnf_task_queue):
+        # TODO: see OSMENG-989
+
+        vnfr = {"uuid", vnfr_uuid}  # self.db.get_one("vnfrs", {"_id": vnfr_uuid})
+        vnfd = {}  # self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]})
+        for vdu in vnfd.get("vdu"):
+            (
+                vdu_instantiate_input,
+                vdu_instantiate_workflow_id,
+            ) = VnfInstantiateWorkflow.get_vdu_instantiate_input(vnfr, vnfd, vdu)
+            await workflow.execute_child_workflow(
+                workflow=WORKFLOW_VDU_INSTANTIATE,
+                arg=vdu_instantiate_input,
+                task_queue=vnf_task_queue,
+                id=vdu_instantiate_workflow_id,
+            )
+
+    @staticmethod
+    def get_vdu_instantiate_input(vnfr: dict, vnfd: dict, vdu: dict):
+        model_name = vnfr.get("namespace")
+        vim_id = vnfr.get("vim-account-id")
+        sw_image_descs = vnfd.get("sw-image-desc")
+        vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
+        vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
+        vdu_instantiate_workflow_id = (
+            vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
+        )
+        return vdu_instantiate_input, vdu_instantiate_workflow_id
+
+
+@workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=_SANDBOXED)
+class PrepareVnfWorkflow:
+    """Prepare a VNF.
+
+    Workflow Identifier:
+        It is recommended that the ID for the VNF is referred as a workflow
+        ID when invoking this workflow.
+    """
+
+    @workflow.run
+    async def run(self, input: PrepareVnfInput) -> None:
+        pass