bug 864 fix operation (created/edited) published to kafka
[osm/NBI.git] / osm_nbi / descriptor_topics.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import tarfile
17 import yaml
18 import json
19 # import logging
20 from hashlib import md5
21 from osm_common.dbbase import DbException, deep_update_rfc7396
22 from http import HTTPStatus
23 from osm_nbi.validation import ValidationError, pdu_new_schema, pdu_edit_schema
24 from osm_nbi.base_topic import BaseTopic, EngineException, get_iterable
25 from osm_im.vnfd import vnfd as vnfd_im
26 from osm_im.nsd import nsd as nsd_im
27 from osm_im.nst import nst as nst_im
28 from pyangbind.lib.serialise import pybindJSONDecoder
29 import pyangbind.lib.pybindJSON as pybindJSON
30
31 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
32
33
34 class DescriptorTopic(BaseTopic):
35
def __init__(self, db, fs, msg, auth):
    """
    Builds the descriptor topic by handing every argument, unchanged, to BaseTopic.__init__
    :param db: forwarded to BaseTopic.__init__
    :param fs: forwarded to BaseTopic.__init__
    :param msg: forwarded to BaseTopic.__init__
    :param auth: forwarded to BaseTopic.__init__
    """
    BaseTopic.__init__(self, db, fs, msg, auth)
38
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
    """
    Re-validates the merged descriptor and checks that its "id" does not collide with another descriptor
    :param session: must provide "force"; it is also passed to the parent check and to the project filter
    :param final_content: descriptor after applying the edition. It is modified in place and must contain "_admin"
    :param edit_content: the changes requested; only the presence of "id" is looked at here
    :param _id: server internal id of the descriptor being edited
    :return: None. Raises EngineException (CONFLICT) if another descriptor of the project already uses that id
    """
    super().check_conflict_on_edit(session, final_content, edit_content, _id)

    # Step 1: validate again with pyangbind.
    # Internal keys are taken apart first, as they are not part of the descriptor model
    preserved = {key: final_content.pop(key) for key in ("_id", "_admin") if key in final_content}
    storage_params = preserved["_admin"].get("storage")
    validated = self._validate_input_new(final_content, storage_params, session["force"])

    # final_content becomes the serialized version, and then internal keys are put back
    final_content.clear()
    final_content.update(validated)
    final_content.update(preserved)

    # Step 2: check that this id is not present; skipped when forcing or when "id" is not being edited
    if session["force"] or "id" not in edit_content:
        return
    query = self._get_project_filter(session)
    query["id"] = final_content["id"]
    query["_id.neq"] = _id
    if self.db.get_one(self.topic, query, fail_on_empty=False):
        raise EngineException("{} with id '{}' already exists for this project".format(self.topic[:-1],
                                                                                       final_content["id"]),
                              HTTPStatus.CONFLICT)
67
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
    """
    Applies the BaseTopic formatting and then sets the initial states of a just created descriptor
    :param content: new descriptor entry; content["_admin"] is updated in place
    :param project_id: passed to BaseTopic.format_on_new
    :param make_public: passed to BaseTopic.format_on_new
    :return: None
    """
    BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
    admin = content["_admin"]
    # a new descriptor has no package yet: created, disabled and not used by anyone
    admin["onboardingState"] = "CREATED"
    admin["operationalState"] = "DISABLED"
    admin["usageState"] = "NOT_IN_USE"
74
def delete_extra(self, session, _id, db_content):
    """
    Removes the file system storage that belongs to the descriptor
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: server internal id
    :param db_content: The database content of the descriptor
    :return: None if ok or raises EngineException with the problem
    """
    # first the final folder, then the temporal upload folder (same name ended with "_")
    for folder in (_id, _id + "_"):
        self.fs.file_delete(folder, ignore_non_exist=True)
85
@staticmethod
def get_one_by_id(db, session, topic, id):
    """
    Gets the only descriptor of 'topic' whose "id" field is 'id', restricted by the session project filter
    :param db: database handler; its get_list method is used
    :param session: passed to BaseTopic._get_project_filter to build the query
    :param topic: collection name, e.g. "vnfds". The last letter is removed to compose error messages
    :param id: value of the descriptor "id" field (not the internal "_id")
    :return: the descriptor found. Raises DbException with NOT_FOUND if there is none, CONFLICT if more than one
    """
    def _lookup():
        query = BaseTopic._get_project_filter(session)
        query["id"] = id
        return db.get_list(topic, query)

    # find owned by this project
    owned = _lookup()
    if len(owned) == 1:
        return owned[0]
    if len(owned) > 1:
        raise DbException("Found more than one {} with id='{}' belonging to this project".format(topic[:-1], id),
                          HTTPStatus.CONFLICT)

    # not found any: try to find public
    # NOTE(review): this second query is built exactly as the first one, so it looks like it cannot find
    # anything new - confirm which filter is intended for public descriptors
    public = _lookup()
    if not public:
        raise DbException("Not found any {} with id='{}'".format(topic[:-1], id), HTTPStatus.NOT_FOUND)
    if len(public) == 1:
        return public[0]
    raise DbException("Found more than one public {} with id='{}'; and no one belonging to this project".format(
        topic[:-1], id), HTTPStatus.CONFLICT)
