From: Patricia Reinoso Date: Wed, 22 Feb 2023 17:57:53 +0000 (+0000) Subject: Add VIM create Workflow X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F95%2F12995%2F18;p=osm%2FNBI.git Add VIM create Workflow Change-Id: I6b97b13e1d503f1608d241d16ec5bea4887e6e98 Signed-off-by: Patricia Reinoso Signed-off-by: Mark Beierl --- diff --git a/osm_nbi/admin_topics.py b/osm_nbi/admin_topics.py index b2def67..6510d5e 100644 --- a/osm_nbi/admin_topics.py +++ b/osm_nbi/admin_topics.py @@ -47,6 +47,7 @@ from osm_nbi.base_topic import BaseTopic, EngineException from osm_nbi.authconn import AuthconnNotFoundException, AuthconnConflictException from osm_common.dbbase import deep_update_rfc7396 import copy +from osm_nbi.temporal.nbi_temporal import NbiTemporal __author__ = "Alfonso Tierno " @@ -323,6 +324,18 @@ class CommonVimWimSdn(BaseTopic): return final_content + def _validate_input_edit(self, input, content, force=False): + """ + Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind + :param input: user input content for the new topic + :param force: may be used for being more tolerant + :return: The same input content, or a changed version of it. + """ + + if "vim_type" in content: + input["vim_type"] = content["vim_type"] + return super()._validate_input_edit(input, content, force) + def format_on_edit(self, final_content, edit_content): """ Modifies final_content inserting admin information upon edition @@ -480,8 +493,15 @@ class CommonVimWimSdn(BaseTopic): if session["force"]: self.db.del_one(self.topic, {"_id": _id}) op_id = None + message = {"_id": _id, "op_id": op_id} + # The vim_type is a temporary hack to shim in temporal workflows in the create + if "vim_type" in db_content: + message["vim_type"] = db_content["vim_type"] + self._send_msg( - "deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg + "deleted", + message, + not_send_msg=not_send_msg, ) else: update_dict = {"_admin.to_delete": True} @@ -496,8 +516,15 @@ class CommonVimWimSdn(BaseTopic): op_id = "{}:{}".format( db_content["_id"], len(db_content["_admin"]["operations"]) ) + message = {"_id": _id, "op_id": op_id} + # The vim_type is a temporary hack to shim in temporal workflows in the create + if "vim_type" in db_content: + message["vim_type"] = db_content["vim_type"] + self._send_msg( - "delete", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg + "delete", + message, + not_send_msg=not_send_msg, ) return op_id @@ -519,6 +546,7 @@ class VimAccountTopic(CommonVimWimSdn): ), } valid_paas_providers = ["juju"] + temporal = NbiTemporal() def check_conflict_on_new(self, session, indata): super().check_conflict_on_new(session, indata) @@ -563,7 +591,9 @@ class VimAccountTopic(CommonVimWimSdn): def _send_msg(self, action, content, not_send_msg=None): if self._is_paas_vim_type(content): + self.temporal.start_vim_workflow(action, content) return + super()._send_msg(action, content, not_send_msg) diff --git a/osm_nbi/nbi.cfg b/osm_nbi/nbi.cfg index 383b462..141d460 100644 --- a/osm_nbi/nbi.cfg +++ b/osm_nbi/nbi.cfg @@ -91,6 +91,10 @@ loglevel: "DEBUG" #logfile: /var/log/osm/nbi-message.log group_id: "nbi-server" +[temporal] +host: "temporal" +port: 7233 + [authentication] backend: "keystone" # internal or keystone or tacacs # for keystone backend a comma separated list of user adn project _domain_name list can ba provided. diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py index fa414f0..fed3205 100644 --- a/osm_nbi/nbi.py +++ b/osm_nbi/nbi.py @@ -32,6 +32,7 @@ from osm_nbi.validation import ValidationError from osm_common.dbbase import DbException from osm_common.fsbase import FsException from osm_common.msgbase import MsgException +from osm_common.wftemporal import WFTemporal from http import HTTPStatus from codecs import getreader from os import environ, path @@ -904,7 +905,7 @@ class Server(object): project_name=None, ns_id=None, *args, - **kwargs + **kwargs, ): if topic == "alarms": try: @@ -1385,7 +1386,7 @@ class Server(object): _id=None, item=None, *args, - **kwargs + **kwargs, ): token_info = None outdata = None @@ -1854,7 +1855,7 @@ def _start_service(): update_dict["server.socket_host"] = v elif k1 in ("server", "test", "auth", "log"): update_dict[k1 + "." + k2] = v - elif k1 in ("message", "database", "storage", "authentication"): + elif k1 in ("message", "database", "storage", "authentication", "temporal"): # k2 = k2.replace('_', '.') if k2 in ("port", "db_port"): engine_config[k1][k2] = int(v) @@ -1940,6 +1941,10 @@ def _start_service(): subscription_thread.start() # Do not capture except SubscriptionException + WFTemporal.temporal_api = ( + f'{engine_config["temporal"]["host"]}:{engine_config["temporal"]["port"]}' + ) + backend = engine_config["authentication"]["backend"] cherrypy.log.error( "Starting OSM NBI Version '{} {}' with '{}' authentication backend".format( diff --git a/osm_nbi/temporal/__init__.py b/osm_nbi/temporal/__init__.py new file mode 100644 index 0000000..d36bef6 --- /dev/null +++ b/osm_nbi/temporal/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/osm_nbi/temporal/nbi_temporal.py b/osm_nbi/temporal/nbi_temporal.py new file mode 100644 index 0000000..af654ea --- /dev/null +++ b/osm_nbi/temporal/nbi_temporal.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from osm_common.dataclasses.temporal_dataclasses import VimOperationInput +from osm_common.temporal_constants import ( + LCM_TASK_QUEUE, + WORKFLOW_VIM_CREATE, + WORKFLOW_VIM_DELETE, + WORKFLOW_VIM_UPDATE, +) +from osm_common.wftemporal import WFTemporal + + +class NbiTemporal: + workflow_mappings = { + "created": WORKFLOW_VIM_CREATE, + "edited": WORKFLOW_VIM_UPDATE, + "delete": WORKFLOW_VIM_DELETE, + } + + def start_vim_workflow(self, action: str, content: dict) -> None: + vim_uuid = content["_id"] + # Derive the operation id (although for a create it will always be 0) + op_id = content["op_id"].split(":")[1] + + workflow = NbiTemporal.workflow_mappings[action] + + if workflow is not None: + workflow_data = VimOperationInput(vim_uuid, op_id) + + asyncio.run( + WFTemporal(logger_name="nbi.vim_workflow").start_workflow( + task_queue=LCM_TASK_QUEUE, + workflow_name=workflow, + workflow_data=workflow_data, + id=vim_uuid, + ) + ) diff --git a/osm_nbi/tests/test_admin_topics.py b/osm_nbi/tests/test_admin_topics.py index ce6a21b..6c8083f 100755 --- a/osm_nbi/tests/test_admin_topics.py +++ b/osm_nbi/tests/test_admin_topics.py @@ -1694,10 +1694,12 @@ class TestVimAccountTopic(TestCase): "config": {"paas_provider": "juju"}, } rollback = [] + self.topic.temporal = Mock() self.topic.new(rollback, self.fake_session, indata) self.assertEqual(len(rollback), 1, "Wrong rollback length") self.msg.write.assert_not_called() + self.topic.temporal.start_vim_workflow.assert_called_once() def test_kafka_message_is_sent_if_not_paas_vim(self, mock_common_vim_wim_sdn): indata = {