Add VIM create Workflow 95/12995/18
authorPatricia Reinoso <patricia.reinoso@canonical.com>
Wed, 22 Feb 2023 17:57:53 +0000 (17:57 +0000)
committerMark Beierl <mark.beierl@canonical.com>
Wed, 29 Mar 2023 18:41:17 +0000 (18:41 +0000)
Change-Id: I6b97b13e1d503f1608d241d16ec5bea4887e6e98
Signed-off-by: Patricia Reinoso <patricia.reinoso@canonical.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
osm_nbi/admin_topics.py
osm_nbi/nbi.cfg
osm_nbi/nbi.py
osm_nbi/temporal/__init__.py [new file with mode: 0644]
osm_nbi/temporal/nbi_temporal.py [new file with mode: 0644]
osm_nbi/tests/test_admin_topics.py

index b2def67..6510d5e 100644 (file)
@@ -47,6 +47,7 @@ from osm_nbi.base_topic import BaseTopic, EngineException
 from osm_nbi.authconn import AuthconnNotFoundException, AuthconnConflictException
 from osm_common.dbbase import deep_update_rfc7396
 import copy
+from osm_nbi.temporal.nbi_temporal import NbiTemporal
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -323,6 +324,18 @@ class CommonVimWimSdn(BaseTopic):
 
         return final_content
 
