OSM Repo API handling
"""
import glob
-import hashlib
import logging
from os import listdir, mkdir, getcwd, remove
from os.path import isfile, isdir, join, abspath
from osmclient.common.exceptions import ClientException
from osmclient.common.package_tool import PackageTool
from osmclient.sol005.repo import Repo
+from osmclient.common import utils
from packaging import version as versioning
import requests
import yaml
"Error cannot read from repository {} '{}': {}".format(
repository["name"], repository["url"], e
),
- exc_info=True
+ exc_info=True,
)
continue
return f_name
def pkg_get(self, pkgtype, name, repo, version, filter):
-
pkg_name = self.get_pkg(pkgtype, name, repo, filter, version)
if not pkg_name:
raise ClientException("Package not found")
artifacts = []
directories = []
for f in listdir(origin):
- if isfile(join(origin, f)) and f.endswith('.tar.gz'):
+ self._logger.debug(f"Element: {join(origin,f)}")
+ if isfile(join(origin, f)) and f.endswith(".tar.gz"):
artifacts.append(f)
- elif isdir(join(origin, f)) and f != destination.split('/')[-1] and not f.startswith('.'):
- directories.append(f) # TODO: Document that nested directories are not supported
+ elif (
+ isdir(join(origin, f))
+ and f != destination.split("/")[-1]
+ and not f.startswith(".")
+ ):
+ directories.append(
+ f
+ ) # TODO: Document that nested directories are not supported
else:
self._logger.debug(f"Ignoring {f}")
- for artifact in artifacts:
- self.register_artifact_in_repository(
- join(origin, artifact), destination, source="artifact"
+ self._logger.debug(f"Artifacts: {artifacts}")
+ for package in artifacts:
+ self.register_package_in_repository(
+ join(origin, package), origin, destination, kind="artifact"
)
- for artifact in directories:
- self.register_artifact_in_repository(
- join(origin, artifact), destination, source="directory"
+ self._logger.debug(f"Directories: {directories}")
+ for package in directories:
+ self.register_package_in_repository(
+ join(origin, package), origin, destination, kind="directory"
)
self._logger.info("\nFinal Results: ")
self._logger.info(
+ str(len(glob.glob(destination + "/nst/*/*/metadata.yaml")))
)
- def md5(self, fname):
- """
- Checksum generator
- :param fname: file path
- :return: checksum string
- """
- self._logger.debug("")
- hash_md5 = hashlib.md5()
- with open(fname, "rb") as f:
- for chunk in iter(lambda: f.read(4096), b""):
- hash_md5.update(chunk)
- return hash_md5.hexdigest()
-
def fields_building(self, descriptor_dict, file, package_type):
"""
From an artifact descriptor, obtain the fields required for indexing
if descriptor_dict.get("vnfd-catalog", False):
aux_dict = descriptor_dict.get("vnfd-catalog", {}).get("vnfd", [{}])[0]
elif descriptor_dict.get("vnfd:vnfd-catalog"):
- aux_dict = descriptor_dict.get("vnfd:vnfd-catalog", {}).get("vnfd", [{}])[0]
+ aux_dict = descriptor_dict.get("vnfd:vnfd-catalog", {}).get(
+ "vnfd", [{}]
+ )[0]
elif descriptor_dict.get("vnfd"):
aux_dict = descriptor_dict["vnfd"]
if aux_dict.get("vnfd"):
- aux_dict = aux_dict['vnfd'][0]
+ aux_dict = aux_dict["vnfd"][0]
else:
msg = f"Unexpected descriptor format {descriptor_dict}"
self._logger.error(msg)
raise ValueError(msg)
- self._logger.debug(f"Extracted descriptor info for {package_type}: {aux_dict}")
+ self._logger.debug(
+ f"Extracted descriptor info for {package_type}: {aux_dict}"
+ )
images = []
- for vdu in aux_dict.get("vdu", aux_dict.get('kdu', ())):
- images.append(vdu.get("image", vdu.get('name')))
+ for vdu in aux_dict.get("vdu", aux_dict.get("kdu", ())):
+ images.append(vdu.get("image", vdu.get("name")))
fields["images"] = images
elif package_type == "ns":
if descriptor_dict.get("nsd-catalog", False):
aux_dict = descriptor_dict.get("nsd-catalog", {}).get("nsd", [{}])[0]
elif descriptor_dict.get("nsd:nsd-catalog"):
- aux_dict = descriptor_dict.get("nsd:nsd-catalog", {}).get("nsd", [{}])[0]
+ aux_dict = descriptor_dict.get("nsd:nsd-catalog", {}).get("nsd", [{}])[
+ 0
+ ]
elif descriptor_dict.get("nsd"):
- aux_dict = descriptor_dict['nsd']
+ aux_dict = descriptor_dict["nsd"]
if aux_dict.get("nsd"):
aux_dict = descriptor_dict["nsd"]["nsd"][0]
else:
for vnf in aux_dict.get("constituent-vnfd", ()):
vnfs.append(vnf.get("vnfd-id-ref"))
else:
- vnfs = aux_dict.get('vnfd-id')
+ vnfs = aux_dict.get("vnfd-id")
self._logger.debug("Used VNFS in the NSD: " + str(vnfs))
fields["vnfd-id-ref"] = vnfs
- elif package_type == 'nst':
+ elif package_type == "nst":
if descriptor_dict.get("nst-catalog", False):
aux_dict = descriptor_dict.get("nst-catalog", {}).get("nst", [{}])[0]
elif descriptor_dict.get("nst:nst-catalog"):
- aux_dict = descriptor_dict.get("nst:nst-catalog", {}).get("nst", [{}])[0]
+ aux_dict = descriptor_dict.get("nst:nst-catalog", {}).get("nst", [{}])[
+ 0
+ ]
elif descriptor_dict.get("nst"):
- aux_dict = descriptor_dict['nst']
+ aux_dict = descriptor_dict["nst"]
if aux_dict.get("nst"):
aux_dict = descriptor_dict["nst"]["nst"][0]
nsds = []
msg = f"Unexpected descriptor format {descriptor_dict}"
self._logger.error(msg)
raise ValueError(msg)
-
- fields["name"] = aux_dict.get("name")
+ # Repo search is based on 'name' entry in index.yaml. It is mandatory then
+ fields["name"] = aux_dict.get("name", aux_dict["product-name"])
fields["id"] = aux_dict.get("id")
fields["description"] = aux_dict.get("description")
fields["vendor"] = aux_dict.get("vendor")
descriptor_file = glob.glob("{}/*.y*ml".format(folder))[0]
return folder, descriptor_file
- def validate_artifact(self, path, source):
+ def validate_artifact(self, path, origin, kind):
"""
Validation of artifact.
:param path: file path
- :param source: flag to select the correct file type (directory or artifact)
+ :param origin: folder where the package is located
+ :param kind: flag to select the correct file type (directory or artifact)
:return: status details, status, fields, package_type
"""
- self._logger.debug(f"Validating {path} {source}")
+ self._logger.debug(f"Validating {path} {kind}")
package_type = ""
folder = ""
try:
- if source == "directory":
+ if kind == "directory":
descriptor_file = glob.glob("{}/*.y*ml".format(path))[0]
else:
folder, descriptor_file = self.zip_extraction(path)
+ folder = join(origin, folder)
+ self._logger.debug(
+ f"Kind is an artifact (tar.gz). Folder: {folder}. Descriptor_file: {descriptor_file}"
+ )
self._logger.debug("Opening descriptor file: {}".format(descriptor_file))
if folder:
rmtree(folder, ignore_errors=True)
- def register_artifact_in_repository(self, path, destination, source):
+ def register_package_in_repository(self, path, origin, destination, kind):
"""
Registration of one artifact in a repository
- param path:
- param destination: path for index creation
- param source:
+ :param path: absolute path of the VNF/NS package
+ :param origin: folder where the package is located
+ :param destination: path for index creation
+ :param kind: artifact (tar.gz) or directory
"""
self._logger.debug("")
pt = PackageTool()
compressed = False
try:
fields = {}
- _, valid, fields, package_type = self.validate_artifact(path, source)
+ _, valid, fields, package_type = self.validate_artifact(path, origin, kind)
if not valid:
raise Exception(
"{} {} Not well configured.".format(package_type.upper(), str(path))
)
else:
- if source == "directory":
+ if kind == "directory":
path = pt.build(path)
self._logger.debug(f"Directory path {path}")
compressed = True
- fields["checksum"] = self.md5(path)
+ fields["checksum"] = utils.md5(path)
self.indexation(destination, path, package_type, fields)
except Exception as e:
self._logger.exception(
- "Error registering artifact in Repository: {}".format(e)
+ "Error registering package in Repository: {}".format(e)
)
raise ClientException(e)
finally:
- if source == "directory" and compressed:
+ if kind == "directory" and compressed:
remove(path)
def indexation(self, destination, path, package_type, fields):