from http import HTTPStatus
from validation import ValidationError, pdu_new_schema, pdu_edit_schema
from base_topic import BaseTopic, EngineException, get_iterable
+from osm_im.vnfd import vnfd as vnfd_im
+from osm_im.nsd import nsd as nsd_im
+from pyangbind.lib.serialise import pybindJSONDecoder
+import pyangbind.lib.pybindJSON as pybindJSON
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"future versions", http_code=HTTPStatus.NOT_ACCEPTABLE)
return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), accept_zip
+ def pyangbind_validation(self, item, data, force=False):
+ try:
+ if item == "vnfds":
+ myvnfd = vnfd_im()
+ pybindJSONDecoder.load_ietf_json({'vnfd:vnfd-catalog': {'vnfd': [data]}}, None, None, obj=myvnfd,
+ path_helper=True, skip_unknown=force)
+ out = pybindJSON.dumps(myvnfd, mode="ietf")
+ elif item == "nsds":
+ mynsd = nsd_im()
+ pybindJSONDecoder.load_ietf_json({'nsd:nsd-catalog': {'nsd': [data]}}, None, None, obj=mynsd,
+ path_helper=True, skip_unknown=force)
+ out = pybindJSON.dumps(mynsd, mode="ietf")
+ else:
+ raise EngineException("Not possible to validate '{}' item".format(item),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ desc_out = self._remove_envelop(yaml.safe_load(out))
+ return desc_out
+
+ except Exception as e:
+ raise EngineException("Error in pyangbind validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+
class VnfdTopic(DescriptorTopic):
topic = "vnfds"
clean_indata = clean_indata['vnfd-catalog']
if clean_indata.get('vnfd'):
if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1:
- raise EngineException("'vnfd' must be a list only one element")
+ raise EngineException("'vnfd' must be a list of only one element")
clean_indata = clean_indata['vnfd'][0]
+ elif clean_indata.get('vnfd:vnfd'):
+ if not isinstance(clean_indata['vnfd:vnfd'], list) or len(clean_indata['vnfd:vnfd']) != 1:
+ raise EngineException("'vnfd:vnfd' must be a list of only one element")
+ clean_indata = clean_indata['vnfd:vnfd'][0]
return clean_indata
def check_conflict_on_del(self, session, _id, force=False):
def _validate_input_new(self, indata, force=False):
# TODO validate with pyangbind, serialize
-
+ indata = self.pyangbind_validation("vnfds", indata, force)
# Cross references validation in the descriptor
if not indata.get("mgmt-interface"):
raise EngineException("'mgmt-interface' is a mandatory field and it is not defined",
clean_indata = clean_indata['nsd-catalog']
if clean_indata.get('nsd'):
if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1:
- raise EngineException("'nsd' must be a list only one element")
+ raise EngineException("'nsd' must be a list of only one element")
clean_indata = clean_indata['nsd'][0]
+ elif clean_indata.get('nsd:nsd'):
+ if not isinstance(clean_indata['nsd:nsd'], list) or len(clean_indata['nsd:nsd']) != 1:
+ raise EngineException("'nsd:nsd' must be a list of only one element")
+ clean_indata = clean_indata['nsd:nsd'][0]
return clean_indata
def _validate_input_new(self, indata, force=False):
- # transform constituent-vnfd:member-vnf-index to string
- if indata.get("constituent-vnfd"):
- for constituent_vnfd in indata["constituent-vnfd"]:
- if "member-vnf-index" in constituent_vnfd:
- constituent_vnfd["member-vnf-index"] = str(constituent_vnfd["member-vnf-index"])
# TODO validate with pyangbind, serialize
+ indata = self.pyangbind_validation("nsds", indata, force)
return indata
def _validate_input_edit(self, indata, force=False):