109
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
    """
    Creates a new almost empty DISABLED entry into database. Due to SOL005, it does not follow normal procedure.
    Creating a VNFD or NSD is done in two steps: 1. Creates an empty descriptor (this step) and 2) upload content
    (self.upload_content)
    :param rollback: list to append created items at database in case a rollback may need to be done
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param indata: data to be inserted
    :param kwargs: used to override the indata descriptor
    :param headers: http request headers. Not used here
    :return: _id, None: identity of the inserted data; and None as there is not any operation
    """
    try:
        # Check Quota
        self.check_quota(session)

        # remove the envelop: only the content of 'userDefinedData' is kept, when present
        if indata and "userDefinedData" in indata:
            indata = indata['userDefinedData']

        # Override descriptor with query string kwargs
        self._update_input_with_kwargs(indata, kwargs)
        # No validation of indata is done at this step: the target is userDefinedData, not a vnfd/nsd descriptor

        entry = {"_admin": {"userDefinedData": indata}}
        self.format_on_new(entry, session["project_id"], make_public=session["public"])
        new_id = self.db.create(self.topic, entry)
        rollback.append({"topic": self.topic, "_id": new_id})
        self._send_msg("created", {"_id": new_id})
    except ValidationError as e:
        raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
    return new_id, None
146
def upload_content(self, session, _id, indata, kwargs, headers):
    """
    Used for receiving content by chunks (with a Content-Range header) and/or a gzip file. The content is stored in
    a temporal folder ("<_id>_"); once complete it is extracted (if compressed), parsed, validated, saved at
    database and the temporal folder is renamed to "<_id>"
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id : the nsd,vnfd is already created, this is the id
    :param indata: http body request: a dict with the descriptor, or a file-like object to read bytes from
    :param kwargs: user query string to override parameters of the uploaded descriptor
    :param headers: http request headers. Used: Content-Range, Content-File-MD5, Content-Type, Content-Filename
    :return: True if package is completely uploaded or False if partial content has been uploaded
    Raise exception on error
    """
    # Check that _id exists and it is valid
    current_desc = self.show(session, _id)

    content_range_text = headers.get("Content-Range")
    expected_md5 = headers.get("Content-File-MD5")
    compressed = None
    content_type = headers.get("Content-Type")
    # the header may be missing: content_type must be checked before looking for any of the mime types
    if content_type and any(mime in content_type
                            for mime in ("application/gzip", "application/x-gzip", "application/zip")):
        compressed = "gzip"
    filename = headers.get("Content-Filename")
    if not filename:
        filename = "package.tar.gz" if compressed else "package"
    # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
    file_pkg = None
    error_text = ""
    try:
        if content_range_text:
            # "bytes start-end/total" -> ["bytes", start, end, total]
            content_range = content_range_text.replace("-", " ").replace("/", " ").split()
            if content_range[0] != "bytes":  # TODO check x<y not negative < total....
                raise IndexError()
            start = int(content_range[1])
            end = int(content_range[2]) + 1  # header 'end' is inclusive; here it is used as exclusive
            total = int(content_range[3])
        else:
            start = 0
        temp_folder = _id + "_"  # all the content is uploaded here and, if ok, it is renamed from "id_" to "id"

        if start:
            # not the first chunk: the temporal folder must exist from a previous chunk
            if not self.fs.file_exists(temp_folder, 'dir'):
                raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND)
        else:
            # first chunk (or whole content): start from a clean temporal folder
            self.fs.file_delete(temp_folder, ignore_non_exist=True)
            self.fs.mkdir(temp_folder)

        storage = self.fs.get_params()
        storage["folder"] = _id

        file_path = (temp_folder, filename)
        if self.fs.file_exists(file_path, 'file'):
            file_size = self.fs.file_size(file_path)
        else:
            file_size = 0
        # chunks must arrive in order: this one must start where the stored file ends
        if file_size != start:
            raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format(
                file_size, start), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
        file_pkg = self.fs.file_open(file_path, 'a+b')
        if isinstance(indata, dict):
            indata_bytes = yaml.safe_dump(indata, indent=4, default_flow_style=False).encode(encoding="utf-8")
            file_pkg.write(indata_bytes)
            # length is also needed in this branch for the Content-Range check below
            indata_len = len(indata_bytes)
        else:
            indata_len = 0
            while True:
                indata_text = indata.read(4096)
                indata_len += len(indata_text)
                if not indata_text:
                    break
                file_pkg.write(indata_text)
        if content_range_text:
            if indata_len != end-start:
                raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format(
                    start, end-1, indata_len), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
            if end != total:
                # TODO update to UPLOADING
                return False

        # PACKAGE UPLOADED
        if expected_md5:
            file_pkg.seek(0, 0)
            file_md5 = md5()
            chunk_data = file_pkg.read(1024)
            while chunk_data:
                file_md5.update(chunk_data)
                chunk_data = file_pkg.read(1024)
            if expected_md5 != file_md5.hexdigest():
                raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT)
        file_pkg.seek(0, 0)
        if compressed == "gzip":
            # closing the tarfile object does not close file_pkg, which is closed at the 'finally' clause
            with tarfile.open(mode='r', fileobj=file_pkg) as tar:
                descriptor_file_name = None
                for tarinfo in tar:
                    tarname = tarinfo.name
                    tarname_path = tarname.split("/")
                    if not tarname_path[0] or ".." in tarname_path:  # if start with "/" means absolute path
                        raise EngineException("Absolute path or '..' are not allowed for package descriptor tar.gz")
                    if len(tarname_path) == 1 and not tarinfo.isdir():
                        raise EngineException("All files must be inside a dir for package descriptor tar.gz")
                    if tarname.endswith((".yaml", ".json", ".yml")):
                        storage["pkg-dir"] = tarname_path[0]
                        # the descriptor is the only yaml/json file placed directly at the package folder
                        if len(tarname_path) == 2:
                            if descriptor_file_name:
                                raise EngineException(
                                    "Found more than one descriptor file at package descriptor tar.gz")
                            descriptor_file_name = tarname
                if not descriptor_file_name:
                    raise EngineException("Not found any descriptor file at package descriptor tar.gz")
                storage["descriptor"] = descriptor_file_name
                storage["zipfile"] = filename
                self.fs.file_extract(tar, temp_folder)
            with self.fs.file_open((temp_folder, descriptor_file_name), "r") as descriptor_file:
                content = descriptor_file.read()
        else:
            content = file_pkg.read()
            storage["descriptor"] = descriptor_file_name = filename

        if descriptor_file_name.endswith(".json"):
            error_text = "Invalid json format "
            # content is the text already read (not a file object), so 'loads' is needed
            indata = json.loads(content)
        else:
            error_text = "Invalid yaml format "
            indata = yaml.load(content, Loader=yaml.SafeLoader)

        current_desc["_admin"]["storage"] = storage
        current_desc["_admin"]["onboardingState"] = "ONBOARDED"
        current_desc["_admin"]["operationalState"] = "ENABLED"

        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        # no explicit validation call here: check_conflict_on_edit below validates the merged descriptor

        deep_update_rfc7396(current_desc, indata)
        self.check_conflict_on_edit(session, current_desc, indata, _id=_id)
        self.db.replace(self.topic, _id, current_desc)
        self.fs.dir_rename(temp_folder, _id)

        indata["_id"] = _id
        self._send_msg("edited", indata)

        # TODO if descriptor has changed because kwargs update content and remove cached zip
        # TODO if zip is not present creates one
        return True

    except EngineException:
        raise
    except IndexError:
        raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'",
                              HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
    except IOError as e:
        raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST)
    except tarfile.ReadError as e:
        raise EngineException("invalid file content {}".format(e), HTTPStatus.BAD_REQUEST)
    except (ValueError, yaml.YAMLError) as e:
        # error_text tells which parser was running; it is empty if the ValueError comes from elsewhere
        raise EngineException(error_text + str(e))
    except ValidationError as e:
        raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
    finally:
        if file_pkg:
            file_pkg.close()
