# See the License for the specific language governing permissions and
# limitations under the License.
+from pyrage import x25519
import logging
+import random
+import string
from uuid import uuid4
from http import HTTPStatus
from time import time
-from osm_common.dbbase import deep_update_rfc7396
+from osm_common.dbbase import deep_update_rfc7396, DbException
+from osm_common.msgbase import MsgException
+from osm_common.fsbase import FsException
from osm_nbi.validation import validate_input, ValidationError, is_valid_uuid
from yaml import safe_load, YAMLError
super(Exception, self).__init__(message)
+class NBIBadArgumentsException(Exception):
+ """
+ Bad argument values exception
+ """
+
+ def __init__(self, message: str = "", bad_args: list = None):
+ Exception.__init__(self, message)
+ self.message = message
+ self.bad_args = bad_args
+
+ def __str__(self):
+ return "{}, Bad arguments: {}".format(self.message, self.bad_args)
+
+
def deep_get(target_dict, key_list):
"""
Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
return target_dict
+def detect_descriptor_usage(descriptor: dict, db_collection: str, db: object) -> bool:
+ """Detect the descriptor usage state.
+
+ Args:
+ descriptor (dict): VNF or NS Descriptor as dictionary
+ db_collection (str): collection name which is looked for in DB
+ db (object): name of db object
+
+ Returns:
+ True if descriptor is in use else None
+
+ """
+ try:
+ if not descriptor:
+ raise NBIBadArgumentsException(
+ "Argument is mandatory and can not be empty", "descriptor"
+ )
+
+ if not db:
+ raise NBIBadArgumentsException("A valid DB object should be provided", "db")
+
+ search_dict = {
+ "vnfds": ("vnfrs", "vnfd-id"),
+ "nsds": ("nsrs", "nsd-id"),
+ "ns_config_template": ("ns_config_template", "_id"),
+ }
+
+ if db_collection not in search_dict:
+ raise NBIBadArgumentsException(
+ "db_collection should be equal to vnfds or nsds", "db_collection"
+ )
+
+ record_list = db.get_list(
+ search_dict[db_collection][0],
+ {search_dict[db_collection][1]: descriptor["_id"]},
+ )
+
+ if record_list:
+ return True
+
+ except (DbException, KeyError, NBIBadArgumentsException) as error:
+ raise EngineException(
+ f"Error occured while detecting the descriptor usage: {error}"
+ )
+
+
+def update_descriptor_usage_state(
+ descriptor: dict, db_collection: str, db: object
+) -> None:
+ """Updates the descriptor usage state.
+
+ Args:
+ descriptor (dict): VNF or NS Descriptor as dictionary
+ db_collection (str): collection name which is looked for in DB
+ db (object): name of db object
+
+ Returns:
+ None
+
+ """
+ try:
+ descriptor_update = {
+ "_admin.usageState": "NOT_IN_USE",
+ }
+
+ if detect_descriptor_usage(descriptor, db_collection, db):
+ descriptor_update = {
+ "_admin.usageState": "IN_USE",
+ }
+
+ db.set_one(
+ db_collection, {"_id": descriptor["_id"]}, update_dict=descriptor_update
+ )
+
+ except (DbException, KeyError, NBIBadArgumentsException) as error:
+ raise EngineException(
+ f"Error occured while updating the descriptor usage state: {error}"
+ )
+
+
def get_iterable(input_var):
"""
Returns an iterable, in case input_var is None it just returns an empty tuple
self.db = db
self.fs = fs
self.msg = msg
- self.logger = logging.getLogger("nbi.engine")
+ self.logger = logging.getLogger("nbi.base")
self.auth = auth
@staticmethod
content["_admin"]["projects_write"] = list(project_id)
return None
+ @staticmethod
+ def format_on_operation(content, operation_type, operation_params):
+ if content["current_operation"] is None:
+ op_id = str(uuid4())
+ content["current_operation"] = op_id
+ else:
+ op_id = content["current_operation"]
+ now = time()
+ if "operationHistory" not in content:
+ content["operationHistory"] = []
+
+ operation = {}
+ operation["operationType"] = operation_type
+ operation["git_operation_info"] = None
+ operation["op_id"] = op_id
+ operation["result"] = None
+ operation["workflowState"] = "PROCESSING"
+ operation["resourceState"] = "NOT_READY"
+ operation["creationDate"] = now
+ operation["endDate"] = None
+ operation["operationParams"] = operation_params
+
+ content["operationHistory"].append(operation)
+ return op_id
+
@staticmethod
def format_on_edit(final_content, edit_content):
"""
# Only perform SOL005 projection if we are serving an external request
if api_req:
self.sol005_projection(data)
-
return data
# TODO transform data for SOL005 URL requests
HTTPStatus.INTERNAL_SERVER_ERROR,
)
+ def create_gitname(self, content, session, _id=None):
+ if not self.multiproject:
+ _filter = {}
+ else:
+ _filter = self._get_project_filter(session)
+ _filter["git_name"] = content["name"]
+ if _id:
+ _filter["_id.neq"] = _id
+ if self.db.get_one(
+ self.topic, _filter, fail_on_empty=False, fail_on_more=False
+ ):
+ n = 5
+ # using random.choices()
+ # generating random strings
+ res = "".join(random.choices(string.ascii_lowercase + string.digits, k=n))
+ res1 = content["name"]
+ new_name1 = res1 + res
+ new_name = new_name1.lower()
+ return new_name
+ else:
+ return content["name"]
+
+ def new_profile(self, rollback, session, indata=None, kwargs=None, headers=None):
+ step = "name unique check"
+ try:
+ self.check_unique_name(session, indata["name"])
+
+ step = "validating input parameters"
+ profile_request = self._remove_envelop(indata)
+ self._update_input_with_kwargs(profile_request, kwargs)
+ profile_request = self._validate_input_new(
+ profile_request, session["force"]
+ )
+ operation_params = profile_request
+
+ step = "filling profile details from input data"
+ profile_create = self._create_profile(profile_request, session)
+
+ step = "creating profile at database"
+ self.format_on_new(
+ profile_create, session["project_id"], make_public=session["public"]
+ )
+ profile_create["current_operation"] = None
+ op_id = self.format_on_operation(
+ profile_create,
+ "create",
+ operation_params,
+ )
+
+ _id = self.db.create(self.topic, profile_create)
+ pubkey, privkey = self._generate_age_key()
+ profile_create["age_pubkey"] = self.db.encrypt(
+ pubkey, schema_version="1.11", salt=_id
+ )
+ profile_create["age_privkey"] = self.db.encrypt(
+ privkey, schema_version="1.11", salt=_id
+ )
+ rollback.append({"topic": self.topic, "_id": _id})
+ self.db.set_one(self.topic, {"_id": _id}, profile_create)
+ if op_id:
+ profile_create["op_id"] = op_id
+ self._send_msg("profile_create", {"profile_id": _id, "operation_id": op_id})
+
+ return _id, None
+ except (
+ ValidationError,
+ EngineException,
+ DbException,
+ MsgException,
+ FsException,
+ ) as e:
+ raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
+
+ def _create_profile(self, profile_request, session):
+ profile_desc = {
+ "name": profile_request["name"],
+ "description": profile_request["description"],
+ "default": False,
+ "git_name": self.create_gitname(profile_request, session),
+ "state": "IN_CREATION",
+ "operatingState": "IN_PROGRESS",
+ "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
+ }
+ return profile_desc
+
+ def default_profile(
+ self, rollback, session, indata=None, kwargs=None, headers=None
+ ):
+ step = "validating input parameters"
+ try:
+ profile_request = self._remove_envelop(indata)
+ self._update_input_with_kwargs(profile_request, kwargs)
+ operation_params = profile_request
+
+ step = "filling profile details from input data"
+ profile_create = self._create_default_profile(profile_request, session)
+
+ step = "creating profile at database"
+ self.format_on_new(
+ profile_create, session["project_id"], make_public=session["public"]
+ )
+ profile_create["current_operation"] = None
+ self.format_on_operation(
+ profile_create,
+ "create",
+ operation_params,
+ )
+ _id = self.db.create(self.topic, profile_create)
+ rollback.append({"topic": self.topic, "_id": _id})
+ return _id
+ except (
+ ValidationError,
+ EngineException,
+ DbException,
+ MsgException,
+ FsException,
+ ) as e:
+ raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
+
+ def _create_default_profile(self, profile_request, session):
+ profile_desc = {
+ "name": profile_request["name"],
+ "description": f"{self.topic} profile for cluster {profile_request['name']}",
+ "default": True,
+ "git_name": self.create_gitname(profile_request, session),
+ "state": "IN_CREATION",
+ "operatingState": "IN_PROGRESS",
+ "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
+ }
+ return profile_desc
+
def delete_list(self, session, filter_q=None):
"""
Delete a several entries of a topic. This is for internal usage and test only, not exposed to NBI API
:param not_send_msg: To not send message (False) or store content (list) instead
:return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
"""
-
# To allow addressing projects and users by name AS WELL AS by _id
if not self.multiproject:
filter_q = {}
else:
filter_q = self._get_project_filter(session)
filter_q[self.id_field(self.topic, _id)] = _id
+
item_content = self.db.get_one(self.topic, filter_q)
+ nsd_id = item_content.get("_id")
+
+ if (
+ self.topic == "k8sinfra_controller"
+ or self.topic == "k8sinfra_config"
+ or self.topic == "k8sapp"
+ or self.topic == "k8sresource"
+ or self.topic == "clusters"
+ ):
+ if "state" in item_content:
+ item_content["state"] = "IN_DELETION"
+ item_content["operatingState"] = "PROCESSING"
+ self.db.set_one(self.topic, {"_id": _id}, item_content)
+
+ item_content_1 = self.db.get_one(self.topic, filter_q)
+ item_content_1["current_operation"] = None
+ op_id = self.format_on_operation(
+ item_content_1,
+ "delete",
+ None,
+ )
self.check_conflict_on_del(session, _id, item_content)
+
+ # While deteling ns descriptor associated ns config template should also get deleted.
+ if self.topic == "nsds":
+ ns_config_template_content = self.db.get_list(
+ "ns_config_template", {"nsdId": _id}
+ )
+ for template_content in ns_config_template_content:
+ if template_content is not None:
+ if template_content.get("nsdId") == nsd_id:
+ ns_config_template_id = template_content.get("_id")
+ self.db.del_one("ns_config_template", {"nsdId": nsd_id})
+ self.delete_extra(
+ session,
+ ns_config_template_id,
+ template_content,
+ not_send_msg=not_send_msg,
+ )
if dry_run:
return None
-
if self.multiproject and session["project_id"]:
# remove reference from project_read if there are more projects referencing it. If it last one,
# do not remove reference, but delete
),
None,
)
-
# check if there are projects referencing it (apart from ANY, that means, public)....
if other_projects_referencing:
# remove references but not delete
"You have not write permission to delete it",
http_code=HTTPStatus.UNAUTHORIZED,
)
-
# delete
- self.db.del_one(self.topic, filter_q)
- self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg)
- self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg)
- return None
+ if (
+ self.topic == "k8sinfra_controller"
+ or self.topic == "k8sinfra_config"
+ or self.topic == "k8sapp"
+ or self.topic == "k8sresource"
+ ):
+ self.db.set_one(self.topic, {"_id": _id}, item_content_1)
+ self._send_msg(
+ "delete",
+ {"profile_id": _id, "operation_id": op_id},
+ not_send_msg=not_send_msg,
+ )
+ elif self.topic == "clusters":
+ self.db.set_one("clusters", {"_id": _id}, item_content_1)
+ self._send_msg(
+ "delete",
+ {"cluster_id": _id, "operation_id": op_id},
+ not_send_msg=not_send_msg,
+ )
+ else:
+ self.db.del_one(self.topic, filter_q)
+ self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg)
+ self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg)
+ return _id
def edit(self, session, _id, indata=None, kwargs=None, content=None):
"""
if kwargs:
self._update_input_with_kwargs(indata, kwargs)
try:
+ if (
+ self.topic == "k8sinfra_controller"
+ or self.topic == "k8sinfra_config"
+ or self.topic == "k8sapp"
+ or self.topic == "k8sresource"
+ ):
+ check = self.db.get_one(self.topic, {"_id": _id})
+ if check["default"] is True:
+ raise EngineException(
+ "Cannot edit default profiles",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ if "name" in indata:
+ if check["name"] == indata["name"]:
+ pass
+ else:
+ self.check_unique_name(session, indata["name"])
if indata and session.get("set_project"):
raise EngineException(
"Cannot edit content and set to project (query string SET_PROJECT) at same time",
if op_id:
indata["op_id"] = op_id
indata["_id"] = _id
- self._send_msg("edited", indata)
+ if (
+ self.topic == "k8sinfra_controller"
+ or self.topic == "k8sinfra_config"
+ or self.topic == "k8sapp"
+ or self.topic == "k8sresource"
+ ):
+ pass
+ else:
+ self._send_msg("edited", indata)
return op_id
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+ def detach(self, session, _id, profile_type):
+ # To detach the profiles from every cluster
+ filter_q = {}
+ existing_clusters = self.db.get_list("clusters", filter_q)
+ existing_clusters_profiles = [
+ profile["_id"]
+ for profile in existing_clusters
+ if profile.get("profile_type", _id)
+ ]
+ update_dict = None
+ for profile in existing_clusters_profiles:
+ filter_q = {"_id": profile}
+ data = self.db.get_one("clusters", filter_q)
+ if profile_type in data:
+ profile_ids = data[profile_type]
+ if _id in profile_ids:
+ profile_ids.remove(_id)
+ update_dict = {profile_type: profile_ids}
+ self.db.set_one("clusters", filter_q, update_dict)
+
+ def _generate_age_key(self):
+ ident = x25519.Identity.generate()
+ # gets the public key
+ pubkey = str(ident.to_public())
+ # gets the private key
+ privkey = str(ident)
+ # return both public and private key
+ return pubkey, privkey