"""
OSM Repo API handling
"""
-from osmclient.common.exceptions import ClientException
-from osmclient.sol005.repo import Repo
-import requests
+import glob
+import hashlib
import logging
-import tempfile
+from os import listdir, mkdir, getcwd, remove
+from os.path import isfile, isdir, join, abspath
from shutil import copyfile, rmtree
-import yaml
import tarfile
-import glob
-from packaging import version as versioning
+import tempfile
import time
-from os import listdir, mkdir, getcwd, remove
-from os.path import isfile, isdir, join, abspath
-import hashlib
+
from osm_im.validation import Validation as validation_im
-import ruamel.yaml
+from osmclient.common.exceptions import ClientException
+from osmclient.common.package_tool import PackageTool
+from osmclient.sol005.repo import Repo
+from packaging import version as versioning
+import requests
+import yaml
class OSMRepo(Repo):
else:
raise Exception('repository in url {} unreachable'.format(repository.get('url')))
except Exception as e:
- logging.error("Error cannot read from repository {} '{}': {}".format(repository['name'], repository['url'], e))
+ self._logger.error(
+ "Error cannot read from repository {} '{}': {}".format(
+ repository["name"], repository["url"], e
+ ),
+ exc_info=True
+ )
continue
vnf_repos_filtered = []
f.write(r.raw.read())
f_name = f.name
if not f_name:
- raise ClientException("{} {} not found at repo {}".format(pkgtype,name, repo))
+ raise ClientException("{} {} not found at repo {}".format(pkgtype, name, repo))
return f_name
def pkg_get(self, pkgtype, name, repo, version, filter):
:param origin: origin directory for getting all the artifacts
-        :param destination: destination folder for create and index the valid artifacts
+        :param destination: destination folder where the valid artifacts are created and indexed
"""
- if destination == '.':
+ self._logger.debug("Starting index composition")
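+        # If both origin and destination are the current directory, index into a separate "repository" folder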
+ if destination == ".":
if origin == destination:
destination = 'repository'
destination = abspath(destination)
origin = abspath(origin)
-
- if origin[0] != '/':
+        self._logger.debug(f"Destination: {destination}, origin: {origin}")
+ if origin[0] != "/":
origin = join(getcwd(), origin)
if destination[0] != '/':
destination = join(getcwd(), destination)
self.init_directory(destination)
- artifacts = [f for f in listdir(origin) if isfile(join(origin, f))]
- directories = [f for f in listdir(origin) if isdir(join(origin, f))]
+ artifacts = []
+ directories = []
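+        # Only index top-level .tar.gz packages and package directories; skip the destination folder and hidden entries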
+ for f in listdir(origin):
+ if isfile(join(origin, f)) and f.endswith('.tar.gz'):
+ artifacts.append(f)
+ elif isdir(join(origin, f)) and f != destination.split('/')[-1] and not f.startswith('.'):
+ directories.append(f) # TODO: Document that nested directories are not supported
+ else:
+ self._logger.debug(f"Ignoring {f}")
for artifact in artifacts:
- self.register_artifact_in_repository(join(origin, artifact), destination, source='file')
+ self.register_artifact_in_repository(
+ join(origin, artifact), destination, source="artifact"
+ )
for artifact in directories:
- self.register_artifact_in_repository(join(origin, artifact), destination, source='directory')
- print("\nFinal Results: ")
- print("VNF Packages Indexed: " + str(len(glob.glob(destination + "/vnf/*/*/metadata.yaml"))))
- print("NS Packages Indexed: " + str(len(glob.glob(destination + "/ns/*/*/metadata.yaml"))))
+ self.register_artifact_in_repository(
+ join(origin, artifact), destination, source="directory"
+ )
+        self._logger.info("Final Results:")
+ self._logger.info(
+ "VNF Packages Indexed: "
+ + str(len(glob.glob(destination + "/vnf/*/*/metadata.yaml")))
+ )
+ self._logger.info(
+ "NS Packages Indexed: "
+ + str(len(glob.glob(destination + "/ns/*/*/metadata.yaml")))
+ )
+ self._logger.info(
+ "NST Packages Indexed: "
+ + str(len(glob.glob(destination + "/nst/*/*/metadata.yaml")))
+ )
def md5(self, fname):
"""
:param fname: file path
:return: checksum string
"""
+ self._logger.debug("")
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
- def fields_building(self, descriptor_json, file, package_type):
+ def fields_building(self, descriptor_dict, file, package_type):
"""
- From an artifact descriptor, obtain the fields required for indexing
- :param descriptor_json: artifact description
- :param file: artifact package
- :param package_type: type of artifact (vnf or ns)
- :return: fields
+ From an artifact descriptor, obtain the fields required for indexing
+ :param descriptor_dict: artifact description
+ :param file: artifact package
+ :param package_type: type of artifact (vnf, ns, nst)
+ :return: fields
"""
+ self._logger.debug("")
+
fields = {}
- base_path = '/' + package_type + '/'
+ base_path = '/{}/'.format(package_type)
+ aux_dict = {}
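+        # Descriptors may wrap the data in the old *-catalog containers (with or without a namespace prefix) or expose a plain top-level key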
if package_type == "vnf":
- if descriptor_json.get('vnfd-catalog', False):
- aux_dict = descriptor_json.get('vnfd-catalog', {}).get('vnfd', [{}])[0]
+ if descriptor_dict.get("vnfd-catalog", False):
+ aux_dict = descriptor_dict.get("vnfd-catalog", {}).get("vnfd", [{}])[0]
+ elif descriptor_dict.get("vnfd:vnfd-catalog"):
+ aux_dict = descriptor_dict.get("vnfd:vnfd-catalog", {}).get("vnfd", [{}])[0]
+ elif descriptor_dict.get("vnfd"):
+ aux_dict = descriptor_dict["vnfd"]
+ if aux_dict.get("vnfd"):
+                    aux_dict = aux_dict["vnfd"][0]
else:
- aux_dict = descriptor_json.get('vnfd:vnfd-catalog', {}).get('vnfd', [{}])[0]
-
+ msg = f"Unexpected descriptor format {descriptor_dict}"
+ self._logger.error(msg)
+ raise ValueError(msg)
+ self._logger.debug(f"Extracted descriptor info for {package_type}: {aux_dict}")
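+            # Collect the images referenced by each VDU (or the KDU names when the package is KDU-based)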
images = []
- for vdu in aux_dict.get('vdu', ()):
- images.append(vdu.get('image'))
- fields['images'] = images
- if package_type == "ns":
- if descriptor_json.get('nsd-catalog', False):
- aux_dict = descriptor_json.get('nsd-catalog', {}).get('nsd', [{}])[0]
+            for vdu in aux_dict.get("vdu", aux_dict.get("kdu", ())):
+                images.append(vdu.get("image", vdu.get("name")))
+ fields["images"] = images
+ elif package_type == "ns":
+ if descriptor_dict.get("nsd-catalog", False):
+ aux_dict = descriptor_dict.get("nsd-catalog", {}).get("nsd", [{}])[0]
+ elif descriptor_dict.get("nsd:nsd-catalog"):
+ aux_dict = descriptor_dict.get("nsd:nsd-catalog", {}).get("nsd", [{}])[0]
+ elif descriptor_dict.get("nsd"):
+                aux_dict = descriptor_dict["nsd"]
+ if aux_dict.get("nsd"):
+ aux_dict = descriptor_dict["nsd"]["nsd"][0]
else:
- aux_dict = descriptor_json.get('nsd:nsd-catalog', {}).get('nsd', [{}])[0]
-
+ msg = f"Unexpected descriptor format {descriptor_dict}"
+ self._logger.error(msg)
+ raise ValueError(msg)
vnfs = []
-
- for vnf in aux_dict.get('constituent-vnfd', ()):
- vnfs.append(vnf.get('vnfd-id-ref'))
- self._logger.debug('Used VNFS in the NSD: ' + str(vnfs))
- fields['vnfd-id-ref'] = vnfs
-
- fields['name'] = aux_dict.get('name')
- fields['id'] = aux_dict.get('id')
- fields['description'] = aux_dict.get('description')
- fields['vendor'] = aux_dict.get('vendor')
- fields['version'] = aux_dict.get('version', '1.0')
- fields['path'] = base_path + fields['id'] + '/' + fields['version'] + '/' + fields.get('id') + "-" + \
- fields.get('version') + '.tar.gz'
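+            # The NSD may list its VNFs either as constituent-vnfd entries or as a plain vnfd-id list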
+ if aux_dict.get("constituent-vnfd"):
+ for vnf in aux_dict.get("constituent-vnfd", ()):
+ vnfs.append(vnf.get("vnfd-id-ref"))
+ else:
+                vnfs = aux_dict.get("vnfd-id", [])
+ self._logger.debug("Used VNFS in the NSD: " + str(vnfs))
+ fields["vnfd-id-ref"] = vnfs
+        elif package_type == "nst":
+ if descriptor_dict.get("nst-catalog", False):
+ aux_dict = descriptor_dict.get("nst-catalog", {}).get("nst", [{}])[0]
+ elif descriptor_dict.get("nst:nst-catalog"):
+ aux_dict = descriptor_dict.get("nst:nst-catalog", {}).get("nst", [{}])[0]
+ elif descriptor_dict.get("nst"):
+                aux_dict = descriptor_dict["nst"]
+ if aux_dict.get("nst"):
+ aux_dict = descriptor_dict["nst"]["nst"][0]
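+            # The NSDs used by the slice template are referenced from its netslice-subnet entries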
+ nsds = []
+ for nsd in aux_dict.get("netslice-subnet", ()):
+ nsds.append(nsd.get("nsd-ref"))
+ self._logger.debug("Used NSDs in the NST: " + str(nsds))
+ if not nsds:
+ msg = f"Unexpected descriptor format {descriptor_dict}"
+ self._logger.error(msg)
+ raise ValueError(msg)
+ fields["nsd-id-ref"] = nsds
+ else:
+ msg = f"Unexpected descriptor format {descriptor_dict}"
+ self._logger.error(msg)
+ raise ValueError(msg)
+
+ fields["name"] = aux_dict.get("name")
+ fields["id"] = aux_dict.get("id")
+ fields["description"] = aux_dict.get("description")
+ fields["vendor"] = aux_dict.get("vendor")
+ fields["version"] = str(aux_dict.get("version", "1.0"))
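+        # Packages are stored in the repository as <type>/<id>/<version>/<id>-<version>.tar.gz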
+ fields["path"] = "{}{}/{}/{}-{}.tar.gz".format(
+ base_path,
+ fields["id"],
+ fields["version"],
+ fields.get("id"),
+ fields.get("version"),
+ )
return fields
- def zip_extraction(self, file):
+ def zip_extraction(self, file_name):
"""
-        Validation of artifact.
-        :param file: file path
-        :return: status details, status, fields, package_type
+        Decompress a package file into a temporary working folder.
+        :param file_name: path of the package file
+        :return: extraction folder and descriptor file path
"""
self._logger.debug("Decompressing package file")
- temp_file = '/tmp/' + file.split('/')[-1]
- if file != temp_file:
- copyfile(file, temp_file)
+ temp_file = '/tmp/{}'.format(file_name.split('/')[-1])
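+        # Work on a copy under /tmp so the original package file is left untouched during extraction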
+ if file_name != temp_file:
+ copyfile(file_name, temp_file)
with tarfile.open(temp_file, "r:gz") as tar:
folder = tar.getnames()[0].split('/')[0]
tar.extractall()
remove(temp_file)
- descriptor_file = glob.glob(folder + "/*.y*ml")[0]
+ descriptor_file = glob.glob('{}/*.y*ml'.format(folder))[0]
return folder, descriptor_file
def validate_artifact(self, path, source):
"""
- Validation of artifact.
- :param path: file path
- :return: status details, status, fields, package_type
+ Validation of artifact.
+ :param path: file path
+ :param source: flag to select the correct file type (directory or artifact)
+ :return: status details, status, fields, package_type
"""
+ self._logger.debug("")
+ package_type = ''
+ folder = ''
try:
- package_type = ''
- folder = ''
if source == 'directory':
- descriptor_file = glob.glob(path + "/*.y*ml")[0]
+ descriptor_file = glob.glob('{}/*.y*ml'.format(path))[0]
else:
folder, descriptor_file = self.zip_extraction(path)
with open(descriptor_file, 'r') as f:
descriptor_data = f.read()
+ self._logger.debug(f"Descriptor data: {descriptor_data}")
validation = validation_im()
- desc_type, descriptor_data = validation.yaml_validation(descriptor_data)
- validation_im.pyangbind_validation(self, desc_type, descriptor_data)
- if 'vnf' in list(descriptor_data.keys())[0]:
- package_type = 'vnf'
+ desc_type, descriptor_dict = validation.yaml_validation(descriptor_data)
+ try:
+ validation_im.pyangbind_validation(self, desc_type, descriptor_dict)
+ except Exception as e:
+ self._logger.error(e, exc_info=True)
+ raise e
+ descriptor_type_ref = list(descriptor_dict.keys())[0].lower()
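+            # Infer the package type from the top-level descriptor key; "nst" must be checked before "ns"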
+ if "vnf" in descriptor_type_ref:
+ package_type = "vnf"
+ elif "nst" in descriptor_type_ref:
+ package_type = "nst"
+ elif "ns" in descriptor_type_ref:
+ package_type = "ns"
else:
- # raise ClientException("Not VNF package")
- package_type = 'ns'
-
- self._logger.debug("Descriptor: {}".format(descriptor_data))
- fields = self.fields_building(descriptor_data, path, package_type)
- self._logger.debug("Descriptor sucessfully validated")
- return {"detail": "{}D successfully validated".format(package_type.upper()),
- "code": "OK"}, True, fields, package_type
+ msg = f"Unknown package type {descriptor_type_ref}"
+ self._logger.error(msg)
+ raise ValueError(msg)
+ self._logger.debug("Descriptor: {}".format(descriptor_dict))
+ fields = self.fields_building(descriptor_dict, path, package_type)
+ self._logger.debug(f"Descriptor successfully validated {fields}")
+ return (
+ {
+ "detail": "{}D successfully validated".format(package_type.upper()),
+ "code": "OK",
+ },
+ True,
+ fields,
+ package_type,
+ )
except Exception as e:
# Delete the folder we just created
return {"detail": str(e)}, False, {}, package_type
if folder:
rmtree(folder, ignore_errors=True)
- def compress_artifact(self, path):
- """
- Compress a directory for building an artifact
- :param path: path of the directory
- :return: file path
- """
- if path[-1] == '/':
- path = path[:-1]
- file = path + '.tar.gz'
- with tarfile.open(file, "w:gz") as tar:
- tar.add(path)
-
- return file
-
def register_artifact_in_repository(self, path, destination, source):
"""
Registration of one artifact in a repository
-        file: VNF or NS
+        file: VNF, NS or NST package
destination: path for index creation
"""
+ self._logger.debug("")
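+        # PackageTool is used to build a .tar.gz package from a source directory before indexing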
+ pt = PackageTool()
+ compressed = False
try:
- compresed = False
fields = {}
- res, valid, fields, package_type = self.validate_artifact(path, source)
+ _, valid, fields, package_type = self.validate_artifact(path, source)
if not valid:
- raise Exception('{} {} Not well configured.'.format(package_type.upper(), str(path)))
+ raise Exception(
+                    "{} {} is not well configured.".format(package_type.upper(), str(path))
+ )
else:
- if source == 'directory':
- path = self.compress_artifact(path)
- compresed = True
- fields['checksum'] = self.md5(path)
+ if source == "directory":
+ path = pt.build(path)
+ self._logger.debug(f"Directory path {path}")
+ compressed = True
+ fields["checksum"] = self.md5(path)
self.indexation(destination, path, package_type, fields)
except Exception as e:
- self._logger.debug(str(e))
+ self._logger.exception("Error registering artifact in Repository: {}".format(e))
finally:
- if source == 'directory' and compresed:
+ if source == "directory" and compressed:
remove(path)
def indexation(self, destination, path, package_type, fields):
-        :param package_type: package type (vnf, ns)
+        :param package_type: package type (vnf, ns, nst)
:param fields: dict with the required values
"""
+ self._logger.debug("")
data_ind = {'name': fields.get('name'), 'description': fields.get('description'),
'vendor': fields.get('vendor'), 'path': fields.get('path')}
mkdir(final_path)
copyfile(path,
final_path + '/' + fields.get('id') + "-" + fields.get('version') + '.tar.gz')
- yaml.dump(fields, open(final_path + '/' + 'metadata.yaml', 'w'),
- Dumper=ruamel.yaml.RoundTripDumper)
- index = yaml.load(open(destination + '/index.yaml'))
+ yaml.safe_dump(fields, open(final_path + '/' + 'metadata.yaml', 'w'),
+ default_flow_style=False, width=80, indent=4)
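+            # index.yaml keeps one entry per package id, with all known versions and a "latest" pointer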
+ index = yaml.safe_load(open(destination + '/index.yaml'))
index['{}_packages'.format(package_type)][fields.get('id')][fields.get('version')] = data_ind
if versioning.parse(index['{}_packages'.format(package_type)][fields.get('id')][
'latest']) < versioning.parse(fields.get('version')):
index['{}_packages'.format(package_type)][fields.get('id')]['latest'] = fields.get(
'version')
- yaml.dump(index, open(destination + '/index.yaml', 'w'), Dumper=ruamel.yaml.RoundTripDumper)
+ yaml.safe_dump(index, open(destination + '/index.yaml', 'w'),
+ default_flow_style=False, width=80, indent=4)
self._logger.info('{} {} added in the repository'.format(package_type.upper(), str(path)))
else:
mkdir(destination + '/{}/'.format(package_type) + fields.get('id'))
mkdir(final_path)
copyfile(path,
final_path + '/' + fields.get('id') + "-" + fields.get('version') + '.tar.gz')
- yaml.dump(fields, open(join(final_path, 'metadata.yaml'), 'w'), Dumper=ruamel.yaml.RoundTripDumper)
- index = yaml.load(open(destination + '/index.yaml'))
+ yaml.safe_dump(fields, open(join(final_path, 'metadata.yaml'), 'w'),
+ default_flow_style=False, width=80, indent=4)
+ index = yaml.safe_load(open(destination + '/index.yaml'))
index['{}_packages'.format(package_type)][fields.get('id')] = {fields.get('version'): data_ind}
index['{}_packages'.format(package_type)][fields.get('id')]['latest'] = fields.get('version')
- yaml.dump(index, open(join(destination, 'index.yaml'), 'w'), Dumper=ruamel.yaml.RoundTripDumper)
+ yaml.safe_dump(index, open(join(destination, 'index.yaml'), 'w'),
+ default_flow_style=False, width=80, indent=4)
self._logger.info('{} {} added in the repository'.format(package_type.upper(), str(path)))
- def current_datatime(self):
+ def current_datetime(self):
"""
Datetime Generator
:return: Datetime as string with the following structure "2020-04-29T08:41:07.681653Z"
"""
+ self._logger.debug("")
return time.strftime('%Y-%m-%dT%H:%M:%S.%sZ')
def init_directory(self, destination):
"""
- Initialize the index directory. Creation of index.yaml, and the directories for vnf and ns
- :param destination:
- :return:
+        Initialize the index directory. Creation of index.yaml, and the directories for vnf, ns and nst
+ :param destination:
+ :return:
"""
+ self._logger.debug("")
if not isdir(destination):
mkdir(destination)
- if not isfile(join(destination, 'index.yaml')):
- mkdir(join(destination, 'vnf'))
- mkdir(join(destination, 'ns'))
- index_data = {'apiVersion': 'v1', 'generated': self.current_datatime(), 'vnf_packages': {},
- 'ns_packages': {}}
- with open(join(destination, 'index.yaml'), 'w') as outfile:
- yaml.dump(index_data, outfile, default_flow_style=False)
+ if not isfile(join(destination, "index.yaml")):
+ mkdir(join(destination, "vnf"))
+ mkdir(join(destination, "ns"))
+ mkdir(join(destination, "nst"))
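+            # Seed an empty index covering the three supported package types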
+ index_data = {
+ "apiVersion": "v1",
+ "generated": self.current_datetime(),
+ "vnf_packages": {},
+ "ns_packages": {},
+ "nst_packages": {},
+ }
+ with open(join(destination, "index.yaml"), "w") as outfile:
+ yaml.safe_dump(
+ index_data, outfile, default_flow_style=False, width=80, indent=4
+ )