+    def _validate_input_edit(self, input, content, force=False):
+        """
+        Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
+        :param input: user input content for the new topic
+        :param force: may be used for being more tolerant
+        :return: The same input content, or a changed version of it.
+        """
+
+        if "vim_type" in content:
+            input["vim_type"] = content["vim_type"]
+        return super()._validate_input_edit(input, content, force)
+
     def format_on_edit(self, final_content, edit_content):
         """
         Modifies final_content inserting admin information upon edition
@@ -480,8 +493,15 @@ class CommonVimWimSdn(BaseTopic):
         if session["force"]:
             self.db.del_one(self.topic, {"_id": _id})
             op_id = None
+            message = {"_id": _id, "op_id": op_id}
+            # The vim_type is a temporary hack to shim in temporal workflows in the create
+            if "vim_type" in db_content:
+                message["vim_type"] = db_content["vim_type"]
+
             self._send_msg(
-                "deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg
+                "deleted",
+                message,
+                not_send_msg=not_send_msg,
             )
         else:
             update_dict = {"_admin.to_delete": True}
@@ -496,8 +516,15 @@ class CommonVimWimSdn(BaseTopic):
             op_id = "{}:{}".format(
                 db_content["_id"], len(db_content["_admin"]["operations"])
             )
+            message = {"_id": _id, "op_id": op_id}
+            # The vim_type is a temporary hack to shim in temporal workflows in the create
+            if "vim_type" in db_content:
+                message["vim_type"] = db_content["vim_type"]
+
             self._send_msg(
-                "delete", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg
+                "delete",
+                message,
+                not_send_msg=not_send_msg,
             )
         return op_id
 
@@ -519,6 +546,7 @@ class VimAccountTopic(CommonVimWimSdn):
         ),
     }
     valid_paas_providers = ["juju"]
+    temporal = NbiTemporal()
 
     def check_conflict_on_new(self, session, indata):
         super().check_conflict_on_new(session, indata)
@@ -563,7 +591,9 @@ class VimAccountTopic(CommonVimWimSdn):
 
     def _send_msg(self, action, content, not_send_msg=None):
         if self._is_paas_vim_type(content):
+            self.temporal.start_vim_workflow(action, content)
             return
+
         super()._send_msg(action, content, not_send_msg)
 
 
index 383b462..141d460 100644 (file)
@@ -91,6 +91,10 @@ loglevel:  "DEBUG"
 #logfile: /var/log/osm/nbi-message.log
 group_id: "nbi-server"
 
+[temporal]
+host: "temporal"
+port: 7233
+
 [authentication]
 backend: "keystone"         # internal or keystone or tacacs
 # for keystone backend a comma separated list of user adn project _domain_name list can ba provided.
index fa414f0..fed3205 100644 (file)
@@ -32,6 +32,7 @@ from osm_nbi.validation import ValidationError
 from osm_common.dbbase import DbException
 from osm_common.fsbase import FsException
 from osm_common.msgbase import MsgException
+from osm_common.wftemporal import WFTemporal
 from http import HTTPStatus
 from codecs import getreader
 from os import environ, path
@@ -904,7 +905,7 @@ class Server(object):
         project_name=None,
         ns_id=None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         if topic == "alarms":
             try:
@@ -1385,7 +1386,7 @@ class Server(object):
         _id=None,
         item=None,
         *args,
-        **kwargs
+        **kwargs,
     ):
         token_info = None
         outdata = None
@@ -1854,7 +1855,7 @@ def _start_service():
                 update_dict["server.socket_host"] = v
             elif k1 in ("server", "test", "auth", "log"):
                 update_dict[k1 + "." + k2] = v
-            elif k1 in ("message", "database", "storage", "authentication"):
+            elif k1 in ("message", "database", "storage", "authentication", "temporal"):
                 # k2 = k2.replace('_', '.')
                 if k2 in ("port", "db_port"):
                     engine_config[k1][k2] = int(v)
@@ -1940,6 +1941,10 @@ def _start_service():
     subscription_thread.start()
     # Do not capture except SubscriptionException
 
+    WFTemporal.temporal_api = (
+        f'{engine_config["temporal"]["host"]}:{engine_config["temporal"]["port"]}'
+    )
+
     backend = engine_config["authentication"]["backend"]
     cherrypy.log.error(
         "Starting OSM NBI Version '{} {}' with '{}' authentication backend".format(
diff --git a/osm_nbi/temporal/__init__.py b/osm_nbi/temporal/__init__.py
new file mode 100644 (file)
index 0000000..d36bef6
--- /dev/null
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/osm_nbi/temporal/nbi_temporal.py b/osm_nbi/temporal/nbi_temporal.py
new file mode 100644 (file)
index 0000000..af654ea
--- /dev/null
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+from osm_common.dataclasses.temporal_dataclasses import VimOperationInput
+from osm_common.temporal_constants import (
+    LCM_TASK_QUEUE,
+    WORKFLOW_VIM_CREATE,
+    WORKFLOW_VIM_DELETE,
+    WORKFLOW_VIM_UPDATE,
+)
+from osm_common.wftemporal import WFTemporal
+
+
+class NbiTemporal:
+    workflow_mappings = {
+        "created": WORKFLOW_VIM_CREATE,
+        "edited": WORKFLOW_VIM_UPDATE,
+        "delete": WORKFLOW_VIM_DELETE,
+    }
+
+    def start_vim_workflow(self, action: str, content: dict) -> None:
+        vim_uuid = content["_id"]
+        # Derive the operation id (although for a create it will always be 0)
+        op_id = content["op_id"].split(":")[1]
+
+        workflow = NbiTemporal.workflow_mappings[action]
+
+        if workflow is not None:
+            workflow_data = VimOperationInput(vim_uuid, op_id)
+
+            asyncio.run(
+                WFTemporal(logger_name="nbi.vim_workflow").start_workflow(
+                    task_queue=LCM_TASK_QUEUE,
+                    workflow_name=workflow,
+                    workflow_data=workflow_data,
+                    id=vim_uuid,
+                )
+            )
index ce6a21b..6c8083f 100755 (executable)
@@ -1694,10 +1694,12 @@ class TestVimAccountTopic(TestCase):
             "config": {"paas_provider": "juju"},
         }
         rollback = []
+        self.topic.temporal = Mock()
 
         self.topic.new(rollback, self.fake_session, indata)
         self.assertEqual(len(rollback), 1, "Wrong rollback length")
         self.msg.write.assert_not_called()
+        self.topic.temporal.start_vim_workflow.assert_called_once()
 
     def test_kafka_message_is_sent_if_not_paas_vim(self, mock_common_vim_wim_sdn):
         indata = {