Do not delete when there is other projects using it. Project reference is deleted. Real deletion is done when last project deletes it.
Change-Id: I49363349950ef90837f3e1a7e702e598541cad20
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
fail_on_more=False):
raise EngineException("project '{}' does not exist".format(p), HTTPStatus.CONFLICT)
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check if deletion can be done because of dependencies if it is not force. To override
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: internal _id
+ :param db_content: The database content of this item _id
+ :return: None if ok or raises EngineException with the conflict
+ """
if _id == session["username"]:
raise EngineException("You cannot delete your own user", http_code=HTTPStatus.CONFLICT)
# Removed so that the UUID is kept, to allow Project Name modification
# content["_id"] = content["name"]
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check if deletion can be done because of dependencies if it is not force. To override
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: internal _id
+ :param db_content: The database content of this item _id
+ :return: None if ok or raises EngineException with the conflict
+ """
if _id in session["project_id"]:
raise EngineException("You cannot delete your own project", http_code=HTTPStatus.CONFLICT)
if session["force"]:
raise EngineException("You cannot remove system_admin role from admin user",
http_code=HTTPStatus.FORBIDDEN)
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
"""
Check if deletion can be done because of dependencies if it is not force. To override
-
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: internal _id
+ :param db_content: The database content of this item _id
:return: None if ok or raises EngineException with the conflict
"""
if _id == session["username"]:
:param dry_run: make checking but do not delete
:return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
"""
- self.check_conflict_on_del(session, _id)
+ self.check_conflict_on_del(session, _id, None)
if not dry_run:
v = self.auth.delete_user(_id)
return v
if project in project_list:
raise EngineException("project '{}' exists".format(project), HTTPStatus.CONFLICT)
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
"""
Check if deletion can be done because of dependencies if it is not force. To override
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: internal _id
+ :param db_content: The database content of this item _id
:return: None if ok or raises EngineException with the conflict
"""
projects = self.auth.get_project_list()
:param dry_run: make checking but do not delete
:return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
"""
- self.check_conflict_on_del(session, _id)
+ self.check_conflict_on_del(session, _id, None)
if not dry_run:
v = self.auth.delete_project(_id)
return v
if _id == system_admin_role["_id"]:
raise EngineException("You cannot edit system_admin role", http_code=HTTPStatus.FORBIDDEN)
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
"""
Check if deletion can be done because of dependencies if it is not force. To override
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: internal _id
+ :param db_content: The database content of this item _id
:return: None if ok or raises EngineException with the conflict
"""
roles = self.auth.get_role_list()
:param dry_run: make checking but do not delete
:return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
"""
- self.check_conflict_on_del(session, _id)
- filter_q = self._get_project_filter(session, write=True, show_all=True)
+ self.check_conflict_on_del(session, _id, None)
+ filter_q = self._get_project_filter(session)
filter_q["_id"] = _id
if not dry_run:
self.auth.delete_role(_id)
# Override descriptor with query string kwargs
if kwargs:
- BaseTopic._update_input_with_kwargs(indata, kwargs)
+ self._update_input_with_kwargs(indata, kwargs)
try:
indata = self._validate_input_edit(indata, force=session["force"])
content.pop("_admin", None)
self.msg.write(self.topic_msg, action, content)
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
"""
Check if deletion can be done because of dependencies if it is not force. To override
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: internal _id
+ :param db_content: The database content of this item _id
:return: None if ok or raises EngineException with the conflict
"""
pass
filter_q.update(self._get_project_filter(session))
return self.db.del_list(self.topic, filter_q)
- def delete_extra(self, session, _id):
+ def delete_extra(self, session, _id, db_content):
"""
Delete other things apart from database entry of a item _id.
e.g.: other associated elements at database and other file system storage
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
+ :param db_content: The database content of the _id. It is already deleted when reached this method, but the
+ content is needed in same cases
+ :return: None if ok or raises EngineException with the problem
"""
pass
:param dry_run: make checking but do not delete
:return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
"""
+
+ # To allow addressing projects and users by name AS WELL AS by _id
+ filter_q = {BaseTopic.id_field(self.topic, _id): _id}
+ item_content = self.db.get_one(self.topic, filter_q)
+
# TODO add admin to filter, validate rights
# data = self.get_item(topic, _id)
- self.check_conflict_on_del(session, _id)
- filter_q = self._get_project_filter(session)
- # To allow project addressing by name AS WELL AS _id
- filter_q[BaseTopic.id_field(self.topic, _id)] = _id
+ self.check_conflict_on_del(session, _id, item_content)
if dry_run:
return None
+
+ filter_q.update(self._get_project_filter(session))
if self.multiproject and session["project_id"]:
# remove reference from project_read. If not last delete
self.db.set_one(self.topic, filter_q, update_dict=None,
return v
else:
v = self.db.del_one(self.topic, filter_q)
- self.delete_extra(session, _id)
+ self.delete_extra(session, _id, item_content)
self._send_msg("deleted", {"_id": _id})
return v
content["_admin"]["operationalState"] = "DISABLED"
content["_admin"]["usageState"] = "NOT_IN_USE"
- def delete_extra(self, session, _id):
+ def delete_extra(self, session, _id, db_content):
+ """
+ Deletes file system storage associated with the descriptor
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: server internal id
+ :param db_content: The database content of the descriptor
+ :return: None if ok or raises EngineException with the problem
+ """
self.fs.file_delete(_id, ignore_non_exist=True)
self.fs.file_delete(_id + "_", ignore_non_exist=True) # remove temp folder
final_content["_admin"]["type"] = "vnfd"
# if neither vud nor pdu do not fill type
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
"""
Check that there is not any NSD that uses this VNFD. Only NSDs belonging to this project are considered. Note
that VNFD can be public and be used by NSD of other projects. Also check there are not deployments, or vnfr
that uses this vnfd
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
- :param _id: vnfd inernal id
+ :param _id: vnfd internal id
+ :param db_content: The database content of the _id.
:return: None or raises EngineException with the conflict
"""
if session["force"]:
return
- descriptor = self.db.get_one("vnfds", {"_id": _id})
+ descriptor = db_content
descriptor_id = descriptor.get("id")
if not descriptor_id: # empty vnfd not uploaded
return
_filter = self._get_project_filter(session)
+
# check vnfrs using this vnfd
_filter["vnfd-id"] = _id
if self.db.get_list("vnfrs", _filter):
- raise EngineException("There is some VNFR that depends on this VNFD", http_code=HTTPStatus.CONFLICT)
+ raise EngineException("There is at least one VNF using this descriptor", http_code=HTTPStatus.CONFLICT)
+
+ # check NSD referencing this VNFD
del _filter["vnfd-id"]
- # check NSD using this VNFD
_filter["constituent-vnfd.ANYINDEX.vnfd-id-ref"] = descriptor_id
if self.db.get_list("nsds", _filter):
- raise EngineException("There is at least a NSD that depends on this VNFD", http_code=HTTPStatus.CONFLICT)
+ raise EngineException("There is at least one NSD referencing this descriptor",
+ http_code=HTTPStatus.CONFLICT)
def _validate_input_new(self, indata, storage_params, force=False):
indata = self.pyangbind_validation("vnfds", indata, force)
self._check_descriptor_dependencies(session, final_content)
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
"""
Check that there is not any NSR that uses this NSD. Only NSRs belonging to this project are considered. Note
that NSD can be public and be used by other projects.
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
- :param _id: vnfd inernal id
+ :param _id: nsd internal id
+ :param db_content: The database content of the _id
:return: None or raises EngineException with the conflict
"""
if session["force"]:
return
+ descriptor = db_content
+ descriptor_id = descriptor.get("id")
+ if not descriptor_id: # empty nsd not uploaded
+ return
+
+ # check NSD used by NS
_filter = self._get_project_filter(session)
- _filter["nsdId"] = _id
+ _filter["nsd-id"] = _id
if self.db.get_list("nsrs", _filter):
- raise EngineException("There is some NSR that depends on this NSD", http_code=HTTPStatus.CONFLICT)
+ raise EngineException("There is at least one NS using this descriptor", http_code=HTTPStatus.CONFLICT)
+
+ # check NSD referenced by NST
+ del _filter["nsd-id"]
+ _filter["netslice-subnet.ANYINDEX.nsd-ref"] = descriptor_id
+ if self.db.get_list("nsts", _filter):
+ raise EngineException("There is at least one NetSlice Template referencing this descriptor",
+ http_code=HTTPStatus.CONFLICT)
class NstTopic(DescriptorTopic):
self._check_descriptor_dependencies(session, final_content)
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
"""
Check that there is not any NSIR that uses this NST. Only NSIRs belonging to this project are considered. Note
that NST can be public and be used by other projects.
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: nst internal id
+ :param db_content: The database content of the _id.
:return: None or raises EngineException with the conflict
"""
# TODO: Check this method
return
# Get Network Slice Template from Database
_filter = self._get_project_filter(session)
- _filter["_id"] = _id
- nst = self.db.get_one("nsts", _filter)
-
- # Search NSIs using NST via nst-ref
- _filter = self._get_project_filter(session)
- _filter["nst-ref"] = nst["id"]
- nsis_list = self.db.get_list("nsis", _filter)
- for nsi_item in nsis_list:
- if nsi_item["_admin"].get("nsiState") != "TERMINATED":
- raise EngineException("There is some NSIS that depends on this NST", http_code=HTTPStatus.CONFLICT)
+ _filter["nst-id"] = _id
+ if self.db.get_list("nsis", _filter):
+ raise EngineException("there is at least one Netslice Instance using this descriptor",
+ http_code=HTTPStatus.CONFLICT)
class PduTopic(BaseTopic):
content["_admin"]["operationalState"] = "ENABLED"
content["_admin"]["usageState"] = "NOT_IN_USE"
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check that there is not any vnfr that uses this PDU
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: pdu internal id
+ :param db_content: The database content of the _id.
+ :return: None or raises EngineException with the conflict
+ """
if session["force"]:
return
- # TODO Is it needed to check descriptors _admin.project_read/project_write??
- _filter = {"vdur.pdu-id": _id}
+
+ _filter = self._get_project_filter(session)
+ _filter["vdur.pdu-id"] = _id
if self.db.get_list("vnfrs", _filter):
- raise EngineException("There is some NSR that uses this PDU", http_code=HTTPStatus.CONFLICT)
+ raise EngineException("There is at least one VNF using this PDU", http_code=HTTPStatus.CONFLICT)
from copy import copy, deepcopy
from validation import validate_input, ValidationError, ns_instantiate, ns_action, ns_scale, nsi_instantiate
from base_topic import BaseTopic, EngineException, get_iterable
-from descriptor_topics import DescriptorTopic
+# from descriptor_topics import DescriptorTopic
from yaml import safe_dump
from osm_common.dbbase import DbException
from re import match # For checking that additional parameter names are valid Jinja2 identifiers
BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
content["_admin"]["nsState"] = "NOT_INSTANTIATED"
- def check_conflict_on_del(self, session, _id):
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check that NSR is not instantiated
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: nsr internal id
+ :param db_content: The database content of the nsr
+ :return: None or raises EngineException with the conflict
+ """
if session["force"]:
return
- nsr = self.db.get_one("nsrs", {"_id": _id})
+ nsr = db_content
if nsr["_admin"].get("nsState") == "INSTANTIATED":
raise EngineException("nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. "
"Launch 'terminate' operation first; or force deletion".format(_id),
http_code=HTTPStatus.CONFLICT)
- def delete_extra(self, session, _id):
+ def delete_extra(self, session, _id, db_content):
+ """
+ Deletes associated nslcmops and vnfrs from database. Deletes associated filesystem.
+ Set usageState of pdu, vnfd, nsd
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: server internal id
+ :param db_content: The database content of the descriptor
+ :return: None if ok or raises EngineException with the problem
+ """
self.fs.file_delete(_id, ignore_non_exist=True)
self.db.del_list("nslcmops", {"nsInstanceId": _id})
self.db.del_list("vnfrs", {"nsr-id-ref": _id})
+
# set all used pdus as free
self.db.set_list("pdus", {"_admin.usage.nsr_id": _id},
{"_admin.usageState": "NOT_IN_USE", "_admin.usage": None})
+ # Set NSD usageState
+ nsr = db_content
+ used_nsd_id = nsr.get("nsd-id")
+ if used_nsd_id:
+ # check if used by another NSR
+ nsrs_list = self.db.get_one("nsrs", {"nsd-id": used_nsd_id},
+ fail_on_empty=False, fail_on_more=False)
+ if not nsrs_list:
+ self.db.set_one("nsds", {"_id": used_nsd_id}, {"_admin.usageState": "NOT_IN_USE"})
+
+ # Set VNFD usageState
+ used_vnfd_id_list = nsr.get("vnfd-id")
+ if used_vnfd_id_list:
+ for used_vnfd_id in used_vnfd_id_list:
+ # check if used by another NSR
+ nsrs_list = self.db.get_one("nsrs", {"vnfd-id": used_vnfd_id},
+ fail_on_empty=False, fail_on_more=False)
+ if not nsrs_list:
+ self.db.set_one("vnfds", {"_id": used_vnfd_id}, {"_admin.usageState": "NOT_IN_USE"})
+
@staticmethod
def _format_ns_request(ns_request):
formated_request = copy(ns_request)
self._update_input_with_kwargs(ns_request, kwargs)
self._validate_input_new(ns_request, session["force"])
- step = ""
# look for nsr
step = "getting nsd id='{}' from database".format(ns_request.get("nsdId"))
- _filter = {"_id": ns_request["nsdId"]}
- _filter.update(BaseTopic._get_project_filter(session))
+ _filter = self._get_project_filter(session)
+ _filter["_id"] = ns_request["nsdId"]
nsd = self.db.get_one("nsds", _filter)
+ del _filter["_id"]
nsr_id = str(uuid4())
"operational-events": [], # "id", "timestamp", "description", "event",
"nsd-ref": nsd["id"],
"nsd-id": nsd["_id"],
+ "vnfd-id": [],
"instantiate_params": self._format_ns_request(ns_request),
"additionalParamsForNs": self._format_addional_params(ns_request),
"ns-instance-config-ref": nsr_id,
member_vnf["vnfd-id-ref"], member_vnf["member-vnf-index"])
if vnfd_id not in needed_vnfds:
# Obtain vnfd
- vnfd = DescriptorTopic.get_one_by_id(self.db, session, "vnfds", vnfd_id)
+ _filter["id"] = vnfd_id
+ vnfd = self.db.get_one("vnfds", _filter, fail_on_empty=True, fail_on_more=True)
+ del _filter["id"]
vnfd.pop("_admin")
needed_vnfds[vnfd_id] = vnfd
+ nsr_descriptor["vnfd-id"].append(vnfd["_id"])
else:
vnfd = needed_vnfds[vnfd_id]
step = "filling vnfr vnfd-id='{}' constituent-vnfd='{}'".format(
raise EngineException("Descriptor error at nst-ref='{}' references a non exist nstd".format(nstd_id),
http_code=HTTPStatus.CONFLICT)
- def check_conflict_on_del(self, session, _id, ):
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check that NSI is not instantiated
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: nsi internal id
+ :param db_content: The database content of the _id
+ :return: None or raises EngineException with the conflict
+ """
if session["force"]:
return
- nsi = self.db.get_one("nsis", {"_id": _id})
+ nsi = db_content
if nsi["_admin"].get("nsiState") == "INSTANTIATED":
raise EngineException("nsi '{}' cannot be deleted because it is in 'INSTANTIATED' state. "
"Launch 'terminate' operation first; or force deletion".format(_id),
http_code=HTTPStatus.CONFLICT)
- def delete(self, session, _id, dry_run=False):
+ def delete_extra(self, session, _id, db_content):
"""
- Delete item by its internal _id
+ Deletes associated nsilcmops from database. Deletes associated filesystem.
+ Set usageState of nst
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
- :param dry_run: make checking but do not delete
- :return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
+ :param db_content: The database content of the descriptor
+ :return: None if ok or raises EngineException with the problem
"""
- # TODO add admin to filter, validate rights
- BaseTopic.delete(self, session, _id, dry_run=True)
- if dry_run:
- return
# Deleting the nsrs belonging to nsir
- nsir = self.db.get_one("nsis", {"_id": _id})
+ nsir = db_content
for nsrs_detailed_item in nsir["_admin"]["nsrs-detailed-list"]:
nsr_id = nsrs_detailed_item["nsrId"]
if nsrs_detailed_item.get("shared"):
pass
else:
raise
- # deletes NetSlice instance object
- v = self.db.del_one("nsis", {"_id": _id})
- # makes a temporal list of nsilcmops objects related to the _id given and deletes them from db
- _filter = {"netsliceInstanceId": _id}
- self.db.del_list("nsilcmops", _filter)
+ # delete related nsilcmops database entries
+ self.db.del_list("nsilcmops", {"netsliceInstanceId": _id})
- # Search if nst is being used by other nsi
+ # Check and set used NST usage state
nsir_admin = nsir.get("_admin")
- if nsir_admin:
- if nsir_admin.get("nst-id"):
- nsis_list = self.db.get_one("nsis", {"nst-id": nsir_admin["nst-id"]},
- fail_on_empty=False, fail_on_more=False)
- if not nsis_list:
- self.db.set_one("nsts", {"_id": nsir_admin["nst-id"]}, {"_admin.usageState": "NOT_IN_USE"})
- return v
+ if nsir_admin and nsir_admin.get("nst-id"):
+ # check if used by another NSI
+ nsis_list = self.db.get_one("nsis", {"nst-id": nsir_admin["nst-id"]},
+ fail_on_empty=False, fail_on_more=False)
+ if not nsis_list:
+ self.db.set_one("nsts", {"_id": nsir_admin["nst-id"]}, {"_admin.usageState": "NOT_IN_USE"})
+
+ # def delete(self, session, _id, dry_run=False):
+ # """
+ # Delete item by its internal _id
+ # :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ # :param _id: server internal id
+ # :param dry_run: make checking but do not delete
+ # :return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
+ # """
+ # # TODO add admin to filter, validate rights
+ # BaseTopic.delete(self, session, _id, dry_run=True)
+ # if dry_run:
+ # return
+ #
+ # # Deleting the nsrs belonging to nsir
+ # nsir = self.db.get_one("nsis", {"_id": _id})
+ # for nsrs_detailed_item in nsir["_admin"]["nsrs-detailed-list"]:
+ # nsr_id = nsrs_detailed_item["nsrId"]
+ # if nsrs_detailed_item.get("shared"):
+ # _filter = {"_admin.nsrs-detailed-list.ANYINDEX.shared": True,
+ # "_admin.nsrs-detailed-list.ANYINDEX.nsrId": nsr_id,
+ # "_id.ne": nsir["_id"]}
+ # nsi = self.db.get_one("nsis", _filter, fail_on_empty=False, fail_on_more=False)
+ # if nsi: # last one using nsr
+ # continue
+ # try:
+ # self.nsrTopic.delete(session, nsr_id, dry_run=False)
+ # except (DbException, EngineException) as e:
+ # if e.http_code == HTTPStatus.NOT_FOUND:
+ # pass
+ # else:
+ # raise
+ # # deletes NetSlice instance object
+ # v = self.db.del_one("nsis", {"_id": _id})
+ #
+ # # makes a temporal list of nsilcmops objects related to the _id given and deletes them from db
+ # _filter = {"netsliceInstanceId": _id}
+ # self.db.del_list("nsilcmops", _filter)
+ #
+ # # Search if nst is being used by other nsi
+ # nsir_admin = nsir.get("_admin")
+ # if nsir_admin:
+ # if nsir_admin.get("nst-id"):
+ # nsis_list = self.db.get_one("nsis", {"nst-id": nsir_admin["nst-id"]},
+ # fail_on_empty=False, fail_on_more=False)
+ # if not nsis_list:
+ # self.db.set_one("nsts", {"_id": nsir_admin["nst-id"]}, {"_admin.usageState": "NOT_IN_USE"})
+ # return v
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
step = ""
# look for nstd
step = "getting nstd id='{}' from database".format(slice_request.get("nstId"))
- _filter = {"_id": slice_request["nstId"]}
- _filter.update(BaseTopic._get_project_filter(session))
+ _filter = self._get_project_filter(session)
+ _filter["_id"] = slice_request["nstId"]
nstd = self.db.get_one("nsts", _filter)
+ del _filter["_id"]
+
nstd.pop("_admin", None)
nstd_id = nstd.pop("_id", None)
nsi_id = str(uuid4())
member_ns["nsd-ref"], member_ns["id"])
if nsd_id not in needed_nsds:
# Obtain nsd
- nsd = DescriptorTopic.get_one_by_id(self.db, session, "nsds", nsd_id)
+ _filter["id"] = nsd_id
+ nsd = self.db.get_one("nsds", _filter, fail_on_empty=True, fail_on_more=True)
+ del _filter["id"]
nsd.pop("_admin")
needed_nsds[nsd_id] = nsd
else:
_id_nsr = None
indata_ns = {}
# Is the nss shared and instantiated?
- _filter = {"_admin.nsrs-detailed-list.ANYINDEX.shared": True,
- "_admin.nsrs-detailed-list.ANYINDEX.nsd-id": service["nsd-ref"]}
+ _filter["_admin.nsrs-detailed-list.ANYINDEX.shared"] = True
+ _filter["_admin.nsrs-detailed-list.ANYINDEX.nsd-id"] = service["nsd-ref"]
nsi = self.db.get_one("nsis", _filter, fail_on_empty=False, fail_on_more=False)
if nsi and service.get("is-shared-nss"):
validate_input(indata, self.operation_schema[operation])
# get nsi from nsiInstanceId
- _filter = BaseTopic._get_project_filter(session)
+ _filter = self._get_project_filter(session)
_filter["_id"] = nsiInstanceId
nsir = self.db.get_one("nsis", _filter)
+ del _filter["_id"]
# initial checking
if not nsir["_admin"].get("nsiState") or nsir["_admin"]["nsiState"] == "NOT_INSTANTIATED":
for index, nsr_item in enumerate(nsrs_list):
nsi = None
if nsr_item.get("shared"):
- _filter = {"_admin.nsrs-detailed-list.ANYINDEX.shared": True,
- "_admin.nsrs-detailed-list.ANYINDEX.nsrId": nsr_item["nsrId"],
- "_admin.nsrs-detailed-list.ANYINDEX.nslcmop_instantiate.ne": None,
- "_id.ne": nsiInstanceId}
+ _filter["_admin.nsrs-detailed-list.ANYINDEX.shared"] = True,
+ _filter["_admin.nsrs-detailed-list.ANYINDEX.nsrId"] = nsr_item["nsrId"]
+ _filter["_admin.nsrs-detailed-list.ANYINDEX.nslcmop_instantiate.ne"] = None
+ _filter["_id.ne"] = nsiInstanceId
nsi = self.db.get_one("nsis", _filter, fail_on_empty=False, fail_on_more=False)
# looks the first nsi fulfilling the conditions but not being the current NSIR
self.test_name = "CIRROS-SCALE"
self.vnfd_filenames = ("cirros_vnf.tar.gz",)
self.nsd_filename = "cirros_2vnf_ns.tar.gz"
-
- def create_descriptors(self, engine):
- super().create_descriptors(engine)
# Modify VNFD to add scaling and count=2
self.descriptor_edit = {
"vnfd0": {