Fix Bug 1493 repo generation from osm-packages
[osm/osmclient.git] / osmclient / common / package_tool.py
1 # /bin/env python3
2 # Copyright 2019 ATOS
3 #
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17
18 import glob
19 import hashlib
20 import logging
21 import os
22 import shutil
23 import subprocess
24 import tarfile
25 import time
26
27 from jinja2 import Environment, PackageLoader
28 from osm_im.validation import Validation as validation_im
29 from osm_im.validation import ValidationException
30 from osm_im import im_translation
31 from osmclient.common.exceptions import ClientException
32 import yaml
33
34
35 class PackageTool(object):
36 def __init__(self, client=None):
37 self._client = client
38 self._logger = logging.getLogger("osmclient")
39 self._validator = validation_im()
40
41 def create(
42 self,
43 package_type,
44 base_directory,
45 package_name,
46 override,
47 image,
48 vdus,
49 vcpu,
50 memory,
51 storage,
52 interfaces,
53 vendor,
54 detailed,
55 netslice_subnets,
56 netslice_vlds,
57 ):
58 """
59 **Create a package descriptor**
60
61 :params:
62 - package_type: [vnf, ns, nst]
63 - base directory: path of destination folder
64 - package_name: is the name of the package to be created
65 - image: specify the image of the vdu
66 - vcpu: number of virtual cpus of the vdu
67 - memory: amount of memory in MB pf the vdu
68 - storage: amount of storage in GB of the vdu
69 - interfaces: number of interfaces besides management interface
70 - vendor: vendor name of the vnf/ns
71 - detailed: include all possible values for NSD, VNFD, NST
72 - netslice_subnets: number of netslice_subnets for the NST
73 - netslice_vlds: number of virtual link descriptors for the NST
74
75 :return: status
76 """
77 self._logger.debug("")
78 # print("location: {}".format(osmclient.__path__))
79 file_loader = PackageLoader("osmclient")
80 env = Environment(loader=file_loader)
81 if package_type == "ns":
82 template = env.get_template("nsd.yaml.j2")
83 content = {
84 "name": package_name,
85 "vendor": vendor,
86 "vdus": vdus,
87 "clean": False,
88 "interfaces": interfaces,
89 "detailed": detailed,
90 }
91 elif package_type == "vnf":
92 template = env.get_template("vnfd.yaml.j2")
93 content = {
94 "name": package_name,
95 "vendor": vendor,
96 "vdus": vdus,
97 "clean": False,
98 "interfaces": interfaces,
99 "image": image,
100 "vcpu": vcpu,
101 "memory": memory,
102 "storage": storage,
103 "detailed": detailed,
104 }
105 elif package_type == "nst":
106 template = env.get_template("nst.yaml.j2")
107 content = {
108 "name": package_name,
109 "vendor": vendor,
110 "interfaces": interfaces,
111 "netslice_subnets": netslice_subnets,
112 "netslice_vlds": netslice_vlds,
113 "detailed": detailed,
114 }
115 else:
116 raise ClientException(
117 "Wrong descriptor type {}. Options: ns, vnf, nst".format(package_type)
118 )
119
120 # print("To be rendered: {}".format(content))
121 output = template.render(content)
122 # print(output)
123
124 structure = self.discover_folder_structure(
125 base_directory, package_name, override
126 )
127 if structure.get("folders"):
128 self.create_folders(structure["folders"], package_type)
129 if structure.get("files"):
130 self.create_files(structure["files"], output, package_type)
131 return "Created"
132
133 def validate(self, base_directory, recursive=True, old_format=False):
134 """
135 **Validate OSM Descriptors given a path**
136
137 :params:
138 - base_directory is the root path for all descriptors
139
140 :return: List of dict of validated descriptors. keys: type, path, valid, error
141 """
142 self._logger.debug("")
143 table = []
144 if recursive:
145 descriptors_paths = [
146 f for f in glob.glob(base_directory + "/**/*.yaml", recursive=recursive)
147 ]
148 else:
149 descriptors_paths = [
150 f for f in glob.glob(base_directory + "/*.yaml", recursive=recursive)
151 ]
152 self._logger.info("Base directory: {}".format(base_directory))
153 self._logger.info("{} Descriptors found to validate".format(len(descriptors_paths)))
154 for desc_path in descriptors_paths:
155 with open(desc_path) as descriptor_file:
156 descriptor_data = descriptor_file.read()
157 desc_type = "-"
158 try:
159 desc_type, descriptor_data = validation_im.yaml_validation(
160 self, descriptor_data
161 )
162 self._logger.debug(f"Validate {desc_type} {descriptor_data}")
163 if not old_format:
164 if desc_type == "vnfd" or desc_type == "nsd":
165 self._logger.error(
166 "OSM descriptor '{}' written in an unsupported format. Please update to ETSI SOL006 format".format(
167 desc_path
168 )
169 )
170 self._logger.warning(
171 "Package validation skipped. It can still be done with 'osm package-validate --old'"
172 )
173 self._logger.warning(
174 "Package build can still be done with 'osm package-build --skip-validation'"
175 )
176 raise Exception("Not SOL006 format")
177 validation_im.pyangbind_validation(self, desc_type, descriptor_data)
178 table.append(
179 {"type": desc_type, "path": desc_path, "valid": "OK", "error": "-"}
180 )
181 except Exception as e:
182 table.append(
183 {
184 "type": desc_type,
185 "path": desc_path,
186 "valid": "ERROR",
187 "error": str(e),
188 }
189 )
190 self._logger.debug(table[-1])
191 return table
192
193 def translate(self, base_directory, recursive=True, dryrun=False):
194 """
195 **Translate OSM Packages given a path**
196
197 :params:
198 - base_directory is the root path for all packages
199
200 :return: List of dict of translated packages. keys: current type, new type, path, valid, translated, error
201 """
202 self._logger.debug("")
203 table = []
204 if recursive:
205 descriptors_paths = [
206 f for f in glob.glob(base_directory + "/**/*.yaml", recursive=recursive)
207 ]
208 else:
209 descriptors_paths = [
210 f for f in glob.glob(base_directory + "/*.yaml", recursive=recursive)
211 ]
212 print("Base directory: {}".format(base_directory))
213 print("{} Descriptors found to validate".format(len(descriptors_paths)))
214 for desc_path in descriptors_paths:
215 with open(desc_path) as descriptor_file:
216 descriptor_data = descriptor_file.read()
217 desc_type = "-"
218 try:
219 desc_type, descriptor_data = validation_im.yaml_validation(
220 self, descriptor_data
221 )
222 self._logger.debug("desc_type: {}".format(desc_type))
223 self._logger.debug("descriptor_data:\n{}".format(descriptor_data))
224 self._validator.pyangbind_validation(desc_type, descriptor_data)
225 if not (desc_type == "vnfd" or desc_type == "nsd"):
226 table.append(
227 {
228 "current type": desc_type,
229 "new type": desc_type,
230 "path": desc_path,
231 "valid": "OK",
232 "translated": "N/A",
233 "error": "-",
234 }
235 )
236 else:
237 new_desc_type = desc_type
238 try:
239 sol006_model = yaml.safe_dump(
240 im_translation.translate_im_model_to_sol006(
241 descriptor_data
242 ),
243 indent=4,
244 default_flow_style=False,
245 )
246 (
247 new_desc_type,
248 new_descriptor_data,
249 ) = self._validator.yaml_validation(sol006_model)
250 self._validator.pyangbind_validation(
251 new_desc_type, new_descriptor_data
252 )
253 if not dryrun:
254 with open(desc_path, "w") as descriptor_file:
255 descriptor_file.write(sol006_model)
256 table.append(
257 {
258 "current type": desc_type,
259 "new type": new_desc_type,
260 "path": desc_path,
261 "valid": "OK",
262 "translated": "OK",
263 "error": "-",
264 }
265 )
266 except ValidationException as ve2:
267 table.append(
268 {
269 "current type": desc_type,
270 "new type": new_desc_type,
271 "path": desc_path,
272 "valid": "OK",
273 "translated": "ERROR",
274 "error": "Error in the post-validation: {}".format(
275 str(ve2)
276 ),
277 }
278 )
279 except Exception as e2:
280 table.append(
281 {
282 "current type": desc_type,
283 "new type": new_desc_type,
284 "path": desc_path,
285 "valid": "OK",
286 "translated": "ERROR",
287 "error": "Error in the translation: {}".format(str(e2)),
288 }
289 )
290 except ValidationException as ve:
291 table.append(
292 {
293 "current type": desc_type,
294 "new type": "N/A",
295 "path": desc_path,
296 "valid": "ERROR",
297 "translated": "N/A",
298 "error": "Error in the pre-validation: {}".format(str(ve)),
299 }
300 )
301 except Exception as e:
302 table.append(
303 {
304 "current type": desc_type,
305 "new type": "N/A",
306 "path": desc_path,
307 "valid": "ERROR",
308 "translated": "N/A",
309 "error": str(e),
310 }
311 )
312 return table
313
314 def descriptor_translate(self, descriptor_file):
315 """
316 **Translate input descriptor file from Rel EIGHT OSM to SOL006**
317
318 :params:
319 - base_directory is the root path for all packages
320
321 :return: YAML descriptor in the new format
322 """
323 self._logger.debug("")
324 with open(descriptor_file, "r") as df:
325 im_model = yaml.safe_load(df.read())
326 sol006_model = im_translation.translate_im_model_to_sol006(im_model)
327 return yaml.safe_dump(sol006_model, indent=4, default_flow_style=False)
328
329 def build(self, package_folder, skip_validation=False, skip_charm_build=False):
330 """
331 **Creates a .tar.gz file given a package_folder**
332
333 :params:
334 - package_folder: is the name of the folder to be packaged
335 - skip_validation: is the flag to validate or not the descriptors on the folder before build
336
337 :returns: message result for the build process
338 """
339 self._logger.debug("")
340 package_folder = package_folder.rstrip("/")
341 if not os.path.exists("{}".format(package_folder)):
342 return "Fail, package is not in the specified path"
343 if not skip_validation:
344 print("Validating package {}".format(package_folder))
345 results = self.validate(package_folder, recursive=False)
346 if results:
347 for result in results:
348 if result["valid"] != "OK":
349 raise ClientException(
350 "There was an error validating the file {} with error: {}".format(
351 result["path"], result["error"]
352 )
353 )
354 print("Validation OK")
355 else:
356 raise ClientException(
357 "No descriptor file found in: {}".format(package_folder)
358 )
359 charm_list = self.build_all_charms(package_folder, skip_charm_build)
360 return self.build_tarfile(package_folder, charm_list)
361
362 def calculate_checksum(self, package_folder):
363 """
364 **Function to calculate the checksum given a folder**
365
366 :params:
367 - package_folder: is the folder where we have the files to calculate the checksum
368 :returns: None
369 """
370 self._logger.debug("")
371 files = [
372 f
373 for f in glob.glob(package_folder + "/**/*.*", recursive=True)
374 if os.path.isfile(f)
375 ]
376 with open("{}/checksums.txt".format(package_folder), "w+") as checksum:
377 for file_item in files:
378 if "checksums.txt" in file_item:
379 continue
380 # from https://www.quickprogrammingtips.com/python/how-to-calculate-md5-hash-of-a-file-in-python.html
381 md5_hash = hashlib.md5()
382 with open(file_item, "rb") as f:
383 # Read and update hash in chunks of 4K
384 for byte_block in iter(lambda: f.read(4096), b""):
385 md5_hash.update(byte_block)
386 checksum.write("{}\t{}\n".format(md5_hash.hexdigest(), file_item))
387
388 def create_folders(self, folders, package_type):
389 """
390 **Create folder given a list of folders**
391
392 :params:
393 - folders: [List] list of folders paths to be created
394 - package_type: is the type of package to be created
395 :return: None
396 """
397 self._logger.debug("")
398 for folder in folders:
399 try:
400 # print("Folder {} == package_type {}".format(folder[1], package_type))
401 if folder[1] == package_type:
402 print("Creating folder:\t{}".format(folder[0]))
403 os.makedirs(folder[0])
404 except FileExistsError:
405 pass
406
407 def save_file(self, file_name, file_body):
408 """
409 **Create a file given a name and the content**
410
411 :params:
412 - file_name: is the name of the file with the relative route
413 - file_body: is the content of the file
414 :return: None
415 """
416 self._logger.debug("")
417 print("Creating file: \t{}".format(file_name))
418 try:
419 with open(file_name, "w+") as f:
420 f.write(file_body)
421 except Exception as e:
422 raise ClientException(e)
423
424 def generate_readme(self):
425 """
426 **Creates the README content**
427
428 :returns: readme content
429 """
430 self._logger.debug("")
431 return """# Descriptor created by OSM descriptor package generated\n\n**Created on {} **""".format(
432 time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime())
433 )
434
435 def generate_cloud_init(self):
436 """
437 **Creates the cloud-init content**
438
439 :returns: cloud-init content
440 """
441 self._logger.debug("")
442 return "---\n#cloud-config"
443
444 def create_files(self, files, file_content, package_type):
445 """
446 **Creates the files given the file list and type**
447
448 :params:
449 - files: is the list of files structure
450 - file_content: is the content of the descriptor rendered by the template
451 - package_type: is the type of package to filter the creation structure
452
453 :return: None
454 """
455 self._logger.debug("")
456 for file_item, file_package, file_type in files:
457 if package_type == file_package:
458 if file_type == "descriptor":
459 self.save_file(file_item, file_content)
460 elif file_type == "readme":
461 self.save_file(file_item, self.generate_readme())
462 elif file_type == "cloud_init":
463 self.save_file(file_item, self.generate_cloud_init())
464
465 def check_files_folders(self, path_list, override):
466 """
467 **Find files and folders missing given a directory structure {"folders": [], "files": []}**
468
469 :params:
470 - path_list: is the list of files and folders to be created
471 - override: is the flag used to indicate the creation of the list even if the file exist to override it
472
473 :return: Missing paths Dict
474 """
475 self._logger.debug("")
476 missing_paths = {}
477 folders = []
478 files = []
479 for folder in path_list.get("folders"):
480 if not os.path.exists(folder[0]):
481 folders.append(folder)
482 missing_paths["folders"] = folders
483
484 for file_item in path_list.get("files"):
485 if not os.path.exists(file_item[0]) or override is True:
486 files.append(file_item)
487 missing_paths["files"] = files
488
489 return missing_paths
490
491 def build_all_charms(self, package_folder, skip_charm_build):
492 """
493 **Read the descriptor file, check that the charms referenced are in the folder and compiles them**
494
495 :params:
496 - packet_folder: is the location of the package
497 :return: Files and Folders not found. In case of override, it will return all file list
498 """
499 self._logger.debug("")
500 charms_set = set()
501 descriptor_file = False
502 descriptors_paths = [f for f in glob.glob(package_folder + "/*.yaml")]
503 for file in descriptors_paths:
504 if file.endswith("nfd.yaml"):
505 descriptor_file = True
506 charms_set = self.charms_search(file, "vnf")
507 if file.endswith("nsd.yaml"):
508 descriptor_file = True
509 charms_set = self.charms_search(file, "ns")
510 print("List of charms in the descriptor: {}".format(charms_set))
511 if not descriptor_file:
512 raise ClientException(
513 'Descriptor filename is not correct in: {}. It should end with "nfd.yaml" or "nsd.yaml"'.format(
514 package_folder
515 )
516 )
517 if charms_set and not skip_charm_build:
518 for charmName in charms_set:
519 if os.path.isdir(
520 "{}/charms/layers/{}".format(package_folder, charmName)
521 ):
522 print(
523 "Building charm {}/charms/layers/{}".format(
524 package_folder, charmName
525 )
526 )
527 self.charm_build(package_folder, charmName)
528 print("Charm built: {}".format(charmName))
529 elif os.path.isdir(
530 "{}/charms/ops/{}".format(package_folder, charmName)
531 ):
532 self.charmcraft_build(package_folder, charmName)
533 else:
534 if not os.path.isdir(
535 "{}/charms/{}".format(package_folder, charmName)
536 ):
537 raise ClientException(
538 "The charm: {} referenced in the descriptor file "
539 "is not present either in {}/charms or in {}/charms/layers".format(
540 charmName, package_folder, package_folder
541 )
542 )
543 self._logger.debug("Return list of charms: {}".format(charms_set))
544 return charms_set
545
546 def discover_folder_structure(self, base_directory, name, override):
547 """
548 **Discover files and folders structure for OSM descriptors given a base_directory and name**
549
550 :params:
551 - base_directory: is the location of the package to be created
552 - name: is the name of the package
553 - override: is the flag used to indicate the creation of the list even if the file exist to override it
554 :return: Files and Folders not found. In case of override, it will return all file list
555 """
556 self._logger.debug("")
557 prefix = "{}/{}".format(base_directory, name)
558 files_folders = {
559 "folders": [
560 ("{}_ns".format(prefix), "ns"),
561 ("{}_ns/icons".format(prefix), "ns"),
562 ("{}_ns/charms".format(prefix), "ns"),
563 ("{}_vnf".format(name), "vnf"),
564 ("{}_vnf/charms".format(prefix), "vnf"),
565 ("{}_vnf/cloud_init".format(prefix), "vnf"),
566 ("{}_vnf/images".format(prefix), "vnf"),
567 ("{}_vnf/icons".format(prefix), "vnf"),
568 ("{}_vnf/scripts".format(prefix), "vnf"),
569 ("{}_nst".format(prefix), "nst"),
570 ("{}_nst/icons".format(prefix), "nst"),
571 ],
572 "files": [
573 ("{}_ns/{}_nsd.yaml".format(prefix, name), "ns", "descriptor"),
574 ("{}_ns/README.md".format(prefix), "ns", "readme"),
575 ("{}_vnf/{}_vnfd.yaml".format(prefix, name), "vnf", "descriptor"),
576 (
577 "{}_vnf/cloud_init/cloud-config.txt".format(prefix),
578 "vnf",
579 "cloud_init",
580 ),
581 ("{}_vnf/README.md".format(prefix), "vnf", "readme"),
582 ("{}_nst/{}_nst.yaml".format(prefix, name), "nst", "descriptor"),
583 ("{}_nst/README.md".format(prefix), "nst", "readme"),
584 ],
585 }
586 missing_files_folders = self.check_files_folders(files_folders, override)
587 # print("Missing files and folders: {}".format(missing_files_folders))
588 return missing_files_folders
589
590 def charm_build(self, charms_folder, build_name):
591 """
592 Build the charms inside the package.
593 params: package_folder is the name of the folder where is the charms to compile.
594 build_name is the name of the layer or interface
595 """
596 self._logger.debug("")
597 os.environ["JUJU_REPOSITORY"] = "{}/charms".format(charms_folder)
598 os.environ["CHARM_LAYERS_DIR"] = "{}/layers".format(
599 os.environ["JUJU_REPOSITORY"]
600 )
601 os.environ["CHARM_INTERFACES_DIR"] = "{}/interfaces".format(
602 os.environ["JUJU_REPOSITORY"]
603 )
604 os.environ["CHARM_BUILD_DIR"] = "{}/charms/builds".format(charms_folder)
605 if not os.path.exists(os.environ["CHARM_BUILD_DIR"]):
606 os.makedirs(os.environ["CHARM_BUILD_DIR"])
607 src_folder = "{}/{}".format(os.environ["CHARM_LAYERS_DIR"], build_name)
608 result = subprocess.run(["charm", "build", "{}".format(src_folder)])
609 if result.returncode == 1:
610 raise ClientException("failed to build the charm: {}".format(src_folder))
611 self._logger.verbose("charm {} built".format(src_folder))
612
613 def charmcraft_build(self, package_folder, charm_name):
614 """
615 Build the charms inside the package (new operator framework charms)
616 params: package_folder is the name of the folder where is the charms to compile.
617 build_name is the name of the layer or interface
618 """
619 self._logger.debug("Building charm {}".format(charm_name))
620 src_folder = f"{package_folder}/charms/ops/{charm_name}"
621 current_directory = os.getcwd()
622 os.chdir(src_folder)
623 try:
624 result = subprocess.run(["charmcraft", "build"])
625 if result.returncode == 1:
626 raise ClientException(
627 "failed to build the charm: {}".format(src_folder)
628 )
629 subprocess.run(["rm", "-rf", f"../../{charm_name}"])
630 subprocess.run(["mv", "build", f"../../{charm_name}"])
631 self._logger.verbose("charm {} built".format(src_folder))
632 finally:
633 os.chdir(current_directory)
634
635 def build_tarfile(self, package_folder, charm_list=None):
636 """
637 Creates a .tar.gz file given a package_folder
638 params: package_folder is the name of the folder to be packaged
639 returns: .tar.gz name
640 """
641 self._logger.debug("")
642 cwd = None
643 try:
644 directory_name, package_name = self.create_temp_dir(
645 package_folder, charm_list
646 )
647 cwd = os.getcwd()
648 os.chdir(directory_name)
649 self.calculate_checksum(package_name)
650 with tarfile.open("{}.tar.gz".format(package_name), mode="w:gz") as archive:
651 print("Adding File: {}".format(package_name))
652 archive.add("{}".format(package_name), recursive=True)
653 # return "Created {}.tar.gz".format(package_folder)
654 # self.build("{}".format(os.path.basename(package_folder)))
655 os.chdir(cwd)
656 cwd = None
657 created_package = "{}/{}.tar.gz".format(
658 os.path.dirname(package_folder) or ".", package_name
659 )
660 os.rename(
661 "{}/{}.tar.gz".format(directory_name, package_name), created_package
662 )
663 os.rename(
664 "{}/{}/checksums.txt".format(directory_name, package_name),
665 "{}/checksums.txt".format(package_folder),
666 )
667 print("Package created: {}".format(created_package))
668 return created_package
669 except Exception as exc:
670 raise ClientException(
671 "failure during build of targz file (create temp dir, calculate checksum, "
672 "tar.gz file): {}".format(exc)
673 )
674 finally:
675 if cwd:
676 os.chdir(cwd)
677 shutil.rmtree(os.path.join(package_folder, "tmp"))
678
679 def create_temp_dir(self, package_folder, charm_list=None):
680 """
681 Method to create a temporary folder where we can move the files in package_folder
682 """
683 self._logger.debug("")
684 ignore_patterns = ".gitignore"
685 ignore = shutil.ignore_patterns(ignore_patterns)
686 directory_name = os.path.abspath(package_folder)
687 package_name = os.path.basename(directory_name)
688 directory_name += "/tmp"
689 os.makedirs("{}/{}".format(directory_name, package_name), exist_ok=True)
690 self._logger.debug("Makedirs DONE: {}/{}".format(directory_name, package_name))
691 for item in os.listdir(package_folder):
692 self._logger.debug("Item: {}".format(item))
693 if item != "tmp":
694 s = os.path.join(package_folder, item)
695 d = os.path.join(os.path.join(directory_name, package_name), item)
696 if os.path.isdir(s):
697 if item == "charms":
698 os.makedirs(d, exist_ok=True)
699 s_builds = os.path.join(s, "builds")
700 for charm in charm_list:
701 self._logger.debug("Copying charm {}".format(charm))
702 if charm in os.listdir(s):
703 s_charm = os.path.join(s, charm)
704 elif charm in os.listdir(s_builds):
705 s_charm = os.path.join(s_builds, charm)
706 else:
707 raise ClientException(
708 "The charm {} referenced in the descriptor file "
709 "could not be found in {}/charms or in {}/charms/builds".format(
710 charm, package_folder, package_folder
711 )
712 )
713 d_temp = os.path.join(d, charm)
714 self._logger.debug(
715 "Copying tree: {} -> {}".format(s_charm, d_temp)
716 )
717 shutil.copytree(
718 s_charm, d_temp, symlinks=True, ignore=ignore
719 )
720 self._logger.debug("DONE")
721 else:
722 self._logger.debug("Copying tree: {} -> {}".format(s, d))
723 shutil.copytree(s, d, symlinks=True, ignore=ignore)
724 self._logger.debug("DONE")
725 else:
726 if item in ignore_patterns:
727 continue
728 self._logger.debug("Copying file: {} -> {}".format(s, d))
729 shutil.copy2(s, d)
730 self._logger.debug("DONE")
731 return directory_name, package_name
732
733 def charms_search(self, descriptor_file, desc_type):
734 self._logger.debug(
735 "descriptor_file: {}, desc_type: {}".format(descriptor_file, desc_type)
736 )
737 charms_set = set()
738 with open("{}".format(descriptor_file)) as yaml_desc:
739 descriptor_dict = yaml.safe_load(yaml_desc)
740 # self._logger.debug("\n"+yaml.safe_dump(descriptor_dict, indent=4, default_flow_style=False))
741
742 if (
743 desc_type == "vnf"
744 and (
745 "vnfd:vnfd-catalog" in descriptor_dict
746 or "vnfd-catalog" in descriptor_dict
747 )
748 ) or (
749 desc_type == "ns"
750 and (
751 "nsd:nsd-catalog" in descriptor_dict
752 or "nsd-catalog" in descriptor_dict
753 )
754 ):
755 charms_set = self._charms_search_on_osm_im_dict(
756 descriptor_dict, desc_type
757 )
758 else:
759 if desc_type == "ns":
760 get_charm_list = self._charms_search_on_nsd_sol006_dict
761 elif desc_type == "vnf":
762 get_charm_list = self._charms_search_on_vnfd_sol006_dict
763 else:
764 raise Exception("Bad descriptor type")
765 charms_set = get_charm_list(descriptor_dict)
766 return charms_set
767
768 def _charms_search_on_osm_im_dict(self, osm_im_dict, desc_type):
769 self._logger.debug("")
770 charms_set = []
771 for k1, v1 in osm_im_dict.items():
772 for k2, v2 in v1.items():
773 for entry in v2:
774 if "{}-configuration".format(desc_type) in entry:
775 vnf_config = entry["{}-configuration".format(desc_type)]
776 for k3, v3 in vnf_config.items():
777 if "charm" in v3:
778 charms_set.add((v3["charm"]))
779 if "vdu" in entry:
780 vdus = entry["vdu"]
781 for vdu in vdus:
782 if "vdu-configuration" in vdu:
783 for k4, v4 in vdu["vdu-configuration"].items():
784 if "charm" in v4:
785 charms_set.add((v4["charm"]))
786 return charms_set
787
788 def _charms_search_on_vnfd_sol006_dict(self, sol006_dict):
789 self._logger.debug("")
790 charms_set = set()
791 dfs = sol006_dict.get("vnfd", {}).get("df", [])
792 for df in dfs:
793 day_1_2s = (
794 df.get("lcm-operations-configuration", {})
795 .get("operate-vnf-op-config", {})
796 .get("day1-2")
797 )
798 if day_1_2s is not None:
799 for day_1_2 in day_1_2s:
800 exec_env_list = day_1_2.get("execution-environment-list", [])
801 for exec_env in exec_env_list:
802 if "juju" in exec_env and "charm" in exec_env["juju"]:
803 charms_set.add(exec_env["juju"]["charm"])
804 return charms_set
805
806 def _charms_search_on_nsd_sol006_dict(self, sol006_dict):
807 self._logger.debug("")
808 charms_set = set()
809 nsd_list = sol006_dict.get("nsd", {}).get("nsd", [])
810 for nsd in nsd_list:
811 charm = nsd.get("ns-configuration", {}).get("juju", {}).get("charm")
812 if charm:
813 charms_set.add(charm)
814 return charms_set