310
def get_file(self, session, _id, path=None, accept_header=None):
    """
    Return the file content of a vnfd or nsd
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: Identity of the vnfd, nsd
    :param path: artifact path (sequence of path components) or "$DESCRIPTOR" or None
    :param accept_header: Content of Accept header. Must contain application/zip or/and text/plain
    :return: opened file (or folder listing for an artifact folder) plus Accept format, or raises an exception
    """
    accept_text = False
    accept_zip = False
    if accept_header:
        accept_any = '*/*' in accept_header
        accept_text = accept_any or 'text/plain' in accept_header
        if accept_any or 'application/zip' in accept_header:
            accept_zip = 'application/zip'
        elif 'application/gzip' in accept_header:
            accept_zip = 'application/gzip'

    if not (accept_text or accept_zip):
        raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'",
                              http_code=HTTPStatus.NOT_ACCEPTABLE)

    descriptor = self.show(session, _id)
    onboarding_state = descriptor["_admin"]["onboardingState"]
    if onboarding_state != "ONBOARDED":
        raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. "
                              "onboardingState is {}".format(onboarding_state),
                              http_code=HTTPStatus.CONFLICT)
    storage = descriptor["_admin"]["storage"]
    pkg_dir = storage.get('pkg-dir')

    if path is not None and path != "$DESCRIPTOR":  # artifacts
        if not pkg_dir:
            raise EngineException("Packages does not contains artifacts", http_code=HTTPStatus.BAD_REQUEST)
        artifact = (storage['folder'], pkg_dir, *path)
        if self.fs.file_exists(artifact, 'dir'):
            # TODO manage folders in http
            return self.fs.dir_ls(artifact), "text/plain"
        return self.fs.file_open(artifact, "rb"), "application/octet-stream"

    # pkgtype   accept ZIP  TEXT    -> result
    # manyfiles        yes  X       -> zip
    #                  no   yes     -> error
    # onefile          yes  no      -> zip
    #                  X    yes     -> text

    if accept_text and (not pkg_dir or path == "$DESCRIPTOR"):
        return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain"
    if pkg_dir and not accept_zip:
        raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'"
                              "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE)
    if not storage.get('zipfile'):
        # TODO generate zipfile if not present
        raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in "
                              "future versions", http_code=HTTPStatus.NOT_ACCEPTABLE)
    return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), accept_zip
367
def pyangbind_validation(self, item, data, force=False):
    """
    Loads the descriptor into the pyangbind class of its information model and serializes it back
    :param item: kind of descriptor: "vnfds", "nsds" or "nsts"
    :param data: descriptor content, without envelope
    :param force: given to the pyangbind decoder as 'skip_unknown'
    :return: the serialized descriptor after removing the envelope. Any failure, including an unknown 'item',
        ends up as an EngineException with UNPROCESSABLE_ENTITY
    """
    try:
        if item == "vnfds":
            model = vnfd_im()
            envelope = {'vnfd:vnfd-catalog': {'vnfd': [data]}}
        elif item == "nsds":
            model = nsd_im()
            envelope = {'nsd:nsd-catalog': {'nsd': [data]}}
        elif item == "nsts":
            model = nst_im()
            envelope = {'nst': [data]}
        else:
            raise EngineException("Not possible to validate '{}' item".format(item),
                                  http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        pybindJSONDecoder.load_ietf_json(envelope, None, None, obj=model,
                                         path_helper=True, skip_unknown=force)
        serialized = pybindJSON.dumps(model, mode="ietf")
        return self._remove_envelop(yaml.safe_load(serialized))

    except Exception as e:
        raise EngineException("Error in pyangbind validation: {}".format(str(e)),
                              http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
395
396
397 class VnfdTopic(DescriptorTopic):
398 topic = "vnfds"
399 topic_msg = "vnfd"
400
def __init__(self, db, fs, msg, auth):
    """
    Builds the vnfd topic by handing every argument, unchanged, to DescriptorTopic.__init__
    :param db: forwarded to DescriptorTopic.__init__
    :param fs: forwarded to DescriptorTopic.__init__
    :param msg: forwarded to DescriptorTopic.__init__
    :param auth: forwarded to DescriptorTopic.__init__
    """
    DescriptorTopic.__init__(self, db, fs, msg, auth)
403
@staticmethod
def _remove_envelop(indata=None):
    """
    Gets the vnfd itself out of the optional catalog and list wrappers
    :param indata: descriptor, maybe wrapped by 'vnfd:vnfd-catalog' or 'vnfd-catalog' and/or by a 'vnfd' or
        'vnfd:vnfd' list
    :return: the inner descriptor; an empty dict if indata is empty. Raises EngineException if the list wrapper
        is not a list of exactly one element
    """
    if not indata:
        return {}
    descriptor = indata

    # optional outer wrapper: the catalog
    for catalog_key in ('vnfd:vnfd-catalog', 'vnfd-catalog'):
        if descriptor.get(catalog_key):
            descriptor = descriptor[catalog_key]
            break

    # optional inner wrapper: a list with a single vnfd
    list_wrappers = (
        ('vnfd', "'vnfd' must be a list of only one element"),
        ('vnfd:vnfd', "'vnfd:vnfd' must be a list of only one element"),
    )
    for list_key, error_message in list_wrappers:
        if descriptor.get(list_key):
            items = descriptor[list_key]
            if not isinstance(items, list) or len(items) != 1:
                raise EngineException(error_message)
            return items[0]
    return descriptor
422
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
    """
    Runs the DescriptorTopic checks and then fills _admin.type depending on the kind of vdus of the descriptor
    :param session: passed to the parent check
    :param final_content: descriptor after applying the edition; final_content["_admin"]["type"] may be set
    :param edit_content: passed to the parent check
    :param _id: server internal id of the descriptor being edited
    :return: None
    """
    super().check_conflict_on_edit(session, final_content, edit_content, _id)

    # set type of vnfd: a vdu with "pdu-type" is a physical one
    is_physical = [bool(vdu.get("pdu-type")) for vdu in get_iterable(final_content.get("vdu"))]
    has_pdu = any(is_physical)
    has_vdu = not all(is_physical)  # False as well when there are no vdus at all

    if has_pdu and has_vdu:
        final_content["_admin"]["type"] = "hnfd"
    elif has_pdu:
        final_content["_admin"]["type"] = "pnfd"
    elif has_vdu:
        final_content["_admin"]["type"] = "vnfd"
    # if neither vdu nor pdu do not fill type
439
def check_conflict_on_del(self, session, _id, db_content):
    """
    Check that there is not any NSD that uses this VNFD. Only NSDs belonging to this project are considered. Note
    that VNFD can be public and be used by NSD of other projects. Also check there are not deployments, or vnfr
    that uses this vnfd
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: vnfd internal id
    :param db_content: The database content of the _id.
    :return: None or raises EngineException with the conflict
    """
    if session["force"]:
        return
    descriptor_id = db_content.get("id")
    if not descriptor_id:  # empty vnfd not uploaded
        return

    project_filter = self._get_project_filter(session)

    # check vnfrs using this vnfd; they reference it by the internal _id
    vnfr_filter = dict(project_filter)
    vnfr_filter["vnfd-id"] = _id
    if self.db.get_list("vnfrs", vnfr_filter):
        raise EngineException("There is at least one VNF using this descriptor", http_code=HTTPStatus.CONFLICT)

    # check NSD referencing this VNFD; they reference it by the descriptor "id"
    nsd_filter = dict(project_filter)
    nsd_filter["constituent-vnfd.ANYINDEX.vnfd-id-ref"] = descriptor_id
    if self.db.get_list("nsds", nsd_filter):
        raise EngineException("There is at least one NSD referencing this descriptor",
                              http_code=HTTPStatus.CONFLICT)
470
def _validate_input_new(self, indata, storage_params, force=False):
    """
    Validates a vnfd: first with pyangbind_validation and then the cross references inside the descriptor
    :param indata: vnfd content, without envelope
    :param storage_params: storage information of the package (the caller passes _admin.storage). Used to check
        that charms and cloud-init files referenced by the descriptor are present at the package
    :param force: passed to pyangbind_validation
    :return: the descriptor as returned by pyangbind_validation. Raises EngineException if a check fails
    """
    indata = self.pyangbind_validation("vnfds", indata, force)
    # Cross references validation in the descriptor
    if indata.get("vdu"):
        if not indata.get("mgmt-interface"):
            raise EngineException("'mgmt-interface' is a mandatory field and it is not defined",
                                  http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
        if indata["mgmt-interface"].get("cp"):
            for cp in get_iterable(indata.get("connection-point")):
                if cp["name"] == indata["mgmt-interface"]["cp"]:
                    break
            else:
                raise EngineException("mgmt-interface:cp='{}' must match an existing connection-point"
                                      .format(indata["mgmt-interface"]["cp"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)

    for vdu in get_iterable(indata.get("vdu")):
        # every interface must reference an existing external or internal connection point
        for interface in get_iterable(vdu.get("interface")):
            if interface.get("external-connection-point-ref"):
                for cp in get_iterable(indata.get("connection-point")):
                    if cp["name"] == interface["external-connection-point-ref"]:
                        break
                else:
                    raise EngineException("vdu[id='{}']:interface[name='{}']:external-connection-point-ref='{}' "
                                          "must match an existing connection-point"
                                          .format(vdu["id"], interface["name"],
                                                  interface["external-connection-point-ref"]),
                                          http_code=HTTPStatus.UNPROCESSABLE_ENTITY)

            elif interface.get("internal-connection-point-ref"):
                for internal_cp in get_iterable(vdu.get("internal-connection-point")):
                    if interface["internal-connection-point-ref"] == internal_cp.get("id"):
                        break
                else:
                    raise EngineException("vdu[id='{}']:interface[name='{}']:internal-connection-point-ref='{}' "
                                          "must match an existing vdu:internal-connection-point"
                                          .format(vdu["id"], interface["name"],
                                                  interface["internal-connection-point-ref"]),
                                          http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
        # Validate that if descriptor contains charms, artifacts _admin.storage."pkg-dir" is not none
        if vdu.get("vdu-configuration"):
            if vdu["vdu-configuration"].get("juju"):
                if not self._validate_package_folders(storage_params, 'charms'):
                    raise EngineException("Charm defined in vnf[id={}]:vdu[id={}] but not present in "
                                          "package".format(indata["id"], vdu["id"]))
        # Validate that if descriptor contains cloud-init, artifacts _admin.storage."pkg-dir" is not none
        if vdu.get("cloud-init-file"):
            if not self._validate_package_folders(storage_params, 'cloud_init', vdu["cloud-init-file"]):
                raise EngineException("Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in "
                                      "package".format(indata["id"], vdu["id"]))
    # Validate that if descriptor contains charms, artifacts _admin.storage."pkg-dir" is not none
    if indata.get("vnf-configuration"):
        if indata["vnf-configuration"].get("juju"):
            if not self._validate_package_folders(storage_params, 'charms'):
                raise EngineException("Charm defined in vnf[id={}] but not present in "
                                      "package".format(indata["id"]))
    vld_names = []  # For detection of duplicated VLD names
    for ivld in get_iterable(indata.get("internal-vld")):
        # BEGIN Detection of duplicated VLD names
        ivld_name = ivld["name"]
        if ivld_name in vld_names:
            raise EngineException("Duplicated VLD name '{}' in vnfd[id={}]:internal-vld[id={}]"
                                  .format(ivld["name"], indata["id"], ivld["id"]),
                                  http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
        else:
            vld_names.append(ivld_name)
        # END Detection of duplicated VLD names
        for icp in get_iterable(ivld.get("internal-connection-point")):
            # look for the referenced internal connection point at any vdu
            icp_mark = False
            for vdu in get_iterable(indata.get("vdu")):
                for internal_cp in get_iterable(vdu.get("internal-connection-point")):
                    if icp["id-ref"] == internal_cp["id"]:
                        icp_mark = True
                        break
                if icp_mark:
                    break
            else:
                raise EngineException("internal-vld[id='{}']:internal-connection-point='{}' must match an existing "
                                      "vdu:internal-connection-point".format(ivld["id"], icp["id-ref"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
        if ivld.get("ip-profile-ref"):
            for ip_prof in get_iterable(indata.get("ip-profiles")):
                # NOTE(review): get_iterable is applied to a scalar reference; presumably it returns its
                # argument unchanged when it is not None - confirm at base_topic
                if ip_prof["name"] == get_iterable(ivld.get("ip-profile-ref")):
                    break
            else:
                raise EngineException("internal-vld[id='{}']:ip-profile-ref='{}' does not exist".format(
                    ivld["id"], ivld["ip-profile-ref"]),
                    http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
    for mp in get_iterable(indata.get("monitoring-param")):
        if mp.get("vdu-monitoring-param"):
            # the referenced vdu must exist and define the referenced monitoring-param
            mp_vmp_mark = False
            for vdu in get_iterable(indata.get("vdu")):
                for vmp in get_iterable(vdu.get("monitoring-param")):
                    if vmp["id"] == mp["vdu-monitoring-param"].get("vdu-monitoring-param-ref") and vdu["id"] ==\
                            mp["vdu-monitoring-param"]["vdu-ref"]:
                        mp_vmp_mark = True
                        break
                if mp_vmp_mark:
                    break
            else:
                raise EngineException("monitoring-param:vdu-monitoring-param:vdu-monitoring-param-ref='{}' not "
                                      "defined at vdu[id='{}'] or vdu does not exist"
                                      .format(mp["vdu-monitoring-param"]["vdu-monitoring-param-ref"],
                                              mp["vdu-monitoring-param"]["vdu-ref"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
        elif mp.get("vdu-metric"):
            # the referenced vdu must exist and define the referenced metric at its vdu-configuration
            mp_vm_mark = False
            for vdu in get_iterable(indata.get("vdu")):
                if vdu.get("vdu-configuration"):
                    for metric in get_iterable(vdu["vdu-configuration"].get("metrics")):
                        if metric["name"] == mp["vdu-metric"]["vdu-metric-name-ref"] and vdu["id"] == \
                                mp["vdu-metric"]["vdu-ref"]:
                            mp_vm_mark = True
                            break
                    if mp_vm_mark:
                        break
            else:
                raise EngineException("monitoring-param:vdu-metric:vdu-metric-name-ref='{}' not defined at "
                                      "vdu[id='{}'] or vdu does not exist"
                                      .format(mp["vdu-metric"]["vdu-metric-name-ref"],
                                              mp["vdu-metric"]["vdu-ref"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)

    for sgd in get_iterable(indata.get("scaling-group-descriptor")):
        for sp in get_iterable(sgd.get("scaling-policy")):
            for sc in get_iterable(sp.get("scaling-criteria")):
                for mp in get_iterable(indata.get("monitoring-param")):
                    if mp["id"] == get_iterable(sc.get("vnf-monitoring-param-ref")):
                        break
                else:
                    raise EngineException("scaling-group-descriptor[name='{}']:scaling-criteria[name='{}']:"
                                          "vnf-monitoring-param-ref='{}' not defined in any monitoring-param"
                                          .format(sgd["name"], sc["name"], sc["vnf-monitoring-param-ref"]),
                                          http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
        # Every vdu of the scaling group must reference a vdu of this descriptor. Each entry is checked on its
        # own, so one valid reference does not hide a wrong one, and a group without vdus is accepted
        for sgd_vdu in get_iterable(sgd.get("vdu")):
            for vdu in get_iterable(indata.get("vdu")):
                if vdu["id"] == sgd_vdu["vdu-id-ref"]:
                    break
            else:
                raise EngineException("scaling-group-descriptor[name='{}']:vdu-id-ref={} does not match any vdu"
                                      .format(sgd["name"], sgd_vdu["vdu-id-ref"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
        for sca in get_iterable(sgd.get("scaling-config-action")):
            if not indata.get("vnf-configuration"):
                raise EngineException("'vnf-configuration' not defined in the descriptor but it is referenced by "
                                      "scaling-group-descriptor[name='{}']:scaling-config-action"
                                      .format(sgd["name"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
            for primitive in get_iterable(indata["vnf-configuration"].get("config-primitive")):
                if primitive["name"] == sca["vnf-config-primitive-name-ref"]:
                    break
            else:
                raise EngineException("scaling-group-descriptor[name='{}']:scaling-config-action:vnf-config-"
                                      "primitive-name-ref='{}' does not match any "
                                      "vnf-configuration:config-primitive:name"
                                      .format(sgd["name"], sca["vnf-config-primitive-name-ref"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
    return indata
633
def _validate_input_edit(self, indata, force=False):
    """
    Returns the edition content untouched
    :param indata: content of the edition
    :param force: not used
    :return: the same indata received
    """
    # no need to validate with pyangbind here because it will be validated at check_conflict_on_edit
    return indata
637
def _validate_package_folders(self, storage_params, folder, file=None):
    """
    Checks that the package contains a given folder with some content, or a given file inside that folder
    :param storage_params: storage information of the package; "folder" and "pkg-dir" are used
    :param folder: folder to look for inside the package dir, e.g. 'charms'
    :param file: if provided, name of the file that must exist inside 'folder'
    :return: True if found. False if not found or if the package has no "pkg-dir"
    """
    if not storage_params or not storage_params.get("pkg-dir"):
        return False

    # while the temporal folder (ended with "_") exists, the package is looked up there
    if self.fs.file_exists("{}_".format(storage_params["folder"]), 'dir'):
        location_template = "{}_/{}/{}"
    else:
        location_template = "{}/{}/{}"
    location = location_template.format(storage_params["folder"], storage_params["pkg-dir"], folder)

    if file:
        return self.fs.file_exists("{}/{}".format(location, file), 'file')
    # without a file name, the folder must exist and must not be empty
    return bool(self.fs.file_exists(location, 'dir') and self.fs.dir_ls(location))
653
654
655 class NsdTopic(DescriptorTopic):
656 topic = "nsds"
657 topic_msg = "nsd"
658
def __init__(self, db, fs, msg, auth):
    """
    Builds the nsd topic by handing every argument, unchanged, to DescriptorTopic.__init__
    :param db: forwarded to DescriptorTopic.__init__
    :param fs: forwarded to DescriptorTopic.__init__
    :param msg: forwarded to DescriptorTopic.__init__
    :param auth: forwarded to DescriptorTopic.__init__
    """
    DescriptorTopic.__init__(self, db, fs, msg, auth)
661
@staticmethod
def _remove_envelop(indata=None):
    """
    Gets the nsd itself out of the optional catalog and list wrappers
    :param indata: descriptor, maybe wrapped by 'nsd:nsd-catalog' or 'nsd-catalog' and/or by a 'nsd' or
        'nsd:nsd' list
    :return: the inner descriptor; an empty dict if indata is empty. Raises EngineException if the list wrapper
        is not a list of exactly one element
    """
    if not indata:
        return {}
    descriptor = indata

    # optional outer wrapper: the catalog
    for catalog_key in ('nsd:nsd-catalog', 'nsd-catalog'):
        if descriptor.get(catalog_key):
            descriptor = descriptor[catalog_key]
            break

    # optional inner wrapper: a list with a single nsd
    list_wrappers = (
        ('nsd', "'nsd' must be a list of only one element"),
        ('nsd:nsd', "'nsd:nsd' must be a list of only one element"),
    )
    for list_key, error_message in list_wrappers:
        if descriptor.get(list_key):
            items = descriptor[list_key]
            if not isinstance(items, list) or len(items) != 1:
                raise EngineException(error_message)
            return items[0]
    return descriptor
681
def _validate_input_new(self, indata, storage_params, force=False):
    """
    Validates a nsd: first with pyangbind_validation and then the cross references of its vlds
    :param indata: nsd content, without envelope
    :param storage_params: not used here
    :param force: passed to pyangbind_validation
    :return: the descriptor as returned by pyangbind_validation. Raises EngineException if a check fails
    """
    indata = self.pyangbind_validation("nsds", indata, force)
    # Cross references validation in the descriptor
    # TODO validate that if contains cloud-init-file or charms, have artifacts _admin.storage."pkg-dir" is not none
    for vld in get_iterable(indata.get("vld")):
        if vld.get("mgmt-network") and vld.get("ip-profile-ref"):
            raise EngineException("Error at vld[id='{}']:ip-profile-ref"
                                  " You cannot set an ip-profile when mgmt-network is True"
                                  .format(vld["id"]), http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
        for vnfd_cp in get_iterable(vld.get("vnfd-connection-point-ref")):
            # first constituent-vnfd with the referenced member index, if any
            constituent_vnfd = next(
                (candidate for candidate in get_iterable(indata.get("constituent-vnfd"))
                 if vnfd_cp["member-vnf-index-ref"] == candidate["member-vnf-index"]),
                None)
            if constituent_vnfd is None:
                raise EngineException("Error at vld[id='{}']:vnfd-connection-point-ref[member-vnf-index-ref='{}'] "
                                      "does not match any constituent-vnfd:member-vnf-index"
                                      .format(vld["id"], vnfd_cp["member-vnf-index-ref"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
            # when the connection point also names the vnfd, it must be the one of that constituent
            if vnfd_cp.get("vnfd-id-ref") and vnfd_cp["vnfd-id-ref"] != constituent_vnfd["vnfd-id-ref"]:
                raise EngineException("Error at vld[id='{}']:vnfd-connection-point-ref[vnfd-id-ref='{}'] "
                                      "does not match constituent-vnfd[member-vnf-index='{}']:vnfd-id-ref"
                                      " '{}'".format(vld["id"], vnfd_cp["vnfd-id-ref"],
                                                     constituent_vnfd["member-vnf-index"],
                                                     constituent_vnfd["vnfd-id-ref"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
    return indata
708
def _validate_input_edit(self, indata, force=False):
    """
    Returns the edition content untouched
    :param indata: content of the edition
    :param force: not used
    :return: the same indata received
    """
    # no need to validate with pyangbind here because it will be validated at check_conflict_on_edit
    return indata
712
def _check_descriptor_dependencies(self, session, descriptor):
    """
    Check that the dependent descriptors exist on a new descriptor or edition. Also checks references to vnfd
    connection points are ok
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param descriptor: descriptor to be inserted or edit
    :return: None or raises exception
    """
    if session["force"]:
        return

    # map of member-vnf-index -> vnfd found at database for that constituent
    vnfd_by_member_index = {}
    for constituent in descriptor.get("constituent-vnfd") or ():
        vnfd_id = constituent["vnfd-id-ref"]
        query = self._get_project_filter(session)
        query["id"] = vnfd_id
        found_vnfds = self.db.get_list("vnfds", query)
        if not found_vnfds:
            raise EngineException("Descriptor error at 'constituent-vnfd':'vnfd-id-ref'='{}' references a non "
                                  "existing vnfd".format(vnfd_id), http_code=HTTPStatus.CONFLICT)
        # finding more than one vnfd with this id is not an error: the first one is taken
        vnfd_by_member_index[constituent["member-vnf-index"]] = found_vnfds[0]

    # Cross references validation in the descriptor and vnfd connection point validation
    for vld in get_iterable(descriptor.get("vld")):
        for cp_ref in get_iterable(vld.get("vnfd-connection-point-ref")):
            vnfd = vnfd_by_member_index.get(cp_ref["member-vnf-index-ref"])
            if not vnfd:
                raise EngineException("Error at vld[id='{}']:vnfd-connection-point-ref[member-vnf-index-ref='{}'] "
                                      "does not match any constituent-vnfd:member-vnf-index"
                                      .format(vld["id"], cp_ref["member-vnf-index-ref"]),
                                      http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
            # look if this vnfd contains this connection point
            cp_exists = any(cp_ref.get("vnfd-connection-point-ref") == vnfd_cp["name"]
                            for vnfd_cp in get_iterable(vnfd.get("connection-point")))
            if not cp_exists:
                raise EngineException(
                    "Error at vld[id='{}']:vnfd-connection-point-ref[member-vnf-index-ref='{}']:vnfd-"
                    "connection-point-ref='{}' references a non existing conection-point:name inside vnfd '{}'"
                    .format(vld["id"], cp_ref["member-vnf-index-ref"],
                            cp_ref["vnfd-connection-point-ref"], vnfd["id"]),
                    http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
758
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
    """
    Run the inherited edition checks and then validate the dependencies of the resulting descriptor
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param final_content: descriptor content as it will be after applying the edition
    :param edit_content: content of the edit request; only handed over to the parent class check
    :param _id: internal id of the descriptor; only handed over to the parent class check
    :return: None or raises the exception coming from any of the two checks
    """
    # first the checks implemented by the parent class ...
    super().check_conflict_on_edit(session, final_content, edit_content, _id)
    # ... then the cross references of the content that is going to be stored
    self._check_descriptor_dependencies(session, final_content)
763
def check_conflict_on_del(self, session, _id, db_content):
    """
    Refuse the deletion of an NSD that is still in use. It is in use when an NSR of this project has been
    created from it, or when a NetSlice Template of this project references it. Note that the NSD can be
    public and be used by other projects; those are not considered.
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: nsd internal id
    :param db_content: The database content of the _id
    :return: None or raises EngineException with the conflict
    """
    if session["force"]:
        return

    nsd_id = db_content.get("id")
    if not nsd_id:
        # empty nsd, its content was never uploaded
        return

    # NS records are looked up by the internal _id of the descriptor
    usage_filter = self._get_project_filter(session)
    usage_filter["nsd-id"] = _id
    ns_using_nsd = self.db.get_list("nsrs", usage_filter)
    if ns_using_nsd:
        raise EngineException("There is at least one NS using this descriptor", http_code=HTTPStatus.CONFLICT)

    # NetSlice Templates are looked up by the descriptor "id"; the same project filter is reused
    usage_filter.pop("nsd-id")
    usage_filter["netslice-subnet.ANYINDEX.nsd-ref"] = nsd_id
    nst_using_nsd = self.db.get_list("nsts", usage_filter)
    if nst_using_nsd:
        raise EngineException("There is at least one NetSlice Template referencing this descriptor",
                              http_code=HTTPStatus.CONFLICT)
792
793
class NstTopic(DescriptorTopic):
    # database collection ("nsts") and message topic ("nst") used for the NetSlice Templates
    topic = "nsts"
    topic_msg = "nst"

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    @staticmethod
    def _remove_envelop(indata=None):
        """
        Strip the optional 'nst' or 'nst:nst' envelop from the input
        :param indata: input content; an empty or missing one produces an empty dictionary
        :return: the single element of the envelop list, or indata itself when there is no envelop
        """
        if not indata:
            return {}

        # envelop key -> error raised when its content is not a list with exactly one element
        envelops = (
            ("nst", "'nst' must be a list only one element"),
            ("nst:nst", "'nst:nst' must be a list only one element"),
        )
        for envelop_key, error_text in envelops:
            wrapped = indata.get(envelop_key)
            if not wrapped:
                continue
            if isinstance(wrapped, list) and len(wrapped) == 1:
                return wrapped[0]
            raise EngineException(error_text)
        return indata

    def _validate_input_edit(self, indata, force=False):
        """
        Return the edit content untouched
        :param indata: content of the edit request
        :param force: not used by this method
        :return: the very same indata object
        """
        # TODO validate with pyangbind, serialize
        return indata

    def _validate_input_new(self, indata, storage_params, force=False):
        """
        Validate a new template with pyangbind
        :param indata: content of the new template
        :param storage_params: not used by this method
        :param force: passed as it is to pyangbind_validation
        :return: a copy of the content returned by pyangbind_validation
        """
        validated = self.pyangbind_validation("nsts", indata, force)
        return validated.copy()

    def _check_descriptor_dependencies(self, session, descriptor):
        """
        Check that every nsd referenced at 'netslice-subnet' exists, on a new descriptor or edition
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param descriptor: descriptor to be inserted or edited
        :return: None or raises EngineException (CONFLICT) at the first nsd-ref that is not found
        """
        # a missing or empty 'netslice-subnet' means that there is nothing to check
        for subnet in descriptor.get("netslice-subnet") or ():
            nsd_id = subnet["nsd-ref"]
            nsd_filter = self._get_project_filter(session)
            nsd_filter["id"] = nsd_id
            matching_nsds = self.db.get_list("nsds", nsd_filter)
            if not matching_nsds:
                raise EngineException("Descriptor error at 'netslice-subnet':'nsd-ref'='{}' references a non "
                                      "existing nsd".format(nsd_id), http_code=HTTPStatus.CONFLICT)

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """
        Run the inherited edition checks and then validate the dependencies of the resulting template
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param final_content: template content as it will be after applying the edition
        :param edit_content: content of the edit request; only handed over to the parent class check
        :param _id: nst internal id; only handed over to the parent class check
        :return: None or raises the exception coming from any of the two checks
        """
        super().check_conflict_on_edit(session, final_content, edit_content, _id)
        self._check_descriptor_dependencies(session, final_content)

    def check_conflict_on_del(self, session, _id, db_content):
        """
        Refuse the deletion of an NST while there is any NSIR using it. Only NSIRs belonging to this project are
        considered. Note that the NST can be public and be used by other projects.
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: nst internal id
        :param db_content: The database content of the _id; not read by this method
        :return: None or raises EngineException with the conflict
        """
        # TODO: Check this method
        if session["force"]:
            return

        # Look for Network Slice Instances of this project created from this template
        nsi_filter = self._get_project_filter(session)
        nsi_filter["_admin.nst-id"] = _id
        nsis_using_nst = self.db.get_list("nsis", nsi_filter)
        if nsis_using_nst:
            raise EngineException("there is at least one Netslice Instance using this descriptor",
                                  http_code=HTTPStatus.CONFLICT)
865
866
class PduTopic(BaseTopic):
    # database collection ("pdus") and message topic ("pdu") used for the PDUs
    topic = "pdus"
    topic_msg = "pdu"
    # schemas for the creation and the edition requests
    schema_new = pdu_new_schema
    schema_edit = pdu_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """
        Apply the BaseTopic formatting to a new PDU and set its initial states under "_admin"
        :param content: content of the new PDU; it is modified in place
        :param project_id: passed as it is to BaseTopic.format_on_new
        :param make_public: passed as it is to BaseTopic.format_on_new
        :return: None
        """
        BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        admin = content["_admin"]
        initial_states = (
            ("onboardingState", "CREATED"),
            ("operationalState", "ENABLED"),
            ("usageState", "NOT_IN_USE"),
        )
        for state_name, state_value in initial_states:
            admin[state_name] = state_value

    def check_conflict_on_del(self, session, _id, db_content):
        """
        Refuse the deletion of a PDU while there is any vnfr of this project using it
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: pdu internal id
        :param db_content: The database content of the _id; not read by this method
        :return: None or raises EngineException with the conflict
        """
        if session["force"]:
            return

        # a vnfr uses the PDU when any of its vdur entries carries this pdu-id
        vnfr_filter = self._get_project_filter(session)
        vnfr_filter["vdur.pdu-id"] = _id
        vnfrs_using_pdu = self.db.get_list("vnfrs", vnfr_filter)
        if vnfrs_using_pdu:
            raise EngineException("There is at least one VNF using this PDU", http_code=HTTPStatus.CONFLICT)