Bug 2024
[osm/NBI.git] / osm_nbi / descriptor_topics.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import tarfile
17 import yaml
18 import json
19 import copy
20 import os
21 import shutil
22 import functools
23
24 # import logging
25 from deepdiff import DeepDiff
26 from hashlib import md5
27 from osm_common.dbbase import DbException, deep_update_rfc7396
28 from http import HTTPStatus
29 from time import time
30 from uuid import uuid4
31 from re import fullmatch
32 from zipfile import ZipFile
33 from osm_nbi.validation import (
34 ValidationError,
35 pdu_new_schema,
36 pdu_edit_schema,
37 validate_input,
38 vnfpkgop_new_schema,
39 )
40 from osm_nbi.base_topic import BaseTopic, EngineException, get_iterable
41 from osm_im import etsi_nfv_vnfd, etsi_nfv_nsd
42 from osm_im.nst import nst as nst_im
43 from pyangbind.lib.serialise import pybindJSONDecoder
44 import pyangbind.lib.pybindJSON as pybindJSON
45 from osm_nbi import utils
46
47 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
48
49
50 class DescriptorTopic(BaseTopic):
51 def __init__(self, db, fs, msg, auth):
52
53 BaseTopic.__init__(self, db, fs, msg, auth)
54
55 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
56 final_content = super().check_conflict_on_edit(
57 session, final_content, edit_content, _id
58 )
59
60 def _check_unique_id_name(descriptor, position=""):
61 for desc_key, desc_item in descriptor.items():
62 if isinstance(desc_item, list) and desc_item:
63 used_ids = []
64 desc_item_id = None
65 for index, list_item in enumerate(desc_item):
66 if isinstance(list_item, dict):
67 _check_unique_id_name(
68 list_item, "{}.{}[{}]".format(position, desc_key, index)
69 )
70 # Base case
71 if index == 0 and (
72 list_item.get("id") or list_item.get("name")
73 ):
74 desc_item_id = "id" if list_item.get("id") else "name"
75 if desc_item_id and list_item.get(desc_item_id):
76 if list_item[desc_item_id] in used_ids:
77 position = "{}.{}[{}]".format(
78 position, desc_key, index
79 )
80 raise EngineException(
81 "Error: identifier {} '{}' is not unique and repeats at '{}'".format(
82 desc_item_id,
83 list_item[desc_item_id],
84 position,
85 ),
86 HTTPStatus.UNPROCESSABLE_ENTITY,
87 )
88 used_ids.append(list_item[desc_item_id])
89
90 _check_unique_id_name(final_content)
91 # 1. validate again with pyangbind
92 # 1.1. remove internal keys
93 internal_keys = {}
94 for k in ("_id", "_admin"):
95 if k in final_content:
96 internal_keys[k] = final_content.pop(k)
97 storage_params = internal_keys["_admin"].get("storage")
98 serialized = self._validate_input_new(
99 final_content, storage_params, session["force"]
100 )
101
102 # 1.2. modify final_content with a serialized version
103 final_content = copy.deepcopy(serialized)
104 # 1.3. restore internal keys
105 for k, v in internal_keys.items():
106 final_content[k] = v
107 if session["force"]:
108 return final_content
109
110 # 2. check that this id is not present
111 if "id" in edit_content:
112 _filter = self._get_project_filter(session)
113
114 _filter["id"] = final_content["id"]
115 _filter["_id.neq"] = _id
116
117 if self.db.get_one(self.topic, _filter, fail_on_empty=False):
118 raise EngineException(
119 "{} with id '{}' already exists for this project".format(
120 self.topic[:-1], final_content["id"]
121 ),
122 HTTPStatus.CONFLICT,
123 )
124
125 return final_content
126
127 @staticmethod
128 def format_on_new(content, project_id=None, make_public=False):
129 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
130 content["_admin"]["onboardingState"] = "CREATED"
131 content["_admin"]["operationalState"] = "DISABLED"
132 content["_admin"]["usageState"] = "NOT_IN_USE"
133
134 def delete_extra(self, session, _id, db_content, not_send_msg=None):
135 """
136 Deletes file system storage associated with the descriptor
137 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
138 :param _id: server internal id
139 :param db_content: The database content of the descriptor
140 :param not_send_msg: To not send message (False) or store content (list) instead
141 :return: None if ok or raises EngineException with the problem
142 """
143 self.fs.file_delete(_id, ignore_non_exist=True)
144 self.fs.file_delete(_id + "_", ignore_non_exist=True) # remove temp folder
145 # Remove file revisions
146 if "revision" in db_content["_admin"]:
147 revision = db_content["_admin"]["revision"]
148 while revision > 0:
149 self.fs.file_delete(_id + ":" + str(revision), ignore_non_exist=True)
150 revision = revision - 1
151
152
153 @staticmethod
154 def get_one_by_id(db, session, topic, id):
155 # find owned by this project
156 _filter = BaseTopic._get_project_filter(session)
157 _filter["id"] = id
158 desc_list = db.get_list(topic, _filter)
159 if len(desc_list) == 1:
160 return desc_list[0]
161 elif len(desc_list) > 1:
162 raise DbException(
163 "Found more than one {} with id='{}' belonging to this project".format(
164 topic[:-1], id
165 ),
166 HTTPStatus.CONFLICT,
167 )
168
169 # not found any: try to find public
170 _filter = BaseTopic._get_project_filter(session)
171 _filter["id"] = id
172 desc_list = db.get_list(topic, _filter)
173 if not desc_list:
174 raise DbException(
175 "Not found any {} with id='{}'".format(topic[:-1], id),
176 HTTPStatus.NOT_FOUND,
177 )
178 elif len(desc_list) == 1:
179 return desc_list[0]
180 else:
181 raise DbException(
182 "Found more than one public {} with id='{}'; and no one belonging to this project".format(
183 topic[:-1], id
184 ),
185 HTTPStatus.CONFLICT,
186 )
187
188 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
189 """
190 Creates a new almost empty DISABLED entry into database. Due to SOL005, it does not follow normal procedure.
191 Creating a VNFD or NSD is done in two steps: 1. Creates an empty descriptor (this step) and 2) upload content
192 (self.upload_content)
193 :param rollback: list to append created items at database in case a rollback may to be done
194 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
195 :param indata: data to be inserted
196 :param kwargs: used to override the indata descriptor
197 :param headers: http request headers
198 :return: _id, None: identity of the inserted data; and None as there is not any operation
199 """
200
201 # No needed to capture exceptions
202 # Check Quota
203 self.check_quota(session)
204
205 # _remove_envelop
206 if indata:
207 if "userDefinedData" in indata:
208 indata = indata["userDefinedData"]
209
210 # Override descriptor with query string kwargs
211 self._update_input_with_kwargs(indata, kwargs)
212 # uncomment when this method is implemented.
213 # Avoid override in this case as the target is userDefinedData, but not vnfd,nsd descriptors
214 # indata = DescriptorTopic._validate_input_new(self, indata, project_id=session["force"])
215
216 content = {"_admin": {
217 "userDefinedData": indata,
218 "revision": 0
219 }}
220
221 self.format_on_new(
222 content, session["project_id"], make_public=session["public"]
223 )
224 _id = self.db.create(self.topic, content)
225 rollback.append({"topic": self.topic, "_id": _id})
226 self._send_msg("created", {"_id": _id})
227 return _id, None
228
229 def upload_content(self, session, _id, indata, kwargs, headers):
230 """
231 Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract)
232 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
233 :param _id : the nsd,vnfd is already created, this is the id
234 :param indata: http body request
235 :param kwargs: user query string to override parameters. NOT USED
236 :param headers: http request headers
237 :return: True if package is completely uploaded or False if partial content has been uploded
238 Raise exception on error
239 """
240 # Check that _id exists and it is valid
241 current_desc = self.show(session, _id)
242
243 content_range_text = headers.get("Content-Range")
244 expected_md5 = headers.get("Content-File-MD5")
245 compressed = None
246 content_type = headers.get("Content-Type")
247 if (
248 content_type
249 and "application/gzip" in content_type
250 or "application/x-gzip" in content_type
251 ):
252 compressed = "gzip"
253 if (
254 content_type
255 and "application/zip" in content_type
256 ):
257 compressed = "zip"
258 filename = headers.get("Content-Filename")
259 if not filename and compressed:
260 filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
261 elif not filename:
262 filename = "package"
263
264 revision = 1
265 if "revision" in current_desc["_admin"]:
266 revision = current_desc["_admin"]["revision"] + 1
267
268 # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
269 file_pkg = None
270 error_text = ""
271 fs_rollback = []
272
273 try:
274 if content_range_text:
275 content_range = (
276 content_range_text.replace("-", " ").replace("/", " ").split()
277 )
278 if (
279 content_range[0] != "bytes"
280 ): # TODO check x<y not negative < total....
281 raise IndexError()
282 start = int(content_range[1])
283 end = int(content_range[2]) + 1
284 total = int(content_range[3])
285 else:
286 start = 0
287 # Rather than using a temp folder, we will store the package in a folder based on
288 # the current revision.
289 proposed_revision_path = (
290 _id + ":" + str(revision)
291 ) # all the content is upload here and if ok, it is rename from id_ to is folder
292
293 if start:
294 if not self.fs.file_exists(proposed_revision_path, "dir"):
295 raise EngineException(
296 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
297 )
298 else:
299 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
300 self.fs.mkdir(proposed_revision_path)
301 fs_rollback.append(proposed_revision_path)
302
303 storage = self.fs.get_params()
304 storage["folder"] = proposed_revision_path
305
306 file_path = (proposed_revision_path, filename)
307 if self.fs.file_exists(file_path, "file"):
308 file_size = self.fs.file_size(file_path)
309 else:
310 file_size = 0
311 if file_size != start:
312 raise EngineException(
313 "invalid Content-Range start sequence, expected '{}' but received '{}'".format(
314 file_size, start
315 ),
316 HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
317 )
318 file_pkg = self.fs.file_open(file_path, "a+b")
319 if isinstance(indata, dict):
320 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
321 file_pkg.write(indata_text.encode(encoding="utf-8"))
322 else:
323 indata_len = 0
324 while True:
325 indata_text = indata.read(4096)
326 indata_len += len(indata_text)
327 if not indata_text:
328 break
329 file_pkg.write(indata_text)
330 if content_range_text:
331 if indata_len != end - start:
332 raise EngineException(
333 "Mismatch between Content-Range header {}-{} and body length of {}".format(
334 start, end - 1, indata_len
335 ),
336 HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
337 )
338 if end != total:
339 # TODO update to UPLOADING
340 return False
341
342 # PACKAGE UPLOADED
343 if expected_md5:
344 file_pkg.seek(0, 0)
345 file_md5 = md5()
346 chunk_data = file_pkg.read(1024)
347 while chunk_data:
348 file_md5.update(chunk_data)
349 chunk_data = file_pkg.read(1024)
350 if expected_md5 != file_md5.hexdigest():
351 raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT)
352 file_pkg.seek(0, 0)
353 if compressed == "gzip":
354 tar = tarfile.open(mode="r", fileobj=file_pkg)
355 descriptor_file_name = None
356 for tarinfo in tar:
357 tarname = tarinfo.name
358 tarname_path = tarname.split("/")
359 if (
360 not tarname_path[0] or ".." in tarname_path
361 ): # if start with "/" means absolute path
362 raise EngineException(
363 "Absolute path or '..' are not allowed for package descriptor tar.gz"
364 )
365 if len(tarname_path) == 1 and not tarinfo.isdir():
366 raise EngineException(
367 "All files must be inside a dir for package descriptor tar.gz"
368 )
369 if (
370 tarname.endswith(".yaml")
371 or tarname.endswith(".json")
372 or tarname.endswith(".yml")
373 ):
374 storage["pkg-dir"] = tarname_path[0]
375 if len(tarname_path) == 2:
376 if descriptor_file_name:
377 raise EngineException(
378 "Found more than one descriptor file at package descriptor tar.gz"
379 )
380 descriptor_file_name = tarname
381 if not descriptor_file_name:
382 raise EngineException(
383 "Not found any descriptor file at package descriptor tar.gz"
384 )
385 storage["descriptor"] = descriptor_file_name
386 storage["zipfile"] = filename
387 self.fs.file_extract(tar, proposed_revision_path)
388 with self.fs.file_open(
389 (proposed_revision_path, descriptor_file_name), "r"
390 ) as descriptor_file:
391 content = descriptor_file.read()
392 elif compressed == "zip":
393 zipfile = ZipFile(file_pkg)
394 descriptor_file_name = None
395 for package_file in zipfile.infolist():
396 zipfilename = package_file.filename
397 file_path = zipfilename.split("/")
398 if (
399 not file_path[0] or ".." in zipfilename
400 ): # if start with "/" means absolute path
401 raise EngineException(
402 "Absolute path or '..' are not allowed for package descriptor zip"
403 )
404
405 if (
406 (
407 zipfilename.endswith(".yaml")
408 or zipfilename.endswith(".json")
409 or zipfilename.endswith(".yml")
410 ) and (
411 zipfilename.find("/") < 0
412 or zipfilename.find("Definitions") >= 0
413 )
414 ):
415 storage["pkg-dir"] = ""
416 if descriptor_file_name:
417 raise EngineException(
418 "Found more than one descriptor file at package descriptor zip"
419 )
420 descriptor_file_name = zipfilename
421 if not descriptor_file_name:
422 raise EngineException(
423 "Not found any descriptor file at package descriptor zip"
424 )
425 storage["descriptor"] = descriptor_file_name
426 storage["zipfile"] = filename
427 self.fs.file_extract(zipfile, proposed_revision_path)
428
429 with self.fs.file_open(
430 (proposed_revision_path, descriptor_file_name), "r"
431 ) as descriptor_file:
432 content = descriptor_file.read()
433 else:
434 content = file_pkg.read()
435 storage["descriptor"] = descriptor_file_name = filename
436
437 if descriptor_file_name.endswith(".json"):
438 error_text = "Invalid json format "
439 indata = json.load(content)
440 else:
441 error_text = "Invalid yaml format "
442 indata = yaml.load(content, Loader=yaml.SafeLoader)
443
444 # Need to close the file package here so it can be copied from the
445 # revision to the current, unrevisioned record
446 if file_pkg:
447 file_pkg.close()
448 file_pkg = None
449
450 # Fetch both the incoming, proposed revision and the original revision so we
451 # can call a validate method to compare them
452 current_revision_path = _id + "/"
453 self.fs.sync(from_path=current_revision_path)
454 self.fs.sync(from_path=proposed_revision_path)
455
456 if revision > 1:
457 try:
458 self._validate_descriptor_changes(
459 descriptor_file_name,
460 current_revision_path,
461 proposed_revision_path)
462 except Exception as e:
463 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
464 shutil.rmtree(self.fs.path + proposed_revision_path, ignore_errors=True)
465 # Only delete the new revision. We need to keep the original version in place
466 # as it has not been changed.
467 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
468 raise e
469
470
471 indata = self._remove_envelop(indata)
472
473 # Override descriptor with query string kwargs
474 if kwargs:
475 self._update_input_with_kwargs(indata, kwargs)
476
477 current_desc["_admin"]["storage"] = storage
478 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
479 current_desc["_admin"]["operationalState"] = "ENABLED"
480 current_desc["_admin"]["modified"] = time()
481 current_desc["_admin"]["revision"] = revision
482
483 deep_update_rfc7396(current_desc, indata)
484 current_desc = self.check_conflict_on_edit(
485 session, current_desc, indata, _id=_id
486 )
487
488 # Copy the revision to the active package name by its original id
489 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
490 os.rename(self.fs.path + proposed_revision_path, self.fs.path + current_revision_path)
491 self.fs.file_delete(current_revision_path, ignore_non_exist=True)
492 self.fs.mkdir(current_revision_path)
493 self.fs.reverse_sync(from_path=current_revision_path)
494
495 shutil.rmtree(self.fs.path + _id)
496
497 self.db.replace(self.topic, _id, current_desc)
498
499 # Store a copy of the package as a point in time revision
500 revision_desc = dict(current_desc)
501 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
502 self.db.create(self.topic + "_revisions", revision_desc)
503 fs_rollback = []
504
505 indata["_id"] = _id
506 self._send_msg("edited", indata)
507
508 # TODO if descriptor has changed because kwargs update content and remove cached zip
509 # TODO if zip is not present creates one
510 return True
511
512 except EngineException:
513 raise
514 except IndexError:
515 raise EngineException(
516 "invalid Content-Range header format. Expected 'bytes start-end/total'",
517 HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
518 )
519 except IOError as e:
520 raise EngineException(
521 "invalid upload transaction sequence: '{}'".format(e),
522 HTTPStatus.BAD_REQUEST,
523 )
524 except tarfile.ReadError as e:
525 raise EngineException(
526 "invalid file content {}".format(e), HTTPStatus.BAD_REQUEST
527 )
528 except (ValueError, yaml.YAMLError) as e:
529 raise EngineException(error_text + str(e))
530 except ValidationError as e:
531 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
532 finally:
533 if file_pkg:
534 file_pkg.close()
535 for file in fs_rollback:
536 self.fs.file_delete(file, ignore_non_exist=True)
537
538 def get_file(self, session, _id, path=None, accept_header=None):
539 """
540 Return the file content of a vnfd or nsd
541 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
542 :param _id: Identity of the vnfd, nsd
543 :param path: artifact path or "$DESCRIPTOR" or None
544 :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
545 :return: opened file plus Accept format or raises an exception
546 """
547 accept_text = accept_zip = False
548 if accept_header:
549 if "text/plain" in accept_header or "*/*" in accept_header:
550 accept_text = True
551 if "application/zip" in accept_header or "*/*" in accept_header:
552 accept_zip = "application/zip"
553 elif "application/gzip" in accept_header:
554 accept_zip = "application/gzip"
555
556 if not accept_text and not accept_zip:
557 raise EngineException(
558 "provide request header 'Accept' with 'application/zip' or 'text/plain'",
559 http_code=HTTPStatus.NOT_ACCEPTABLE,
560 )
561
562 content = self.show(session, _id)
563 if content["_admin"]["onboardingState"] != "ONBOARDED":
564 raise EngineException(
565 "Cannot get content because this resource is not at 'ONBOARDED' state. "
566 "onboardingState is {}".format(content["_admin"]["onboardingState"]),
567 http_code=HTTPStatus.CONFLICT,
568 )
569 storage = content["_admin"]["storage"]
570 if path is not None and path != "$DESCRIPTOR": # artifacts
571 if not storage.get("pkg-dir"):
572 raise EngineException(
573 "Packages does not contains artifacts",
574 http_code=HTTPStatus.BAD_REQUEST,
575 )
576 if self.fs.file_exists(
577 (storage["folder"], storage["pkg-dir"], *path), "dir"
578 ):
579 folder_content = self.fs.dir_ls(
580 (storage["folder"], storage["pkg-dir"], *path)
581 )
582 return folder_content, "text/plain"
583 # TODO manage folders in http
584 else:
585 return (
586 self.fs.file_open(
587 (storage["folder"], storage["pkg-dir"], *path), "rb"
588 ),
589 "application/octet-stream",
590 )
591
592 # pkgtype accept ZIP TEXT -> result
593 # manyfiles yes X -> zip
594 # no yes -> error
595 # onefile yes no -> zip
596 # X yes -> text
597 contain_many_files = False
598 if storage.get("pkg-dir"):
599 # check if there are more than one file in the package, ignoring checksums.txt.
600 pkg_files = self.fs.dir_ls((storage["folder"], storage["pkg-dir"]))
601 if len(pkg_files) >= 3 or (
602 len(pkg_files) == 2 and "checksums.txt" not in pkg_files
603 ):
604 contain_many_files = True
605 if accept_text and (not contain_many_files or path == "$DESCRIPTOR"):
606 return (
607 self.fs.file_open((storage["folder"], storage["descriptor"]), "r"),
608 "text/plain",
609 )
610 elif contain_many_files and not accept_zip:
611 raise EngineException(
612 "Packages that contains several files need to be retrieved with 'application/zip'"
613 "Accept header",
614 http_code=HTTPStatus.NOT_ACCEPTABLE,
615 )
616 else:
617 if not storage.get("zipfile"):
618 # TODO generate zipfile if not present
619 raise EngineException(
620 "Only allowed 'text/plain' Accept header for this descriptor. To be solved in "
621 "future versions",
622 http_code=HTTPStatus.NOT_ACCEPTABLE,
623 )
624 return (
625 self.fs.file_open((storage["folder"], storage["zipfile"]), "rb"),
626 accept_zip,
627 )
628
629 def _remove_yang_prefixes_from_descriptor(self, descriptor):
630 new_descriptor = {}
631 for k, v in descriptor.items():
632 new_v = v
633 if isinstance(v, dict):
634 new_v = self._remove_yang_prefixes_from_descriptor(v)
635 elif isinstance(v, list):
636 new_v = list()
637 for x in v:
638 if isinstance(x, dict):
639 new_v.append(self._remove_yang_prefixes_from_descriptor(x))
640 else:
641 new_v.append(x)
642 new_descriptor[k.split(":")[-1]] = new_v
643 return new_descriptor
644
645 def pyangbind_validation(self, item, data, force=False):
646 raise EngineException(
647 "Not possible to validate '{}' item".format(item),
648 http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
649 )
650
651 def _validate_input_edit(self, indata, content, force=False):
652 # not needed to validate with pyangbind becuase it will be validated at check_conflict_on_edit
653 if "_id" in indata:
654 indata.pop("_id")
655 if "_admin" not in indata:
656 indata["_admin"] = {}
657
658 if "operationalState" in indata:
659 if indata["operationalState"] in ("ENABLED", "DISABLED"):
660 indata["_admin"]["operationalState"] = indata.pop("operationalState")
661 else:
662 raise EngineException(
663 "State '{}' is not a valid operational state".format(
664 indata["operationalState"]
665 ),
666 http_code=HTTPStatus.BAD_REQUEST,
667 )
668
669 # In the case of user defined data, we need to put the data in the root of the object
670 # to preserve current expected behaviour
671 if "userDefinedData" in indata:
672 data = indata.pop("userDefinedData")
673 if type(data) == dict:
674 indata["_admin"]["userDefinedData"] = data
675 else:
676 raise EngineException(
677 "userDefinedData should be an object, but is '{}' instead".format(
678 type(data)
679 ),
680 http_code=HTTPStatus.BAD_REQUEST,
681 )
682
683 if (
684 "operationalState" in indata["_admin"]
685 and content["_admin"]["operationalState"]
686 == indata["_admin"]["operationalState"]
687 ):
688 raise EngineException(
689 "operationalState already {}".format(
690 content["_admin"]["operationalState"]
691 ),
692 http_code=HTTPStatus.CONFLICT,
693 )
694
695 return indata
696
697 def _validate_descriptor_changes(self,
698 descriptor_file_name,
699 old_descriptor_directory,
700 new_descriptor_directory):
701 # Todo: compare changes and throw a meaningful exception for the user to understand
702 # Example:
703 # raise EngineException(
704 # "Error in validating new descriptor: <NODE> cannot be modified",
705 # http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
706 # )
707 pass
708
709 class VnfdTopic(DescriptorTopic):
710 topic = "vnfds"
711 topic_msg = "vnfd"
712
713 def __init__(self, db, fs, msg, auth):
714 DescriptorTopic.__init__(self, db, fs, msg, auth)
715
716 def pyangbind_validation(self, item, data, force=False):
717 if self._descriptor_data_is_in_old_format(data):
718 raise EngineException(
719 "ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
720 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
721 )
722 try:
723 myvnfd = etsi_nfv_vnfd.etsi_nfv_vnfd()
724 pybindJSONDecoder.load_ietf_json(
725 {"etsi-nfv-vnfd:vnfd": data},
726 None,
727 None,
728 obj=myvnfd,
729 path_helper=True,
730 skip_unknown=force,
731 )
732 out = pybindJSON.dumps(myvnfd, mode="ietf")
733 desc_out = self._remove_envelop(yaml.safe_load(out))
734 desc_out = self._remove_yang_prefixes_from_descriptor(desc_out)
735 return utils.deep_update_dict(data, desc_out)
736 except Exception as e:
737 raise EngineException(
738 "Error in pyangbind validation: {}".format(str(e)),
739 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
740 )
741
742 @staticmethod
743 def _descriptor_data_is_in_old_format(data):
744 return ("vnfd-catalog" in data) or ("vnfd:vnfd-catalog" in data)
745
746 @staticmethod
747 def _remove_envelop(indata=None):
748 if not indata:
749 return {}
750 clean_indata = indata
751
752 if clean_indata.get("etsi-nfv-vnfd:vnfd"):
753 if not isinstance(clean_indata["etsi-nfv-vnfd:vnfd"], dict):
754 raise EngineException("'etsi-nfv-vnfd:vnfd' must be a dict")
755 clean_indata = clean_indata["etsi-nfv-vnfd:vnfd"]
756 elif clean_indata.get("vnfd"):
757 if not isinstance(clean_indata["vnfd"], dict):
758 raise EngineException("'vnfd' must be dict")
759 clean_indata = clean_indata["vnfd"]
760
761 return clean_indata
762
763 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
764 final_content = super().check_conflict_on_edit(
765 session, final_content, edit_content, _id
766 )
767
768 # set type of vnfd
769 contains_pdu = False
770 contains_vdu = False
771 for vdu in get_iterable(final_content.get("vdu")):
772 if vdu.get("pdu-type"):
773 contains_pdu = True
774 else:
775 contains_vdu = True
776 if contains_pdu:
777 final_content["_admin"]["type"] = "hnfd" if contains_vdu else "pnfd"
778 elif contains_vdu:
779 final_content["_admin"]["type"] = "vnfd"
780 # if neither vud nor pdu do not fill type
781 return final_content
782
783 def check_conflict_on_del(self, session, _id, db_content):
784 """
785 Check that there is not any NSD that uses this VNFD. Only NSDs belonging to this project are considered. Note
786 that VNFD can be public and be used by NSD of other projects. Also check there are not deployments, or vnfr
787 that uses this vnfd
788 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
789 :param _id: vnfd internal id
790 :param db_content: The database content of the _id.
791 :return: None or raises EngineException with the conflict
792 """
793 if session["force"]:
794 return
795 descriptor = db_content
796 descriptor_id = descriptor.get("id")
797 if not descriptor_id: # empty vnfd not uploaded
798 return
799
800 _filter = self._get_project_filter(session)
801
802 # check vnfrs using this vnfd
803 _filter["vnfd-id"] = _id
804 if self.db.get_list("vnfrs", _filter):
805 raise EngineException(
806 "There is at least one VNF instance using this descriptor",
807 http_code=HTTPStatus.CONFLICT,
808 )
809
810 # check NSD referencing this VNFD
811 del _filter["vnfd-id"]
812 _filter["vnfd-id"] = descriptor_id
813 if self.db.get_list("nsds", _filter):
814 raise EngineException(
815 "There is at least one NS package referencing this descriptor",
816 http_code=HTTPStatus.CONFLICT,
817 )
818
819 def _validate_input_new(self, indata, storage_params, force=False):
820 indata.pop("onboardingState", None)
821 indata.pop("operationalState", None)
822 indata.pop("usageState", None)
823 indata.pop("links", None)
824
825 indata = self.pyangbind_validation("vnfds", indata, force)
826 # Cross references validation in the descriptor
827
828 self.validate_mgmt_interface_connection_point(indata)
829
830 for vdu in get_iterable(indata.get("vdu")):
831 self.validate_vdu_internal_connection_points(vdu)
832 self._validate_vdu_cloud_init_in_package(storage_params, vdu, indata)
833 self._validate_vdu_charms_in_package(storage_params, indata)
834
835 self._validate_vnf_charms_in_package(storage_params, indata)
836
837 self.validate_external_connection_points(indata)
838 self.validate_internal_virtual_links(indata)
839 self.validate_monitoring_params(indata)
840 self.validate_scaling_group_descriptor(indata)
841
842 return indata
843
844 @staticmethod
845 def validate_mgmt_interface_connection_point(indata):
846 if not indata.get("vdu"):
847 return
848 if not indata.get("mgmt-cp"):
849 raise EngineException(
850 "'mgmt-cp' is a mandatory field and it is not defined",
851 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
852 )
853
854 for cp in get_iterable(indata.get("ext-cpd")):
855 if cp["id"] == indata["mgmt-cp"]:
856 break
857 else:
858 raise EngineException(
859 "mgmt-cp='{}' must match an existing ext-cpd".format(indata["mgmt-cp"]),
860 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
861 )
862
863 @staticmethod
864 def validate_vdu_internal_connection_points(vdu):
865 int_cpds = set()
866 for cpd in get_iterable(vdu.get("int-cpd")):
867 cpd_id = cpd.get("id")
868 if cpd_id and cpd_id in int_cpds:
869 raise EngineException(
870 "vdu[id='{}']:int-cpd[id='{}'] is already used by other int-cpd".format(
871 vdu["id"], cpd_id
872 ),
873 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
874 )
875 int_cpds.add(cpd_id)
876
877 @staticmethod
878 def validate_external_connection_points(indata):
879 all_vdus_int_cpds = set()
880 for vdu in get_iterable(indata.get("vdu")):
881 for int_cpd in get_iterable(vdu.get("int-cpd")):
882 all_vdus_int_cpds.add((vdu.get("id"), int_cpd.get("id")))
883
884 ext_cpds = set()
885 for cpd in get_iterable(indata.get("ext-cpd")):
886 cpd_id = cpd.get("id")
887 if cpd_id and cpd_id in ext_cpds:
888 raise EngineException(
889 "ext-cpd[id='{}'] is already used by other ext-cpd".format(cpd_id),
890 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
891 )
892 ext_cpds.add(cpd_id)
893
894 int_cpd = cpd.get("int-cpd")
895 if int_cpd:
896 if (int_cpd.get("vdu-id"), int_cpd.get("cpd")) not in all_vdus_int_cpds:
897 raise EngineException(
898 "ext-cpd[id='{}']:int-cpd must match an existing vdu int-cpd".format(
899 cpd_id
900 ),
901 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
902 )
903 # TODO: Validate k8s-cluster-net points to a valid k8s-cluster:nets ?
904
905 def _validate_vdu_charms_in_package(self, storage_params, indata):
906 for df in indata["df"]:
907 if (
908 "lcm-operations-configuration" in df
909 and "operate-vnf-op-config" in df["lcm-operations-configuration"]
910 ):
911 configs = df["lcm-operations-configuration"][
912 "operate-vnf-op-config"
913 ].get("day1-2", [])
914 vdus = df.get("vdu-profile", [])
915 for vdu in vdus:
916 for config in configs:
917 if config["id"] == vdu["id"] and utils.find_in_list(
918 config.get("execution-environment-list", []),
919 lambda ee: "juju" in ee,
920 ):
921 if not self._validate_package_folders(
922 storage_params, "charms"
923 ) and not self._validate_package_folders(
924 storage_params, "Scripts/charms"
925 ):
926 raise EngineException(
927 "Charm defined in vnf[id={}] but not present in "
928 "package".format(indata["id"])
929 )
930
931 def _validate_vdu_cloud_init_in_package(self, storage_params, vdu, indata):
932 if not vdu.get("cloud-init-file"):
933 return
934 if not self._validate_package_folders(
935 storage_params, "cloud_init", vdu["cloud-init-file"]
936 ) and not self._validate_package_folders(
937 storage_params, "Scripts/cloud_init", vdu["cloud-init-file"]
938 ):
939 raise EngineException(
940 "Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in "
941 "package".format(indata["id"], vdu["id"])
942 )
943
944 def _validate_vnf_charms_in_package(self, storage_params, indata):
945 # Get VNF configuration through new container
946 for deployment_flavor in indata.get("df", []):
947 if "lcm-operations-configuration" not in deployment_flavor:
948 return
949 if (
950 "operate-vnf-op-config"
951 not in deployment_flavor["lcm-operations-configuration"]
952 ):
953 return
954 for day_1_2_config in deployment_flavor["lcm-operations-configuration"][
955 "operate-vnf-op-config"
956 ]["day1-2"]:
957 if day_1_2_config["id"] == indata["id"]:
958 if utils.find_in_list(
959 day_1_2_config.get("execution-environment-list", []),
960 lambda ee: "juju" in ee,
961 ):
962 if not self._validate_package_folders(
963 storage_params, "charms"
964 ) and not self._validate_package_folders(
965 storage_params, "Scripts/charms"
966 ):
967 raise EngineException(
968 "Charm defined in vnf[id={}] but not present in "
969 "package".format(indata["id"])
970 )
971
972 def _validate_package_folders(self, storage_params, folder, file=None):
973 if not storage_params:
974 return False
975 elif not storage_params.get("pkg-dir"):
976 if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
977 f = "{}_/{}".format(
978 storage_params["folder"], folder
979 )
980 else:
981 f = "{}/{}".format(
982 storage_params["folder"], folder
983 )
984 if file:
985 return self.fs.file_exists("{}/{}".format(f, file), "file")
986 else:
987 if self.fs.file_exists(f, "dir"):
988 if self.fs.dir_ls(f):
989 return True
990 return False
991 else:
992 if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
993 f = "{}_/{}/{}".format(
994 storage_params["folder"], storage_params["pkg-dir"], folder
995 )
996 else:
997 f = "{}/{}/{}".format(
998 storage_params["folder"], storage_params["pkg-dir"], folder
999 )
1000 if file:
1001 return self.fs.file_exists("{}/{}".format(f, file), "file")
1002 else:
1003 if self.fs.file_exists(f, "dir"):
1004 if self.fs.dir_ls(f):
1005 return True
1006 return False
1007
1008 @staticmethod
1009 def validate_internal_virtual_links(indata):
1010 all_ivld_ids = set()
1011 for ivld in get_iterable(indata.get("int-virtual-link-desc")):
1012 ivld_id = ivld.get("id")
1013 if ivld_id and ivld_id in all_ivld_ids:
1014 raise EngineException(
1015 "Duplicated VLD id in int-virtual-link-desc[id={}]".format(ivld_id),
1016 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1017 )
1018 else:
1019 all_ivld_ids.add(ivld_id)
1020
1021 for vdu in get_iterable(indata.get("vdu")):
1022 for int_cpd in get_iterable(vdu.get("int-cpd")):
1023 int_cpd_ivld_id = int_cpd.get("int-virtual-link-desc")
1024 if int_cpd_ivld_id and int_cpd_ivld_id not in all_ivld_ids:
1025 raise EngineException(
1026 "vdu[id='{}']:int-cpd[id='{}']:int-virtual-link-desc='{}' must match an existing "
1027 "int-virtual-link-desc".format(
1028 vdu["id"], int_cpd["id"], int_cpd_ivld_id
1029 ),
1030 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1031 )
1032
1033 for df in get_iterable(indata.get("df")):
1034 for vlp in get_iterable(df.get("virtual-link-profile")):
1035 vlp_ivld_id = vlp.get("id")
1036 if vlp_ivld_id and vlp_ivld_id not in all_ivld_ids:
1037 raise EngineException(
1038 "df[id='{}']:virtual-link-profile='{}' must match an existing "
1039 "int-virtual-link-desc".format(df["id"], vlp_ivld_id),
1040 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1041 )
1042
1043 @staticmethod
1044 def validate_monitoring_params(indata):
1045 all_monitoring_params = set()
1046 for ivld in get_iterable(indata.get("int-virtual-link-desc")):
1047 for mp in get_iterable(ivld.get("monitoring-parameters")):
1048 mp_id = mp.get("id")
1049 if mp_id and mp_id in all_monitoring_params:
1050 raise EngineException(
1051 "Duplicated monitoring-parameter id in "
1052 "int-virtual-link-desc[id='{}']:monitoring-parameters[id='{}']".format(
1053 ivld["id"], mp_id
1054 ),
1055 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1056 )
1057 else:
1058 all_monitoring_params.add(mp_id)
1059
1060 for vdu in get_iterable(indata.get("vdu")):
1061 for mp in get_iterable(vdu.get("monitoring-parameter")):
1062 mp_id = mp.get("id")
1063 if mp_id and mp_id in all_monitoring_params:
1064 raise EngineException(
1065 "Duplicated monitoring-parameter id in "
1066 "vdu[id='{}']:monitoring-parameter[id='{}']".format(
1067 vdu["id"], mp_id
1068 ),
1069 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1070 )
1071 else:
1072 all_monitoring_params.add(mp_id)
1073
1074 for df in get_iterable(indata.get("df")):
1075 for mp in get_iterable(df.get("monitoring-parameter")):
1076 mp_id = mp.get("id")
1077 if mp_id and mp_id in all_monitoring_params:
1078 raise EngineException(
1079 "Duplicated monitoring-parameter id in "
1080 "df[id='{}']:monitoring-parameter[id='{}']".format(
1081 df["id"], mp_id
1082 ),
1083 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1084 )
1085 else:
1086 all_monitoring_params.add(mp_id)
1087
1088 @staticmethod
1089 def validate_scaling_group_descriptor(indata):
1090 all_monitoring_params = set()
1091 for ivld in get_iterable(indata.get("int-virtual-link-desc")):
1092 for mp in get_iterable(ivld.get("monitoring-parameters")):
1093 all_monitoring_params.add(mp.get("id"))
1094
1095 for vdu in get_iterable(indata.get("vdu")):
1096 for mp in get_iterable(vdu.get("monitoring-parameter")):
1097 all_monitoring_params.add(mp.get("id"))
1098
1099 for df in get_iterable(indata.get("df")):
1100 for mp in get_iterable(df.get("monitoring-parameter")):
1101 all_monitoring_params.add(mp.get("id"))
1102
1103 for df in get_iterable(indata.get("df")):
1104 for sa in get_iterable(df.get("scaling-aspect")):
1105 for sp in get_iterable(sa.get("scaling-policy")):
1106 for sc in get_iterable(sp.get("scaling-criteria")):
1107 sc_monitoring_param = sc.get("vnf-monitoring-param-ref")
1108 if (
1109 sc_monitoring_param
1110 and sc_monitoring_param not in all_monitoring_params
1111 ):
1112 raise EngineException(
1113 "df[id='{}']:scaling-aspect[id='{}']:scaling-policy"
1114 "[name='{}']:scaling-criteria[name='{}']: "
1115 "vnf-monitoring-param-ref='{}' not defined in any monitoring-param".format(
1116 df["id"],
1117 sa["id"],
1118 sp["name"],
1119 sc["name"],
1120 sc_monitoring_param,
1121 ),
1122 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1123 )
1124
1125 for sca in get_iterable(sa.get("scaling-config-action")):
1126 if (
1127 "lcm-operations-configuration" not in df
1128 or "operate-vnf-op-config"
1129 not in df["lcm-operations-configuration"]
1130 or not utils.find_in_list(
1131 df["lcm-operations-configuration"][
1132 "operate-vnf-op-config"
1133 ].get("day1-2", []),
1134 lambda config: config["id"] == indata["id"],
1135 )
1136 ):
1137 raise EngineException(
1138 "'day1-2 configuration' not defined in the descriptor but it is "
1139 "referenced by df[id='{}']:scaling-aspect[id='{}']:scaling-config-action".format(
1140 df["id"], sa["id"]
1141 ),
1142 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1143 )
1144 for configuration in get_iterable(
1145 df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
1146 "day1-2", []
1147 )
1148 ):
1149 for primitive in get_iterable(
1150 configuration.get("config-primitive")
1151 ):
1152 if (
1153 primitive["name"]
1154 == sca["vnf-config-primitive-name-ref"]
1155 ):
1156 break
1157 else:
1158 raise EngineException(
1159 "df[id='{}']:scaling-aspect[id='{}']:scaling-config-action:vnf-"
1160 "config-primitive-name-ref='{}' does not match any "
1161 "day1-2 configuration:config-primitive:name".format(
1162 df["id"],
1163 sa["id"],
1164 sca["vnf-config-primitive-name-ref"],
1165 ),
1166 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1167 )
1168
1169 def delete_extra(self, session, _id, db_content, not_send_msg=None):
1170 """
1171 Deletes associate file system storage (via super)
1172 Deletes associated vnfpkgops from database.
1173 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
1174 :param _id: server internal id
1175 :param db_content: The database content of the descriptor
1176 :return: None
1177 :raises: FsException in case of error while deleting associated storage
1178 """
1179 super().delete_extra(session, _id, db_content, not_send_msg)
1180 self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
1181 self.db.del_list(self.topic+"_revisions", {"_id": {"$regex": _id}})
1182
1183 def sol005_projection(self, data):
1184 data["onboardingState"] = data["_admin"]["onboardingState"]
1185 data["operationalState"] = data["_admin"]["operationalState"]
1186 data["usageState"] = data["_admin"]["usageState"]
1187
1188 links = {}
1189 links["self"] = {"href": "/vnfpkgm/v1/vnf_packages/{}".format(data["_id"])}
1190 links["vnfd"] = {"href": "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(data["_id"])}
1191 links["packageContent"] = {
1192 "href": "/vnfpkgm/v1/vnf_packages/{}/package_content".format(data["_id"])
1193 }
1194 data["_links"] = links
1195
1196 return super().sol005_projection(data)
1197
1198 @staticmethod
1199 def find_software_version(vnfd: dict) -> str:
1200 """Find the sotware version in the VNFD descriptors
1201
1202 Args:
1203 vnfd (dict): Descriptor as a dictionary
1204
1205 Returns:
1206 software-version (str)
1207 """
1208 default_sw_version = "1.0"
1209 if vnfd.get("vnfd"):
1210 vnfd = vnfd["vnfd"]
1211 if vnfd.get("software-version"):
1212 return vnfd["software-version"]
1213 else:
1214 return default_sw_version
1215
1216 @staticmethod
1217 def extract_policies(vnfd: dict) -> dict:
1218 """Removes the policies from the VNFD descriptors
1219
1220 Args:
1221 vnfd (dict): Descriptor as a dictionary
1222
1223 Returns:
1224 vnfd (dict): VNFD which does not include policies
1225 """
1226 # TODO: Extract the policy related parts from the VNFD
1227 return vnfd
1228
1229 @staticmethod
1230 def extract_day12_primitives(vnfd: dict) -> dict:
1231 """Removes the day12 primitives from the VNFD descriptors
1232
1233 Args:
1234 vnfd (dict): Descriptor as a dictionary
1235
1236 Returns:
1237 vnfd (dict)
1238 """
1239 for df_id, df in enumerate(vnfd.get("df", {})):
1240 if (
1241 df.get("lcm-operations-configuration", {})
1242 .get("operate-vnf-op-config", {})
1243 .get("day1-2")
1244 ):
1245 day12 = df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
1246 "day1-2"
1247 )
1248 for config_id, config in enumerate(day12):
1249 for key in [
1250 "initial-config-primitive",
1251 "config-primitive",
1252 "terminate-config-primitive",
1253 ]:
1254 config.pop(key, None)
1255 day12[config_id] = config
1256 df["lcm-operations-configuration"]["operate-vnf-op-config"][
1257 "day1-2"
1258 ] = day12
1259 vnfd["df"][df_id] = df
1260 return vnfd
1261
1262 def remove_modifiable_items(self, vnfd: dict) -> dict:
1263 """Removes the modifiable parts from the VNFD descriptors
1264
1265 It calls different extract functions according to different update types
1266 to clear all the modifiable items from VNFD
1267
1268 Args:
1269 vnfd (dict): Descriptor as a dictionary
1270
1271 Returns:
1272 vnfd (dict): Descriptor which does not include modifiable contents
1273 """
1274 if vnfd.get("vnfd"):
1275 vnfd = vnfd["vnfd"]
1276 vnfd.pop("_admin", None)
1277 # If the other extractions need to be done from VNFD,
1278 # the new extract methods could be appended to below list.
1279 for extract_function in [self.extract_day12_primitives, self.extract_policies]:
1280 vnfd_temp = extract_function(vnfd)
1281 vnfd = vnfd_temp
1282 return vnfd
1283
1284 def _validate_descriptor_changes(
1285 self,
1286 descriptor_file_name: str,
1287 old_descriptor_directory: str,
1288 new_descriptor_directory: str,
1289 ):
1290 """Compares the old and new VNFD descriptors and validates the new descriptor.
1291
1292 Args:
1293 old_descriptor_directory (str): Directory of descriptor which is in-use
1294 new_descriptor_directory (str): Directory of directory which is proposed to update (new revision)
1295
1296 Returns:
1297 None
1298
1299 Raises:
1300 EngineException: In case of error when there are unallowed changes
1301 """
1302 try:
1303 with self.fs.file_open(
1304 (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
1305 ) as old_descriptor_file:
1306 with self.fs.file_open(
1307 (new_descriptor_directory, descriptor_file_name), "r"
1308 ) as new_descriptor_file:
1309 old_content = yaml.load(
1310 old_descriptor_file.read(), Loader=yaml.SafeLoader
1311 )
1312 new_content = yaml.load(
1313 new_descriptor_file.read(), Loader=yaml.SafeLoader
1314 )
1315 if old_content and new_content:
1316 if self.find_software_version(
1317 old_content
1318 ) != self.find_software_version(new_content):
1319 return
1320 disallowed_change = DeepDiff(
1321 self.remove_modifiable_items(old_content),
1322 self.remove_modifiable_items(new_content),
1323 )
1324 if disallowed_change:
1325 changed_nodes = functools.reduce(
1326 lambda a, b: a + " , " + b,
1327 [
1328 node.lstrip("root")
1329 for node in disallowed_change.get(
1330 "values_changed"
1331 ).keys()
1332 ],
1333 )
1334 raise EngineException(
1335 f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
1336 "there are disallowed changes in the vnf descriptor.",
1337 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1338 )
1339 except (
1340 DbException,
1341 AttributeError,
1342 IndexError,
1343 KeyError,
1344 ValueError,
1345 ) as e:
1346 raise type(e)(
1347 "VNF Descriptor could not be processed with error: {}.".format(e)
1348 )
1349
1350
1351 class NsdTopic(DescriptorTopic):
1352 topic = "nsds"
1353 topic_msg = "nsd"
1354
1355 def __init__(self, db, fs, msg, auth):
1356 DescriptorTopic.__init__(self, db, fs, msg, auth)
1357
1358 def pyangbind_validation(self, item, data, force=False):
1359 if self._descriptor_data_is_in_old_format(data):
1360 raise EngineException(
1361 "ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
1362 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1363 )
1364 try:
1365 nsd_vnf_profiles = data.get("df", [{}])[0].get("vnf-profile", [])
1366 mynsd = etsi_nfv_nsd.etsi_nfv_nsd()
1367 pybindJSONDecoder.load_ietf_json(
1368 {"nsd": {"nsd": [data]}},
1369 None,
1370 None,
1371 obj=mynsd,
1372 path_helper=True,
1373 skip_unknown=force,
1374 )
1375 out = pybindJSON.dumps(mynsd, mode="ietf")
1376 desc_out = self._remove_envelop(yaml.safe_load(out))
1377 desc_out = self._remove_yang_prefixes_from_descriptor(desc_out)
1378 if nsd_vnf_profiles:
1379 desc_out["df"][0]["vnf-profile"] = nsd_vnf_profiles
1380 return desc_out
1381 except Exception as e:
1382 raise EngineException(
1383 "Error in pyangbind validation: {}".format(str(e)),
1384 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1385 )
1386
1387 @staticmethod
1388 def _descriptor_data_is_in_old_format(data):
1389 return ("nsd-catalog" in data) or ("nsd:nsd-catalog" in data)
1390
1391 @staticmethod
1392 def _remove_envelop(indata=None):
1393 if not indata:
1394 return {}
1395 clean_indata = indata
1396
1397 if clean_indata.get("nsd"):
1398 clean_indata = clean_indata["nsd"]
1399 elif clean_indata.get("etsi-nfv-nsd:nsd"):
1400 clean_indata = clean_indata["etsi-nfv-nsd:nsd"]
1401 if clean_indata.get("nsd"):
1402 if (
1403 not isinstance(clean_indata["nsd"], list)
1404 or len(clean_indata["nsd"]) != 1
1405 ):
1406 raise EngineException("'nsd' must be a list of only one element")
1407 clean_indata = clean_indata["nsd"][0]
1408 return clean_indata
1409
1410 def _validate_input_new(self, indata, storage_params, force=False):
1411 indata.pop("nsdOnboardingState", None)
1412 indata.pop("nsdOperationalState", None)
1413 indata.pop("nsdUsageState", None)
1414
1415 indata.pop("links", None)
1416
1417 indata = self.pyangbind_validation("nsds", indata, force)
1418 # Cross references validation in the descriptor
1419 # TODO validata that if contains cloud-init-file or charms, have artifacts _admin.storage."pkg-dir" is not none
1420 for vld in get_iterable(indata.get("virtual-link-desc")):
1421 self.validate_vld_mgmt_network_with_virtual_link_protocol_data(vld, indata)
1422
1423 self.validate_vnf_profiles_vnfd_id(indata)
1424
1425 return indata
1426
1427 @staticmethod
1428 def validate_vld_mgmt_network_with_virtual_link_protocol_data(vld, indata):
1429 if not vld.get("mgmt-network"):
1430 return
1431 vld_id = vld.get("id")
1432 for df in get_iterable(indata.get("df")):
1433 for vlp in get_iterable(df.get("virtual-link-profile")):
1434 if vld_id and vld_id == vlp.get("virtual-link-desc-id"):
1435 if vlp.get("virtual-link-protocol-data"):
1436 raise EngineException(
1437 "Error at df[id='{}']:virtual-link-profile[id='{}']:virtual-link-"
1438 "protocol-data You cannot set a virtual-link-protocol-data "
1439 "when mgmt-network is True".format(df["id"], vlp["id"]),
1440 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1441 )
1442
1443 @staticmethod
1444 def validate_vnf_profiles_vnfd_id(indata):
1445 all_vnfd_ids = set(get_iterable(indata.get("vnfd-id")))
1446 for df in get_iterable(indata.get("df")):
1447 for vnf_profile in get_iterable(df.get("vnf-profile")):
1448 vnfd_id = vnf_profile.get("vnfd-id")
1449 if vnfd_id and vnfd_id not in all_vnfd_ids:
1450 raise EngineException(
1451 "Error at df[id='{}']:vnf_profile[id='{}']:vnfd-id='{}' "
1452 "does not match any vnfd-id".format(
1453 df["id"], vnf_profile["id"], vnfd_id
1454 ),
1455 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1456 )
1457
1458 def _validate_input_edit(self, indata, content, force=False):
1459 # not needed to validate with pyangbind becuase it will be validated at check_conflict_on_edit
1460 """
1461 indata looks as follows:
1462 - In the new case (conformant)
1463 {'nsdOperationalState': 'DISABLED', 'userDefinedData': {'id': 'string23',
1464 '_id': 'c6ddc544-cede-4b94-9ebe-be07b298a3c1', 'name': 'simon46'}}
1465 - In the old case (backwards-compatible)
1466 {'id': 'string23', '_id': 'c6ddc544-cede-4b94-9ebe-be07b298a3c1', 'name': 'simon46'}
1467 """
1468 if "_admin" not in indata:
1469 indata["_admin"] = {}
1470
1471 if "nsdOperationalState" in indata:
1472 if indata["nsdOperationalState"] in ("ENABLED", "DISABLED"):
1473 indata["_admin"]["operationalState"] = indata.pop("nsdOperationalState")
1474 else:
1475 raise EngineException(
1476 "State '{}' is not a valid operational state".format(
1477 indata["nsdOperationalState"]
1478 ),
1479 http_code=HTTPStatus.BAD_REQUEST,
1480 )
1481
1482 # In the case of user defined data, we need to put the data in the root of the object
1483 # to preserve current expected behaviour
1484 if "userDefinedData" in indata:
1485 data = indata.pop("userDefinedData")
1486 if type(data) == dict:
1487 indata["_admin"]["userDefinedData"] = data
1488 else:
1489 raise EngineException(
1490 "userDefinedData should be an object, but is '{}' instead".format(
1491 type(data)
1492 ),
1493 http_code=HTTPStatus.BAD_REQUEST,
1494 )
1495 if (
1496 "operationalState" in indata["_admin"]
1497 and content["_admin"]["operationalState"]
1498 == indata["_admin"]["operationalState"]
1499 ):
1500 raise EngineException(
1501 "nsdOperationalState already {}".format(
1502 content["_admin"]["operationalState"]
1503 ),
1504 http_code=HTTPStatus.CONFLICT,
1505 )
1506 return indata
1507
1508 def _check_descriptor_dependencies(self, session, descriptor):
1509 """
1510 Check that the dependent descriptors exist on a new descriptor or edition. Also checks references to vnfd
1511 connection points are ok
1512 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
1513 :param descriptor: descriptor to be inserted or edit
1514 :return: None or raises exception
1515 """
1516 if session["force"]:
1517 return
1518 vnfds_index = self._get_descriptor_constituent_vnfds_index(session, descriptor)
1519
1520 # Cross references validation in the descriptor and vnfd connection point validation
1521 for df in get_iterable(descriptor.get("df")):
1522 self.validate_df_vnf_profiles_constituent_connection_points(df, vnfds_index)
1523
1524 def _get_descriptor_constituent_vnfds_index(self, session, descriptor):
1525 vnfds_index = {}
1526 if descriptor.get("vnfd-id") and not session["force"]:
1527 for vnfd_id in get_iterable(descriptor.get("vnfd-id")):
1528 query_filter = self._get_project_filter(session)
1529 query_filter["id"] = vnfd_id
1530 vnf_list = self.db.get_list("vnfds", query_filter)
1531 if not vnf_list:
1532 raise EngineException(
1533 "Descriptor error at 'vnfd-id'='{}' references a non "
1534 "existing vnfd".format(vnfd_id),
1535 http_code=HTTPStatus.CONFLICT,
1536 )
1537 vnfds_index[vnfd_id] = vnf_list[0]
1538 return vnfds_index
1539
1540 @staticmethod
1541 def validate_df_vnf_profiles_constituent_connection_points(df, vnfds_index):
1542 for vnf_profile in get_iterable(df.get("vnf-profile")):
1543 vnfd = vnfds_index.get(vnf_profile["vnfd-id"])
1544 all_vnfd_ext_cpds = set()
1545 for ext_cpd in get_iterable(vnfd.get("ext-cpd")):
1546 if ext_cpd.get("id"):
1547 all_vnfd_ext_cpds.add(ext_cpd.get("id"))
1548
1549 for virtual_link in get_iterable(
1550 vnf_profile.get("virtual-link-connectivity")
1551 ):
1552 for vl_cpd in get_iterable(virtual_link.get("constituent-cpd-id")):
1553 vl_cpd_id = vl_cpd.get("constituent-cpd-id")
1554 if vl_cpd_id and vl_cpd_id not in all_vnfd_ext_cpds:
1555 raise EngineException(
1556 "Error at df[id='{}']:vnf-profile[id='{}']:virtual-link-connectivity"
1557 "[virtual-link-profile-id='{}']:constituent-cpd-id='{}' references a "
1558 "non existing ext-cpd:id inside vnfd '{}'".format(
1559 df["id"],
1560 vnf_profile["id"],
1561 virtual_link["virtual-link-profile-id"],
1562 vl_cpd_id,
1563 vnfd["id"],
1564 ),
1565 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1566 )
1567
1568 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
1569 final_content = super().check_conflict_on_edit(
1570 session, final_content, edit_content, _id
1571 )
1572
1573 self._check_descriptor_dependencies(session, final_content)
1574
1575 return final_content
1576
1577 def check_conflict_on_del(self, session, _id, db_content):
1578 """
1579 Check that there is not any NSR that uses this NSD. Only NSRs belonging to this project are considered. Note
1580 that NSD can be public and be used by other projects.
1581 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
1582 :param _id: nsd internal id
1583 :param db_content: The database content of the _id
1584 :return: None or raises EngineException with the conflict
1585 """
1586 if session["force"]:
1587 return
1588 descriptor = db_content
1589 descriptor_id = descriptor.get("id")
1590 if not descriptor_id: # empty nsd not uploaded
1591 return
1592
1593 # check NSD used by NS
1594 _filter = self._get_project_filter(session)
1595 _filter["nsd-id"] = _id
1596 if self.db.get_list("nsrs", _filter):
1597 raise EngineException(
1598 "There is at least one NS instance using this descriptor",
1599 http_code=HTTPStatus.CONFLICT,
1600 )
1601
1602 # check NSD referenced by NST
1603 del _filter["nsd-id"]
1604 _filter["netslice-subnet.ANYINDEX.nsd-ref"] = descriptor_id
1605 if self.db.get_list("nsts", _filter):
1606 raise EngineException(
1607 "There is at least one NetSlice Template referencing this descriptor",
1608 http_code=HTTPStatus.CONFLICT,
1609 )
1610
1611 def delete_extra(self, session, _id, db_content, not_send_msg=None):
1612 """
1613 Deletes associate file system storage (via super)
1614 Deletes associated vnfpkgops from database.
1615 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
1616 :param _id: server internal id
1617 :param db_content: The database content of the descriptor
1618 :return: None
1619 :raises: FsException in case of error while deleting associated storage
1620 """
1621 super().delete_extra(session, _id, db_content, not_send_msg)
1622 self.db.del_list(self.topic+"_revisions", { "_id": { "$regex": _id}})
1623
1624 @staticmethod
1625 def extract_day12_primitives(nsd: dict) -> dict:
1626 """Removes the day12 primitives from the NSD descriptors
1627
1628 Args:
1629 nsd (dict): Descriptor as a dictionary
1630
1631 Returns:
1632 nsd (dict): Cleared NSD
1633 """
1634 if nsd.get("ns-configuration"):
1635 for key in [
1636 "config-primitive",
1637 "initial-config-primitive",
1638 "terminate-config-primitive",
1639 ]:
1640 nsd["ns-configuration"].pop(key, None)
1641 return nsd
1642
1643 def remove_modifiable_items(self, nsd: dict) -> dict:
1644 """Removes the modifiable parts from the VNFD descriptors
1645
1646 It calls different extract functions according to different update types
1647 to clear all the modifiable items from NSD
1648
1649 Args:
1650 nsd (dict): Descriptor as a dictionary
1651
1652 Returns:
1653 nsd (dict): Descriptor which does not include modifiable contents
1654 """
1655 while isinstance(nsd, dict) and nsd.get("nsd"):
1656 nsd = nsd["nsd"]
1657 if isinstance(nsd, list):
1658 nsd = nsd[0]
1659 nsd.pop("_admin", None)
1660 # If the more extractions need to be done from NSD,
1661 # the new extract methods could be appended to below list.
1662 for extract_function in [self.extract_day12_primitives]:
1663 nsd_temp = extract_function(nsd)
1664 nsd = nsd_temp
1665 return nsd
1666
1667 def _validate_descriptor_changes(
1668 self,
1669 descriptor_file_name: str,
1670 old_descriptor_directory: str,
1671 new_descriptor_directory: str,
1672 ):
1673 """Compares the old and new NSD descriptors and validates the new descriptor
1674
1675 Args:
1676 old_descriptor_directory: Directory of descriptor which is in-use
1677 new_descriptor_directory: Directory of directory which is proposed to update (new revision)
1678
1679 Returns:
1680 None
1681
1682 Raises:
1683 EngineException: In case of error if the changes are not allowed
1684 """
1685
1686 try:
1687 with self.fs.file_open(
1688 (old_descriptor_directory, descriptor_file_name), "r"
1689 ) as old_descriptor_file:
1690 with self.fs.file_open(
1691 (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
1692 ) as new_descriptor_file:
1693 old_content = yaml.load(
1694 old_descriptor_file.read(), Loader=yaml.SafeLoader
1695 )
1696 new_content = yaml.load(
1697 new_descriptor_file.read(), Loader=yaml.SafeLoader
1698 )
1699 if old_content and new_content:
1700 disallowed_change = DeepDiff(
1701 self.remove_modifiable_items(old_content),
1702 self.remove_modifiable_items(new_content),
1703 )
1704 if disallowed_change:
1705 changed_nodes = functools.reduce(
1706 lambda a, b: a + ", " + b,
1707 [
1708 node.lstrip("root")
1709 for node in disallowed_change.get(
1710 "values_changed"
1711 ).keys()
1712 ],
1713 )
1714 raise EngineException(
1715 f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
1716 "there are disallowed changes in the ns descriptor. ",
1717 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1718 )
1719 except (
1720 DbException,
1721 AttributeError,
1722 IndexError,
1723 KeyError,
1724 ValueError,
1725 ) as e:
1726 raise type(e)(
1727 "NS Descriptor could not be processed with error: {}.".format(e)
1728 )
1729
1730 def sol005_projection(self, data):
1731 data["nsdOnboardingState"] = data["_admin"]["onboardingState"]
1732 data["nsdOperationalState"] = data["_admin"]["operationalState"]
1733 data["nsdUsageState"] = data["_admin"]["usageState"]
1734
1735 links = {}
1736 links["self"] = {"href": "/nsd/v1/ns_descriptors/{}".format(data["_id"])}
1737 links["nsd_content"] = {
1738 "href": "/nsd/v1/ns_descriptors/{}/nsd_content".format(data["_id"])
1739 }
1740 data["_links"] = links
1741
1742 return super().sol005_projection(data)
1743
1744
1745 class NstTopic(DescriptorTopic):
1746 topic = "nsts"
1747 topic_msg = "nst"
1748 quota_name = "slice_templates"
1749
1750 def __init__(self, db, fs, msg, auth):
1751 DescriptorTopic.__init__(self, db, fs, msg, auth)
1752
1753 def pyangbind_validation(self, item, data, force=False):
1754 try:
1755 mynst = nst_im()
1756 pybindJSONDecoder.load_ietf_json(
1757 {"nst": [data]},
1758 None,
1759 None,
1760 obj=mynst,
1761 path_helper=True,
1762 skip_unknown=force,
1763 )
1764 out = pybindJSON.dumps(mynst, mode="ietf")
1765 desc_out = self._remove_envelop(yaml.safe_load(out))
1766 return desc_out
1767 except Exception as e:
1768 raise EngineException(
1769 "Error in pyangbind validation: {}".format(str(e)),
1770 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
1771 )
1772
1773 @staticmethod
1774 def _remove_envelop(indata=None):
1775 if not indata:
1776 return {}
1777 clean_indata = indata
1778
1779 if clean_indata.get("nst"):
1780 if (
1781 not isinstance(clean_indata["nst"], list)
1782 or len(clean_indata["nst"]) != 1
1783 ):
1784 raise EngineException("'nst' must be a list only one element")
1785 clean_indata = clean_indata["nst"][0]
1786 elif clean_indata.get("nst:nst"):
1787 if (
1788 not isinstance(clean_indata["nst:nst"], list)
1789 or len(clean_indata["nst:nst"]) != 1
1790 ):
1791 raise EngineException("'nst:nst' must be a list only one element")
1792 clean_indata = clean_indata["nst:nst"][0]
1793 return clean_indata
1794
1795 def _validate_input_new(self, indata, storage_params, force=False):
1796 indata.pop("onboardingState", None)
1797 indata.pop("operationalState", None)
1798 indata.pop("usageState", None)
1799 indata = self.pyangbind_validation("nsts", indata, force)
1800 return indata.copy()
1801
1802 def _check_descriptor_dependencies(self, session, descriptor):
1803 """
1804 Check that the dependent descriptors exist on a new descriptor or edition
1805 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
1806 :param descriptor: descriptor to be inserted or edit
1807 :return: None or raises exception
1808 """
1809 if not descriptor.get("netslice-subnet"):
1810 return
1811 for nsd in descriptor["netslice-subnet"]:
1812 nsd_id = nsd["nsd-ref"]
1813 filter_q = self._get_project_filter(session)
1814 filter_q["id"] = nsd_id
1815 if not self.db.get_list("nsds", filter_q):
1816 raise EngineException(
1817 "Descriptor error at 'netslice-subnet':'nsd-ref'='{}' references a non "
1818 "existing nsd".format(nsd_id),
1819 http_code=HTTPStatus.CONFLICT,
1820 )
1821
1822 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
1823 final_content = super().check_conflict_on_edit(
1824 session, final_content, edit_content, _id
1825 )
1826
1827 self._check_descriptor_dependencies(session, final_content)
1828 return final_content
1829
1830 def check_conflict_on_del(self, session, _id, db_content):
1831 """
1832 Check that there is not any NSIR that uses this NST. Only NSIRs belonging to this project are considered. Note
1833 that NST can be public and be used by other projects.
1834 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
1835 :param _id: nst internal id
1836 :param db_content: The database content of the _id.
1837 :return: None or raises EngineException with the conflict
1838 """
1839 # TODO: Check this method
1840 if session["force"]:
1841 return
1842 # Get Network Slice Template from Database
1843 _filter = self._get_project_filter(session)
1844 _filter["_admin.nst-id"] = _id
1845 if self.db.get_list("nsis", _filter):
1846 raise EngineException(
1847 "there is at least one Netslice Instance using this descriptor",
1848 http_code=HTTPStatus.CONFLICT,
1849 )
1850
1851 def sol005_projection(self, data):
1852 data["onboardingState"] = data["_admin"]["onboardingState"]
1853 data["operationalState"] = data["_admin"]["operationalState"]
1854 data["usageState"] = data["_admin"]["usageState"]
1855
1856 links = {}
1857 links["self"] = {"href": "/nst/v1/netslice_templates/{}".format(data["_id"])}
1858 links["nst"] = {"href": "/nst/v1/netslice_templates/{}/nst".format(data["_id"])}
1859 data["_links"] = links
1860
1861 return super().sol005_projection(data)
1862
1863
1864 class PduTopic(BaseTopic):
1865 topic = "pdus"
1866 topic_msg = "pdu"
1867 quota_name = "pduds"
1868 schema_new = pdu_new_schema
1869 schema_edit = pdu_edit_schema
1870
1871 def __init__(self, db, fs, msg, auth):
1872 BaseTopic.__init__(self, db, fs, msg, auth)
1873
1874 @staticmethod
1875 def format_on_new(content, project_id=None, make_public=False):
1876 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
1877 content["_admin"]["onboardingState"] = "CREATED"
1878 content["_admin"]["operationalState"] = "ENABLED"
1879 content["_admin"]["usageState"] = "NOT_IN_USE"
1880
1881 def check_conflict_on_del(self, session, _id, db_content):
1882 """
1883 Check that there is not any vnfr that uses this PDU
1884 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
1885 :param _id: pdu internal id
1886 :param db_content: The database content of the _id.
1887 :return: None or raises EngineException with the conflict
1888 """
1889 if session["force"]:
1890 return
1891
1892 _filter = self._get_project_filter(session)
1893 _filter["vdur.pdu-id"] = _id
1894 if self.db.get_list("vnfrs", _filter):
1895 raise EngineException(
1896 "There is at least one VNF instance using this PDU",
1897 http_code=HTTPStatus.CONFLICT,
1898 )
1899
1900
1901 class VnfPkgOpTopic(BaseTopic):
1902 topic = "vnfpkgops"
1903 topic_msg = "vnfd"
1904 schema_new = vnfpkgop_new_schema
1905 schema_edit = None
1906
1907 def __init__(self, db, fs, msg, auth):
1908 BaseTopic.__init__(self, db, fs, msg, auth)
1909
1910 def edit(self, session, _id, indata=None, kwargs=None, content=None):
1911 raise EngineException(
1912 "Method 'edit' not allowed for topic '{}'".format(self.topic),
1913 HTTPStatus.METHOD_NOT_ALLOWED,
1914 )
1915
1916 def delete(self, session, _id, dry_run=False):
1917 raise EngineException(
1918 "Method 'delete' not allowed for topic '{}'".format(self.topic),
1919 HTTPStatus.METHOD_NOT_ALLOWED,
1920 )
1921
1922 def delete_list(self, session, filter_q=None):
1923 raise EngineException(
1924 "Method 'delete_list' not allowed for topic '{}'".format(self.topic),
1925 HTTPStatus.METHOD_NOT_ALLOWED,
1926 )
1927
1928 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1929 """
1930 Creates a new entry into database.
1931 :param rollback: list to append created items at database in case a rollback may to be done
1932 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
1933 :param indata: data to be inserted
1934 :param kwargs: used to override the indata descriptor
1935 :param headers: http request headers
1936 :return: _id, op_id:
1937 _id: identity of the inserted data.
1938 op_id: None
1939 """
1940 self._update_input_with_kwargs(indata, kwargs)
1941 validate_input(indata, self.schema_new)
1942 vnfpkg_id = indata["vnfPkgId"]
1943 filter_q = BaseTopic._get_project_filter(session)
1944 filter_q["_id"] = vnfpkg_id
1945 vnfd = self.db.get_one("vnfds", filter_q)
1946 operation = indata["lcmOperationType"]
1947 kdu_name = indata["kdu_name"]
1948 for kdu in vnfd.get("kdu", []):
1949 if kdu["name"] == kdu_name:
1950 helm_chart = kdu.get("helm-chart")
1951 juju_bundle = kdu.get("juju-bundle")
1952 break
1953 else:
1954 raise EngineException(
1955 "Not found vnfd[id='{}']:kdu[name='{}']".format(vnfpkg_id, kdu_name)
1956 )
1957 if helm_chart:
1958 indata["helm-chart"] = helm_chart
1959 match = fullmatch(r"([^/]*)/([^/]*)", helm_chart)
1960 repo_name = match.group(1) if match else None
1961 elif juju_bundle:
1962 indata["juju-bundle"] = juju_bundle
1963 match = fullmatch(r"([^/]*)/([^/]*)", juju_bundle)
1964 repo_name = match.group(1) if match else None
1965 else:
1966 raise EngineException(
1967 "Found neither 'helm-chart' nor 'juju-bundle' in vnfd[id='{}']:kdu[name='{}']".format(
1968 vnfpkg_id, kdu_name
1969 )
1970 )
1971 if repo_name:
1972 del filter_q["_id"]
1973 filter_q["name"] = repo_name
1974 repo = self.db.get_one("k8srepos", filter_q)
1975 k8srepo_id = repo.get("_id")
1976 k8srepo_url = repo.get("url")
1977 else:
1978 k8srepo_id = None
1979 k8srepo_url = None
1980 indata["k8srepoId"] = k8srepo_id
1981 indata["k8srepo_url"] = k8srepo_url
1982 vnfpkgop_id = str(uuid4())
1983 vnfpkgop_desc = {
1984 "_id": vnfpkgop_id,
1985 "operationState": "PROCESSING",
1986 "vnfPkgId": vnfpkg_id,
1987 "lcmOperationType": operation,
1988 "isAutomaticInvocation": False,
1989 "isCancelPending": False,
1990 "operationParams": indata,
1991 "links": {
1992 "self": "/osm/vnfpkgm/v1/vnfpkg_op_occs/" + vnfpkgop_id,
1993 "vnfpkg": "/osm/vnfpkgm/v1/vnf_packages/" + vnfpkg_id,
1994 },
1995 }
1996 self.format_on_new(
1997 vnfpkgop_desc, session["project_id"], make_public=session["public"]
1998 )
1999 ctime = vnfpkgop_desc["_admin"]["created"]
2000 vnfpkgop_desc["statusEnteredTime"] = ctime
2001 vnfpkgop_desc["startTime"] = ctime
2002 self.db.create(self.topic, vnfpkgop_desc)
2003 rollback.append({"topic": self.topic, "_id": vnfpkgop_id})
2004 self.msg.write(self.topic_msg, operation, vnfpkgop_desc)
2005 return vnfpkgop_id, None