+
+ def charm_build(self, charms_folder, build_name, sol004_007=True):
+ """
+ Build the charms inside the package.
+ params: package_folder is the name of the folder where is the charms to compile.
+ build_name is the name of the layer or interface
+ """
+ self._logger.debug("")
+
+ if sol004_007:
+ os.environ["JUJU_REPOSITORY"] = "{}/Scripts/charms".format(charms_folder)
+ else:
+ os.environ["JUJU_REPOSITORY"] = "{}/charms".format(charms_folder)
+
+ os.environ["CHARM_LAYERS_DIR"] = "{}/layers".format(
+ os.environ["JUJU_REPOSITORY"]
+ )
+ os.environ["CHARM_INTERFACES_DIR"] = "{}/interfaces".format(
+ os.environ["JUJU_REPOSITORY"]
+ )
+
+ if sol004_007:
+ os.environ["CHARM_BUILD_DIR"] = "{}/Scripts/charms/builds".format(
+ charms_folder
+ )
+ else:
+ os.environ["CHARM_BUILD_DIR"] = "{}/charms/builds".format(charms_folder)
+
+ if not os.path.exists(os.environ["CHARM_BUILD_DIR"]):
+ os.makedirs(os.environ["CHARM_BUILD_DIR"])
+ src_folder = "{}/{}".format(os.environ["CHARM_LAYERS_DIR"], build_name)
+ result = subprocess.run(["charm", "build", "{}".format(src_folder)])
+ if result.returncode == 1:
+ raise ClientException("failed to build the charm: {}".format(src_folder))
+ self._logger.verbose("charm {} built".format(src_folder))
+
+ def charmcraft_build(self, package_folder, charm_name):
+ """
+ Build the charms inside the package (new operator framework charms)
+ params: package_folder is the name of the folder where is the charms to compile.
+ build_name is the name of the layer or interface
+ """
+ self._logger.debug("Building charm {}".format(charm_name))
+ src_folder = f"{package_folder}/Scripts/charms/ops/{charm_name}"
+ current_directory = os.getcwd()
+ os.chdir(src_folder)
+ try:
+ result = subprocess.run(["charmcraft", "build"])
+ if result.returncode == 1:
+ raise ClientException(
+ "failed to build the charm: {}".format(src_folder)
+ )
+ subprocess.run(["rm", "-rf", f"../../{charm_name}"])
+ subprocess.run(["mv", "build", f"../../{charm_name}"])
+ self._logger.verbose("charm {} built".format(src_folder))
+ finally:
+ os.chdir(current_directory)
+
+ def build_compressed_file(self, package_folder, charm_list=None, sol004_007=True):
+ if sol004_007:
+ return self.build_zipfile(package_folder, charm_list)
+ else:
+ return self.build_tarfile(package_folder, charm_list)
+
+ def build_zipfile(self, package_folder, charm_list=None):
+ """
+ Creates a zip file given a package_folder
+ params: package_folder is the name of the folder to be packaged
+ returns: .zip name
+ """
+ self._logger.debug("")
+ cwd = None
+ try:
+ directory_name, package_name = self.create_temp_dir_sol004_007(
+ package_folder, charm_list
+ )
+ cwd = os.getcwd()
+ os.chdir(directory_name)
+ package_type = package_handling.get_package_type(package_folder)
+ print(package_type)
+
+ if (
+ package_handling.SOL007 == package_type
+ or package_handling.SOL007_TOSCA == package_type
+ ):
+ the_package = SOL007Package(package_folder)
+ elif (
+ package_handling.SOL004 == package_type
+ or package_handling.SOL004_TOSCA == package_type
+ ):
+ the_package = SOL004Package(package_folder)
+
+ the_package.create_or_update_metadata_file()
+
+ the_zip_package = shutil.make_archive(
+ os.path.join(cwd, package_name),
+ "zip",
+ os.path.join(directory_name, package_name),
+ )
+
+ print("Package created: {}".format(the_zip_package))
+
+ return the_zip_package
+
+ except Exception as exc:
+ raise ClientException(
+ "failure during build of zip file (create temp dir, calculate checksum, "
+ "zip file): {}".format(exc)
+ )
+ finally:
+ if cwd:
+ os.chdir(cwd)
+ shutil.rmtree(os.path.join(package_folder, "tmp"))
+
+ def build_tarfile(self, package_folder, charm_list=None):
+ """
+ Creates a .tar.gz file given a package_folder
+ params: package_folder is the name of the folder to be packaged
+ returns: .tar.gz name
+ """
+ self._logger.debug("")
+ cwd = None
+ try:
+ directory_name, package_name = self.create_temp_dir(
+ package_folder, charm_list
+ )
+ cwd = os.getcwd()
+ os.chdir(directory_name)
+ self.calculate_checksum(package_name)
+ with tarfile.open("{}.tar.gz".format(package_name), mode="w:gz") as archive:
+ print("Adding File: {}".format(package_name))
+ archive.add("{}".format(package_name), recursive=True)
+ # return "Created {}.tar.gz".format(package_folder)
+ # self.build("{}".format(os.path.basename(package_folder)))
+ os.chdir(cwd)
+ cwd = None
+ created_package = "{}/{}.tar.gz".format(
+ os.path.dirname(package_folder) or ".", package_name
+ )
+ os.rename(
+ "{}/{}.tar.gz".format(directory_name, package_name), created_package
+ )
+ os.rename(
+ "{}/{}/checksums.txt".format(directory_name, package_name),
+ "{}/checksums.txt".format(package_folder),
+ )
+ print("Package created: {}".format(created_package))
+ return created_package
+ except Exception as exc:
+ raise ClientException(
+ "failure during build of targz file (create temp dir, calculate checksum, "
+ "tar.gz file): {}".format(exc)
+ )
+ finally:
+ if cwd:
+ os.chdir(cwd)
+ shutil.rmtree(os.path.join(package_folder, "tmp"))
+
+ def create_temp_dir(self, package_folder, charm_list=None):
+ """
+ Method to create a temporary folder where we can move the files in package_folder
+ """
+ self._logger.debug("")
+ ignore_patterns = ".gitignore"
+ ignore = shutil.ignore_patterns(ignore_patterns)
+ directory_name = os.path.abspath(package_folder)
+ package_name = os.path.basename(directory_name)
+ directory_name += "/tmp"
+ os.makedirs("{}/{}".format(directory_name, package_name), exist_ok=True)
+ self._logger.debug("Makedirs DONE: {}/{}".format(directory_name, package_name))
+ for item in os.listdir(package_folder):
+ self._logger.debug("Item: {}".format(item))
+ if item != "tmp":
+ s = os.path.join(package_folder, item)
+ d = os.path.join(os.path.join(directory_name, package_name), item)
+ if os.path.isdir(s):
+ if item == "charms":
+ os.makedirs(d, exist_ok=True)
+ s_builds = os.path.join(s, "builds")
+ for charm in charm_list:
+ self._logger.debug("Copying charm {}".format(charm))
+ if charm in os.listdir(s):
+ s_charm = os.path.join(s, charm)
+ elif charm in os.listdir(s_builds):
+ s_charm = os.path.join(s_builds, charm)
+ else:
+ raise ClientException(
+ "The charm {} referenced in the descriptor file "
+ "could not be found in {}/charms or in {}/charms/builds".format(
+ charm, package_folder, package_folder
+ )
+ )
+ d_temp = os.path.join(d, charm)
+ self._logger.debug(
+ "Copying tree: {} -> {}".format(s_charm, d_temp)
+ )
+ if os.path.isdir(s_charm):
+ shutil.copytree(
+ s_charm, d_temp, symlinks=True, ignore=ignore
+ )
+ else:
+ shutil.copy2(s_charm, d_temp)
+ self._logger.debug("DONE")
+ else:
+ self._logger.debug("Copying tree: {} -> {}".format(s, d))
+ shutil.copytree(s, d, symlinks=True, ignore=ignore)
+ self._logger.debug("DONE")
+ else:
+ if item in ignore_patterns:
+ continue
+ self._logger.debug("Copying file: {} -> {}".format(s, d))
+ shutil.copy2(s, d)
+ self._logger.debug("DONE")
+ return directory_name, package_name
+
+ def copy_tree(self, s, d, ignore):
+ self._logger.debug("Copying tree: {} -> {}".format(s, d))
+ shutil.copytree(s, d, symlinks=True, ignore=ignore)
+ self._logger.debug("DONE")
+
+ def create_temp_dir_sol004_007(self, package_folder, charm_list=None):
+ """
+ Method to create a temporary folder where we can move the files in package_folder
+ """
+ self._logger.debug("")
+ ignore_patterns = ".gitignore"
+ ignore = shutil.ignore_patterns(ignore_patterns)
+ directory_name = os.path.abspath(package_folder)
+ package_name = os.path.basename(directory_name)
+ directory_name += "/tmp"
+ os.makedirs("{}/{}".format(directory_name, package_name), exist_ok=True)
+ self._logger.debug("Makedirs DONE: {}/{}".format(directory_name, package_name))
+ for item in os.listdir(package_folder):
+ self._logger.debug("Item: {}".format(item))
+ if item != "tmp":
+ s = os.path.join(package_folder, item)
+ d = os.path.join(os.path.join(directory_name, package_name), item)
+ if os.path.isdir(s):
+ if item == "Scripts":
+ os.makedirs(d, exist_ok=True)
+ scripts_folder = s
+ for script_item in os.listdir(scripts_folder):
+ scripts_destination_folder = os.path.join(d, script_item)
+ if script_item == "charms":
+ s_builds = os.path.join(
+ scripts_folder, script_item, "builds"
+ )
+ for charm in charm_list:
+ self._logger.debug("Copying charm {}".format(charm))
+ if charm in os.listdir(
+ os.path.join(scripts_folder, script_item)
+ ):
+ s_charm = os.path.join(
+ scripts_folder, script_item, charm
+ )
+ elif charm in os.listdir(s_builds):
+ s_charm = os.path.join(s_builds, charm)
+ else:
+ raise ClientException(
+ "The charm {} referenced in the descriptor file "
+ "could not be found in {}/charms or in {}/charms/builds".format(
+ charm, package_folder, package_folder
+ )
+ )
+ d_temp = os.path.join(
+ scripts_destination_folder, charm
+ )
+ self.copy_tree(s_charm, d_temp, ignore)
+ else:
+ self.copy_tree(
+ os.path.join(scripts_folder, script_item),
+ scripts_destination_folder,
+ ignore,
+ )
+ else:
+ self.copy_tree(s, d, ignore)
+ else:
+ if item in ignore_patterns:
+ continue
+ self._logger.debug("Copying file: {} -> {}".format(s, d))
+ shutil.copy2(s, d)
+ self._logger.debug("DONE")
+ return directory_name, package_name
+
+ def charms_search(self, descriptor_file, desc_type):
+ self._logger.debug(
+ "descriptor_file: {}, desc_type: {}".format(descriptor_file, desc_type)
+ )
+ charms_set = set()
+ with open("{}".format(descriptor_file)) as yaml_desc:
+ descriptor_dict = yaml.safe_load(yaml_desc)
+ # self._logger.debug("\n"+yaml.safe_dump(descriptor_dict, indent=4, default_flow_style=False))
+
+ if (
+ desc_type == "vnf"
+ and (
+ "vnfd:vnfd-catalog" in descriptor_dict
+ or "vnfd-catalog" in descriptor_dict
+ )
+ ) or (
+ desc_type == "ns"
+ and (
+ "nsd:nsd-catalog" in descriptor_dict
+ or "nsd-catalog" in descriptor_dict
+ )
+ ):
+ charms_set = self._charms_search_on_osm_im_dict(
+ descriptor_dict, desc_type
+ )
+ else:
+ if desc_type == "ns":
+ get_charm_list = self._charms_search_on_nsd_sol006_dict
+ elif desc_type == "vnf":
+ get_charm_list = self._charms_search_on_vnfd_sol006_dict
+ else:
+ raise Exception("Bad descriptor type")
+ charms_set = get_charm_list(descriptor_dict)
+ return charms_set
+
+ def _charms_search_on_osm_im_dict(self, osm_im_dict, desc_type):
+ self._logger.debug("")
+ charms_set = set()
+ for k1, v1 in osm_im_dict.items():
+ for k2, v2 in v1.items():
+ for entry in v2:
+ if "{}-configuration".format(desc_type) in entry:
+ vnf_config = entry["{}-configuration".format(desc_type)]
+ for k3, v3 in vnf_config.items():
+ if "charm" in v3:
+ charms_set.add((v3["charm"]))
+ if "vdu" in entry:
+ vdus = entry["vdu"]
+ for vdu in vdus:
+ if "vdu-configuration" in vdu:
+ for k4, v4 in vdu["vdu-configuration"].items():
+ if "charm" in v4:
+ charms_set.add((v4["charm"]))
+ return charms_set
+
+ def _charms_search_on_vnfd_sol006_dict(self, sol006_dict):
+ self._logger.debug("")
+ charms_set = set()
+ dfs = sol006_dict.get("vnfd", {}).get("df", [])
+ for df in dfs:
+ day_1_2s = (
+ df.get("lcm-operations-configuration", {})
+ .get("operate-vnf-op-config", {})
+ .get("day1-2")
+ )
+ if day_1_2s is not None:
+ for day_1_2 in day_1_2s:
+ exec_env_list = day_1_2.get("execution-environment-list", [])
+ for exec_env in exec_env_list:
+ if "juju" in exec_env and "charm" in exec_env["juju"]:
+ charms_set.add(exec_env["juju"]["charm"])
+ return charms_set
+
+ def _charms_search_on_nsd_sol006_dict(self, sol006_dict):
+ self._logger.debug("")
+ charms_set = set()
+ nsd_list = sol006_dict.get("nsd", {}).get("nsd", [])
+ for nsd in nsd_list:
+ charm = nsd.get("ns-configuration", {}).get("juju", {}).get("charm")
+ if charm:
+ charms_set.add(charm)
+ return charms_set