import yaml
from osm_common.dbbase import DbException
+from osm_common.temporal_constants import LCM_TASK_QUEUE
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.lcm_utils import LcmException
from os import path
-from temporalio import workflow
+from osm_lcm.temporal.vim_activities import VimDbActivity, JujuPaasConnector
+from osm_lcm.temporal.vim_workflows import (
+ VimCreateWorkflow,
+ VimDeleteWorkflow,
+ VimUpdateWorkflow,
+)
from temporalio.client import Client
from temporalio.worker import Worker
self.logger.critical("starting osm/nglcm")
async def start(self):
- # do some temporal stuff here
temporal_api = (
f"{self.main_config.temporal.host}:{str(self.main_config.temporal.port)}"
)
- self.logger.info(f"Attempting to register with Temporal at {temporal_api}")
client = await Client.connect(temporal_api)
-
- task_queue = "lcm-task-queue"
- workflows = [
- Heartbeat,
+ data_activity_instance = VimDbActivity(self.db)
+ paas_connector_instance = JujuPaasConnector(self.db)
+
+ workflows = [VimCreateWorkflow, VimDeleteWorkflow, VimUpdateWorkflow]
+ activities = [
+ data_activity_instance.update_vim_operation_state,
+ data_activity_instance.update_vim_state,
+ data_activity_instance.delete_vim_record,
+ paas_connector_instance.test_vim_connectivity,
]
- activities = []
worker = Worker(
- client, task_queue=task_queue, workflows=workflows, activities=activities
+ client,
+ task_queue=LCM_TASK_QUEUE,
+ workflows=workflows,
+ activities=activities,
)
- self.logger.info(f"Registered for queue {task_queue}")
- self.logger.info(f"Registered workflows {workflows}")
- self.logger.info(f"Registered activites {activities}")
-
+ self.logger.info("Starting LCM temporal worker")
await worker.run()
-@workflow.defn
-class Heartbeat:
- @workflow.run
- async def run(self) -> str:
- return "alive"
-
-
if __name__ == "__main__":
try:
opts, args = getopt.getopt(
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from temporalio import activity
+from time import time
+from n2vc.temporal_libjuju import ConnectionInfo, Libjuju
+from osm_common.temporal_constants import (
+ ACTIVITY_DELETE_VIM,
+ ACTIVITY_TEST_VIM_CONNECTIVITY,
+ ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+ ACTIVITY_UPDATE_VIM_STATE,
+)
+from osm_common.dataclasses.temporal_dataclasses import (
+ DeleteVimInput,
+ TestVimConnectivityInput,
+ UpdateVimOperationStateInput,
+ UpdateVimStateInput,
+)
+
+activity.logger = logging.getLogger("lcm.temporal.vim_activities")
+
+
+class JujuPaasConnector:
+ """Handles Juju Controller operations.
+
+ Args:
+ db (object): Data Access Object
+ """
+
+ def __init__(self, db):
+ self.db = db
+
+ def _decrypt_password(self, vim_content: dict) -> str:
+ """Decrypt a password.
+ vim_content (dict): VIM details as a dictionary
+
+ Returns:
+ plain text password (str)
+ """
+ return self.db.decrypt(
+ vim_content["vim_password"],
+ schema_version=vim_content["schema_version"],
+ salt=vim_content["_id"],
+ )
+
+ def _get_connection_info(self, vim_id: str) -> ConnectionInfo:
+ """Get VIM details from database using vim_id and returns
+ the Connection Info to connect Juju Controller.
+
+ Args:
+ vim_id (str): VIM ID
+
+ Returns:
+ ConnectionInfo (object)
+ """
+ vim_content = self.db.get_one("vim_accounts", {"_id": vim_id})
+ endpoint = vim_content["vim_url"]
+ username = vim_content["vim_user"]
+ vim_config = vim_content["config"]
+ cacert = vim_config["ca_cert_content"]
+ cloud_name = vim_config["cloud"]
+ cloud_credentials = vim_config["cloud_credentials"]
+ password = self._decrypt_password(vim_content)
+ return ConnectionInfo(
+ endpoint, username, password, cacert, cloud_name, cloud_credentials
+ )
+
+ @activity.defn(name=ACTIVITY_TEST_VIM_CONNECTIVITY)
+ async def test_vim_connectivity(
+ self, test_connectivity_input: TestVimConnectivityInput
+ ) -> None:
+ """Validates the credentials by attempting to connect to the given Juju Controller.
+
+ Collaborators:
+ DB Read: vim_accounts
+ Juju Controller: Connect only
+
+ Raises (Retryable):
+ ApplicationError If any of password, cacert, cloud_credentials is invalid
+ or Juju controller is not reachable
+
+ Activity Lifecycle:
+ This activity should complete relatively quickly (in a few seconds).
+ However, it would be reasonable to wait more than 72 seconds (network timeout)
+ incase there are network issues.
+
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ It is recommended, although not necessary to implement a
+ back-off strategy for this activity, as it will naturally block
+ and wait on each connection attempt.
+ """
+ vim_id = test_connectivity_input.vim_uuid
+ controller = None
+ try:
+ connection_info = self._get_connection_info(vim_id)
+ libjuju = Libjuju(connection_info)
+ controller = await libjuju.get_controller()
+ message = f"Connection to juju controller succeeded for {vim_id}"
+ activity.logger.info(message)
+ finally:
+ await libjuju.disconnect_controller(controller)
+
+
+class VimDbActivity:
+ """Perform Database operations for VIM accounts.
+
+ Args:
+ db (object): Data Access Object
+ """
+
+ def __init__(self, db):
+ self.db = db
+
+ @activity.defn(name=ACTIVITY_UPDATE_VIM_STATE)
+ async def update_vim_state(self, data: UpdateVimStateInput) -> None:
+ """
+ Changes the state of the VIM itself. Should be either
+ ENABLED or ERROR, however this activity does not validate
+ the state as no validation was done in OSM previously.
+
+ Collaborators:
+ DB Write: vim_accounts
+
+ Raises (Retryable):
+ DbException If the target DB record does not exist or DB is not reachable.
+
+ Activity Lifecycle:
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ As this is a direct DB update, it is not recommended to have
+ any specific retry policy
+ """
+ update_vim_state = {
+ "_admin.operationalState": data.operational_state.name,
+ "_admin.detailed-status": data.message,
+ "_admin.modified": time(),
+ }
+
+ self.db.set_one("vim_accounts", {"_id": data.vim_uuid}, update_vim_state)
+ activity.logger.debug(
+ f"Updated VIM {data.vim_uuid} to {data.operational_state.name}"
+ )
+
+ @activity.defn(name=ACTIVITY_UPDATE_VIM_OPERATION_STATE)
+ async def update_vim_operation_state(
+ self, data: UpdateVimOperationStateInput
+ ) -> None:
+ """
+ Changes the state of a VIM operation task. Should be done to
+ indicate progress, or completion of the task itself.
+
+ Collaborators:
+ DB Write: vim_accounts
+
+ Raises (Retryable):
+ DbException If the target DB record does not exist or DB is not reachable.
+
+ Activity Lifecycle:
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ As this is a direct DB update, it is not recommended to have
+ any specific retry policy
+ """
+ update_operation_state = {
+ f"_admin.operations.{format(data.op_id)}.operationState": data.op_state.name,
+ f"_admin.operations.{format(data.op_id)}.detailed-status": data.message,
+ "_admin.current_operation": None,
+ }
+
+ self.db.set_one("vim_accounts", {"_id": data.vim_uuid}, update_operation_state)
+ activity.logger.debug(
+ f"Updated VIM {data.vim_uuid} OP ID {data.op_id} to {data.op_state.name}"
+ )
+
+ @activity.defn(name=ACTIVITY_DELETE_VIM)
+ async def delete_vim_record(self, data: DeleteVimInput) -> None:
+ """
+ Deletes the VIM record from the database.
+
+ Collaborators:
+ DB Delete: vim_accounts
+
+ Raises (Retryable):
+ DbException If the target DB record does not exist or DB is not reachable.
+
+ Activity Lifecycle:
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ As this is a direct DB update, it is not recommended to have
+ any specific retry policy
+ """
+
+ self.db.del_one("vim_accounts", {"_id": data.vim_uuid})
+ activity.logger.debug(f"Removed VIM {data.vim_uuid}")
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import timedelta
+import logging
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError
+
+from osm_common.dataclasses.temporal_dataclasses import (
+ DeleteVimInput,
+ TestVimConnectivityInput,
+ UpdateVimOperationStateInput,
+ UpdateVimStateInput,
+ VimOperationInput,
+ VimState,
+ VimOperationState,
+)
+from osm_common.temporal_constants import (
+ ACTIVITY_TEST_VIM_CONNECTIVITY,
+ ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+ ACTIVITY_UPDATE_VIM_STATE,
+ ACTIVITY_DELETE_VIM,
+ LCM_TASK_QUEUE,
+ WORKFLOW_VIM_CREATE,
+ WORKFLOW_VIM_UPDATE,
+ WORKFLOW_VIM_DELETE,
+)
+
+_SANDBOXED = False
+retry_policy = RetryPolicy(maximum_attempts=3)
+default_schedule_to_close_timeout = timedelta(minutes=10)
+
+workflow.logger = logging.getLogger("lcm.temporal.vim_workflows")
+
+
+@workflow.defn(name=WORKFLOW_VIM_CREATE, sandboxed=_SANDBOXED)
+class VimCreateWorkflow:
+ """Updates VIM account state by validating the VIM connectivity."""
+
+ @workflow.run
+ async def run(self, input: VimOperationInput) -> None:
+ vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ENABLED, "Done")
+ op_state = UpdateVimOperationStateInput(
+ input.vim_uuid, input.op_id, VimOperationState.COMPLETED, "Done"
+ )
+ try:
+ await workflow.execute_activity(
+ activity=ACTIVITY_TEST_VIM_CONNECTIVITY,
+ arg=TestVimConnectivityInput(input.vim_uuid),
+ activity_id=f"{ACTIVITY_TEST_VIM_CONNECTIVITY}-{input.vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ except ActivityError as e:
+ vim_state = UpdateVimStateInput(
+ input.vim_uuid, VimState.ERROR, e.cause.message
+ )
+ op_state = UpdateVimOperationStateInput(
+ input.vim_uuid, input.op_id, VimOperationState.FAILED, e.cause.message
+ )
+
+ workflow.logger.error(
+ f"{WORKFLOW_VIM_CREATE} failed with {str(e.cause.message)}"
+ )
+ raise e
+
+ except Exception as e:
+ vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ERROR, str(e))
+ op_state = UpdateVimOperationStateInput(
+ input.vim_uuid, input.op_id, VimOperationState.FAILED, str(e)
+ )
+
+ workflow.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {str(e)}")
+ raise e
+
+ finally:
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_VIM_STATE,
+ arg=vim_state,
+ activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{input.vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+ arg=op_state,
+ activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{input.vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+
+@workflow.defn(name=WORKFLOW_VIM_UPDATE, sandboxed=_SANDBOXED)
+class VimUpdateWorkflow(VimCreateWorkflow):
+ """Updates VIM account state by validating the VIM connectivity."""
+
+ @workflow.run
+ async def run(self, input: VimOperationInput) -> None:
+ await super().run(input)
+
+
+@workflow.defn(name=WORKFLOW_VIM_DELETE, sandboxed=_SANDBOXED)
+class VimDeleteWorkflow:
+ """Deletes VIM accounts."""
+
+ @workflow.run
+ async def run(self, input: VimOperationInput) -> None:
+ await workflow.execute_activity(
+ activity=ACTIVITY_DELETE_VIM,
+ arg=DeleteVimInput(input.vim_uuid),
+ activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{input.vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )