from osm_nbi.authconn import AuthconnNotFoundException, AuthconnConflictException
from osm_common.dbbase import deep_update_rfc7396
import copy
+from osm_nbi.temporal.nbi_temporal import NbiTemporal
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
return final_content
+ def _validate_input_edit(self, input, content, force=False):
+ """
+ Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
+ :param input: user input content for the new topic
+ :param force: may be used for being more tolerant
+ :return: The same input content, or a changed version of it.
+ """
+
+ if "vim_type" in content:
+ input["vim_type"] = content["vim_type"]
+ return super()._validate_input_edit(input, content, force)
+
def format_on_edit(self, final_content, edit_content):
"""
Modifies final_content inserting admin information upon edition
if session["force"]:
self.db.del_one(self.topic, {"_id": _id})
op_id = None
+ message = {"_id": _id, "op_id": op_id}
+ # The vim_type is a temporary hack to shim in temporal workflows in the create
+ if "vim_type" in db_content:
+ message["vim_type"] = db_content["vim_type"]
+
self._send_msg(
- "deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg
+ "deleted",
+ message,
+ not_send_msg=not_send_msg,
)
else:
update_dict = {"_admin.to_delete": True}
op_id = "{}:{}".format(
db_content["_id"], len(db_content["_admin"]["operations"])
)
+ message = {"_id": _id, "op_id": op_id}
+ # The vim_type is a temporary hack to shim in temporal workflows in the create
+ if "vim_type" in db_content:
+ message["vim_type"] = db_content["vim_type"]
+
self._send_msg(
- "delete", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg
+ "delete",
+ message,
+ not_send_msg=not_send_msg,
)
return op_id
),
}
valid_paas_providers = ["juju"]
+ temporal = NbiTemporal()
def check_conflict_on_new(self, session, indata):
super().check_conflict_on_new(session, indata)
def _send_msg(self, action, content, not_send_msg=None):
if self._is_paas_vim_type(content):
+ self.temporal.start_vim_workflow(action, content)
return
+
super()._send_msg(action, content, not_send_msg)
#logfile: /var/log/osm/nbi-message.log
group_id: "nbi-server"
+[temporal]
+host: "temporal"
+port: 7233
+
[authentication]
backend: "keystone" # internal or keystone or tacacs
# for keystone backend a comma separated list of user adn project _domain_name list can ba provided.
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
+from osm_common.wftemporal import WFTemporal
from http import HTTPStatus
from codecs import getreader
from os import environ, path
project_name=None,
ns_id=None,
*args,
- **kwargs
+ **kwargs,
):
if topic == "alarms":
try:
_id=None,
item=None,
*args,
- **kwargs
+ **kwargs,
):
token_info = None
outdata = None
update_dict["server.socket_host"] = v
elif k1 in ("server", "test", "auth", "log"):
update_dict[k1 + "." + k2] = v
- elif k1 in ("message", "database", "storage", "authentication"):
+ elif k1 in ("message", "database", "storage", "authentication", "temporal"):
# k2 = k2.replace('_', '.')
if k2 in ("port", "db_port"):
engine_config[k1][k2] = int(v)
subscription_thread.start()
# Do not capture except SubscriptionException
+ WFTemporal.temporal_api = (
+ f'{engine_config["temporal"]["host"]}:{engine_config["temporal"]["port"]}'
+ )
+
backend = engine_config["authentication"]["backend"]
cherrypy.log.error(
"Starting OSM NBI Version '{} {}' with '{}' authentication backend".format(
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+from osm_common.dataclasses.temporal_dataclasses import VimOperationInput
+from osm_common.temporal_constants import (
+ LCM_TASK_QUEUE,
+ WORKFLOW_VIM_CREATE,
+ WORKFLOW_VIM_DELETE,
+ WORKFLOW_VIM_UPDATE,
+)
+from osm_common.wftemporal import WFTemporal
+
+
+class NbiTemporal:
+ workflow_mappings = {
+ "created": WORKFLOW_VIM_CREATE,
+ "edited": WORKFLOW_VIM_UPDATE,
+ "delete": WORKFLOW_VIM_DELETE,
+ }
+
+ def start_vim_workflow(self, action: str, content: dict) -> None:
+ vim_uuid = content["_id"]
+ # Derive the operation id (although for a create it will always be 0)
+ op_id = content["op_id"].split(":")[1]
+
+ workflow = NbiTemporal.workflow_mappings[action]
+
+ if workflow is not None:
+ workflow_data = VimOperationInput(vim_uuid, op_id)
+
+ asyncio.run(
+ WFTemporal(logger_name="nbi.vim_workflow").start_workflow(
+ task_queue=LCM_TASK_QUEUE,
+ workflow_name=workflow,
+ workflow_data=workflow_data,
+ id=vim_uuid,
+ )
+ )
"config": {"paas_provider": "juju"},
}
rollback = []
+ self.topic.temporal = Mock()
self.topic.new(rollback, self.fake_session, indata)
self.assertEqual(len(rollback), 1, "Wrong rollback length")
self.msg.write.assert_not_called()
+ self.topic.temporal.start_vim_workflow.assert_called_once()
def test_kafka_message_is_sent_if_not_paas_vim(self, mock_common_vim_wim_sdn):
indata = {