vim_account_new_schema, vim_account_edit_schema, sdn_new_schema, sdn_edit_schema, \
wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema, \
k8scluster_new_schema, k8scluster_edit_schema, k8srepo_new_schema, k8srepo_edit_schema, \
+ vca_new_schema, vca_edit_schema, \
osmrepo_new_schema, osmrepo_edit_schema, \
validate_input, ValidationError, is_valid_uuid # To check that User/Project Names don't look like UUIDs
from osm_nbi.base_topic import BaseTopic, EngineException
super().check_conflict_on_del(session, _id, db_content)
+class VcaTopic(CommonVimWimSdn):
+ topic = "vca"
+ topic_msg = "vca"
+ schema_new = vca_new_schema
+ schema_edit = vca_edit_schema
+ multiproject = True
+ password_to_encrypt = None
+
+ def format_on_new(self, content, project_id=None, make_public=False):
+ oid = super().format_on_new(content, project_id, make_public)
+ content["schema_version"] = schema_version = "1.11"
+ for key in ["secret", "cacert"]:
+ content[key] = self.db.encrypt(
+ content[key],
+ schema_version=schema_version,
+ salt=content["_id"]
+ )
+ return oid
+
+ def format_on_edit(self, final_content, edit_content):
+ oid = super().format_on_edit(final_content, edit_content)
+ schema_version = final_content.get("schema_version")
+ for key in ["secret", "cacert"]:
+ if key in edit_content:
+ final_content[key] = self.db.encrypt(
+ edit_content[key],
+ schema_version=schema_version,
+ salt=final_content["_id"]
+ )
+ return oid
+
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check if deletion can be done because of dependencies if it is not force. To override
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: internal _id
+ :param db_content: The database content of this item _id
+ :return: None if ok or raises EngineException with the conflict
+ """
+ if session["force"]:
+ return
+ # check if used by VNF
+ filter_q = {"vca": _id}
+ if session["project_id"]:
+ filter_q["_admin.projects_read.cont"] = session["project_id"]
+ if self.db.get_list("vim_accounts", filter_q):
+ raise EngineException("There is at least one VIM account using this vca", http_code=HTTPStatus.CONFLICT)
+ super().check_conflict_on_del(session, _id, db_content)
+
+
class K8sRepoTopic(CommonVimWimSdn):
topic = "k8srepos"
topic_msg = "k8srepo"
from osm_nbi.base_topic import EngineException, versiontuple
from osm_nbi.admin_topics import VimAccountTopic, WimAccountTopic, SdnTopic
from osm_nbi.admin_topics import K8sClusterTopic, K8sRepoTopic, OsmRepoTopic
+from osm_nbi.admin_topics import VcaTopic
from osm_nbi.admin_topics import UserTopicAuth, ProjectTopicAuth, RoleTopicAuth
from osm_nbi.descriptor_topics import VnfdTopic, NsdTopic, PduTopic, NstTopic, VnfPkgOpTopic
from osm_nbi.instance_topics import NsrTopic, VnfrTopic, NsLcmOpTopic, NsiTopic, NsiLcmOpTopic
"wim_accounts": WimAccountTopic,
"sdns": SdnTopic,
"k8sclusters": K8sClusterTopic,
+ "vca": VcaTopic,
"k8srepos": K8sRepoTopic,
"osmrepos": OsmRepoTopic,
"users": UserTopicAuth, # Valid for both internal and keystone authentication backends
"vnfd-ref": vnfd_id,
"vnfd-id": vnfd["_id"], # not at OSM model, but useful
"vim-account-id": None,
+ "vca-id": None,
"vdur": [],
"connection-point": [],
"ip-address": None, # mgmt-interface filled by LCM
# update vim-account-id
vim_account = indata["vimAccountId"]
+ vca_id = indata.get("vcaId")
# check instantiate parameters
for vnf_inst_params in get_iterable(indata.get("vnf")):
if vnf_inst_params["member-vnf-index"] != member_vnf_index:
continue
if vnf_inst_params.get("vimAccountId"):
vim_account = vnf_inst_params.get("vimAccountId")
+ if vnf_inst_params.get("vcaId"):
+ vca_id = vnf_inst_params.get("vcaId")
# get vnf.vdu.interface instantiation params to update vnfr.vdur.interfaces ip, mac
for vdu_inst_param in get_iterable(vnf_inst_params.get("vdu")):
vnfr_update["vim-account-id"] = vim_account
vnfr_update_rollback["vim-account-id"] = vnfr.get("vim-account-id")
+ if vca_id:
+ vnfr_update["vca-id"] = vca_id
+ vnfr_update_rollback["vca-id"] = vnfr.get("vca-id")
+
# get pdu
ifaces_forcing_vim_network = self._look_for_pdu(session, rollback, vnfr, vim_account, vnfr_update,
vnfr_update_rollback)
"ROLE_PERMISSION": "k8sclusters:id:"
}
},
+ "vca": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vca:",
+ "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "vca:id:"
+ }
+ },
"k8srepos": {"METHODS": ("GET", "POST"),
"ROLE_PERMISSION": "k8srepos:",
"<ID>": {"METHODS": ("GET", "DELETE"),
"PATCH /admin/v1/k8sclusters/<id>": "k8sclusters:id:patch"
+################################################################################
+####################################### VCAs ###################################
+################################################################################
+
+ "GET /admin/v1/vca": "vca:get"
+
+ "POST /admin/v1/vca": "vca:post"
+
+ "GET /admin/v1/vca/<id>": "vca:id:get"
+
+ "DELETE /admin/v1/vca/<id>": "vca:id:delete"
+
+ "PATCH /admin/v1/vca/<id>": "vca:id:patch"
+
################################################################################
################################# K8s Repos ##############################
################################################################################
k8sclusters: false
k8sclusters:get: true
k8sclusters:id:get: true
+ # VCA
+ vca: false
+ vca:get: true
+ vca:id:get: true
# K8s repos
k8srepos: true
# OSM repos
import unittest
from unittest import TestCase
-from unittest.mock import Mock
+from unittest.mock import Mock, patch, call
from uuid import uuid4
from http import HTTPStatus
from time import time
from random import randint
from osm_common import dbbase, fsbase, msgbase
from osm_nbi import authconn, validation
-from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
+from osm_nbi.admin_topics import (
+ ProjectTopicAuth,
+ RoleTopicAuth,
+ UserTopicAuth,
+ CommonVimWimSdn,
+ VcaTopic,
+)
from osm_nbi.engine import EngineException
from osm_nbi.authconn import AuthconnNotFoundException
return ' '.join(str.strip().split()).lower()
+class TestVcaTopic(TestCase):
+ def setUp(self):
+ self.db = Mock(dbbase.DbBase())
+ self.fs = Mock(fsbase.FsBase())
+ self.msg = Mock(msgbase.MsgBase())
+ self.auth = Mock(authconn.Authconn(None, None, None))
+ self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
+ def test_format_on_new(self, mock_super_format_on_new):
+ content = {
+ "_id": "id",
+ "secret": "encrypted_secret",
+ "cacert": "encrypted_cacert",
+ }
+ self.db.encrypt.side_effect = ["secret", "cacert"]
+ mock_super_format_on_new.return_value = "1234"
+
+ oid = self.vca_topic.format_on_new(content)
+
+ self.assertEqual(oid, "1234")
+ self.assertEqual(content["secret"], "secret")
+ self.assertEqual(content["cacert"], "cacert")
+ self.db.encrypt.assert_has_calls(
+ [
+ call("encrypted_secret", schema_version="1.11", salt="id"),
+ call("encrypted_cacert", schema_version="1.11", salt="id"),
+ ]
+ )
+ mock_super_format_on_new.assert_called_with(content, None, False)
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
+ def test_format_on_edit(self, mock_super_format_on_edit):
+ edit_content = {
+ "_id": "id",
+ "secret": "encrypted_secret",
+ "cacert": "encrypted_cacert",
+ }
+ final_content = {
+ "_id": "id",
+ "schema_version": "1.11",
+ }
+ self.db.encrypt.side_effect = ["secret", "cacert"]
+ mock_super_format_on_edit.return_value = "1234"
+
+ oid = self.vca_topic.format_on_edit(final_content, edit_content)
+
+ self.assertEqual(oid, "1234")
+ self.assertEqual(final_content["secret"], "secret")
+ self.assertEqual(final_content["cacert"], "cacert")
+ self.db.encrypt.assert_has_calls(
+ [
+ call("encrypted_secret", schema_version="1.11", salt="id"),
+ call("encrypted_cacert", schema_version="1.11", salt="id"),
+ ]
+ )
+ mock_super_format_on_edit.assert_called()
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
+ def test_check_conflict_on_del(self, mock_check_conflict_on_del):
+ session = {
+ "project_id": "project-id",
+ "force": False,
+ }
+ _id = "vca-id"
+ db_content = {}
+
+ self.db.get_list.return_value = None
+
+ self.vca_topic.check_conflict_on_del(session, _id, db_content)
+
+ self.db.get_list.assert_called_with(
+ "vim_accounts",
+ {"vca": _id, '_admin.projects_read.cont': 'project-id'},
+ )
+ mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
+ def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
+ session = {
+ "project_id": "project-id",
+ "force": True,
+ }
+ _id = "vca-id"
+ db_content = {}
+
+ self.vca_topic.check_conflict_on_del(session, _id, db_content)
+
+ self.db.get_list.assert_not_called()
+ mock_check_conflict_on_del.assert_not_called()
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
+ def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
+ session = {
+ "project_id": "project-id",
+ "force": False,
+ }
+ _id = "vca-id"
+ db_content = {}
+
+ self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
+
+ with self.assertRaises(EngineException) as context:
+ self.vca_topic.check_conflict_on_del(session, _id, db_content)
+ self.assertEqual(
+ context.exception,
+ EngineException(
+ "There is at least one VIM account using this vca",
+ http_code=HTTPStatus.CONFLICT
+ )
+ )
+
+ self.db.get_list.assert_called_with(
+ "vim_accounts",
+ {"vca": _id, '_admin.projects_read.cont': 'project-id'},
+ )
+ mock_check_conflict_on_del.assert_not_called()
+
+
class Test_ProjectTopicAuth(TestCase):
@classmethod
string_schema = {"type": "string", "minLength": 1, "maxLength": 255}
xml_text_schema = {"type": "string", "minLength": 1, "maxLength": 1000, "pattern": "^[^']+$"}
description_schema = {"type": ["string", "null"], "maxLength": 255, "pattern": "^[^'\"]+$"}
+long_description_schema = {"type": ["string", "null"], "maxLength": 3000, "pattern": "^[^'\"]+$"}
id_schema_fake = {"type": "string", "minLength": 2, "maxLength": 36}
bool_schema = {"type": "boolean"}
null_schema = {"type": "null"}
"items": shortname_schema,
}
+description_list_schema = {
+ "type": "array",
+ "minItems": 1,
+ "items": description_schema,
+}
ns_instantiate_vdu = {
"title": "ns action instantiate input schema for vdu",
"nsName": name_schema,
"nsDescription": {"oneOf": [description_schema, null_schema]},
"nsdId": id_schema,
+ "vcaId": id_schema,
"vimAccountId": id_schema,
"wimAccountId": {"oneOf": [id_schema, bool_schema, null_schema]},
"placement-engine": string_schema,
"properties": {
"member-vnf-index": name_schema,
"vimAccountId": id_schema,
+ "vcaId": id_schema,
"vdu": {
"type": "array",
"minItems": 1,
"vim_tenant_name": name_schema,
"vim_user": shortname_schema,
"vim_password": passwd_schema,
+ "vca": id_schema,
"config": {"type": "object"}
},
"additionalProperties": False
"vim_tenant_name": name_schema,
"vim_user": shortname_schema,
"vim_password": passwd_schema,
+ "vca": id_schema,
"config": {"type": "object"}
},
"required": ["name", "vim_url", "vim_type", "vim_user", "vim_password", "vim_tenant_name"],
"description": description_schema,
"credentials": object_schema,
"vim_account": id_schema,
+ "vca_id": id_schema,
"k8s_version": string_schema,
"nets": k8scluster_nets_schema,
"namespace": name_schema,
"description": description_schema,
"credentials": object_schema,
"vim_account": id_schema,
+ "vca_id": id_schema,
"k8s_version": string_schema,
"nets": k8scluster_nets_schema,
"namespace": name_schema,
"additionalProperties": False
}
+# VCA
+vca_new_schema = {
+ "title": "vca creation input schema",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "schema_version": schema_version,
+ "schema_type": schema_type,
+ "name": name_schema,
+ "description": description_schema,
+ "endpoints": description_list_schema,
+ "user": shortname_schema,
+ "secret": passwd_schema,
+ "cacert": long_description_schema,
+ "lxd-cloud": shortname_schema,
+ "lxd-credentials": shortname_schema,
+ "k8s-cloud": shortname_schema,
+ "k8s-credentials": shortname_schema,
+ "model-config": object_schema,
+ },
+ "required": [
+ "name",
+ "endpoints",
+ "user",
+ "secret",
+ "cacert",
+ "lxd-cloud",
+ "lxd-credentials",
+ "k8s-cloud",
+ "k8s-credentials",
+ ],
+ "additionalProperties": False,
+}
+vca_edit_schema = {
+ "title": "vca creation input schema",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "name": name_schema,
+ "description": description_schema,
+ "endpoints": description_list_schema,
+ "port": integer1_schema,
+ "user": shortname_schema,
+ "secret": passwd_schema,
+ "cacert": long_description_schema,
+ "lxd-cloud": shortname_schema,
+ "lxd-credentials": shortname_schema,
+ "k8s-cloud": shortname_schema,
+ "k8s-credentials": shortname_schema,
+ "model-config": object_schema,
+ },
+ "additionalProperties": False,
+}
+
# K8s Repos
k8srepo_types = {"enum": ["helm-chart", "juju-bundle"]}
k8srepo_properties = {
# PROJECTS
topics_with_quota = ["vnfds", "nsds", "slice_templates", "pduds", "ns_instances", "slice_instances", "vim_accounts",
- "wim_accounts", "sdn_controllers", "k8sclusters", "k8srepos", "osmrepos", "ns_subscriptions"]
+ "wim_accounts", "sdn_controllers", "k8sclusters", "vca", "k8srepos", "osmrepos", "ns_subscriptions"]
project_new_schema = {
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "New project schema for administrators",