"""
OSM Repo API handling
"""
-from osmclient.common.exceptions import ClientException
-from osmclient.sol005.repo import Repo
-import requests
+from datetime import datetime
+import glob
+import hashlib
import logging
-import tempfile
+from os import listdir, mkdir, getcwd, remove
+from os.path import isfile, isdir, join, abspath
from shutil import copyfile, rmtree
-import yaml
import tarfile
-import glob
-from packaging import version as versioning
+import tempfile
import time
-from os import listdir, mkdir, getcwd, remove
-from os.path import isfile, isdir, join, abspath
-import hashlib
+
from osm_im.validation import Validation as validation_im
-import ruamel.yaml
+from osmclient.common.exceptions import ClientException
+from osmclient.common.package_tool import PackageTool
+from osmclient.sol005.repo import Repo
+from packaging import version as versioning
+import requests
+import yaml
class OSMRepo(Repo):
f.write(r.raw.read())
f_name = f.name
if not f_name:
- raise ClientException("{} {} not found at repo {}".format(pkgtype,name, repo))
+ raise ClientException("{} {} not found at repo {}".format(pkgtype, name, repo))
return f_name
def pkg_get(self, pkgtype, name, repo, version, filter):
:param origin: origin directory for getting all the artifacts
- :param destination: destination folder for create and index the valid artifacts
+ :param destination: destination folder in which to create and index the valid artifacts
"""
+ self._logger.debug("")
if destination == '.':
if origin == destination:
destination = 'repository'
:param fname: file path
:return: checksum string
"""
+ self._logger.debug("")
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
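+ # Illustrative usage sketch (not part of the module): assuming `repo` is an
+ # already initialised OSMRepo instance and the package path below exists locally:
+ #
+ #     checksum = repo.md5('/tmp/hackfest_basic_vnf.tar.gz')
+ #
+ # The file is read in 4096-byte chunks, so large packages never need to be
+ # held in memory at once.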
- def fields_building(self, descriptor_json, file, package_type):
+ def fields_building(self, descriptor_dict, file, package_type):
"""
From an artifact descriptor, obtain the fields required for indexing
- :param descriptor_json: artifact description
+ :param descriptor_dict: artifact description
:param file: artifact package
:param package_type: type of artifact (vnf or ns)
:return: fields
"""
+ self._logger.debug("")
fields = {}
- base_path = '/' + package_type + '/'
+ base_path = '/{}/'.format(package_type)
+ aux_dict = {}
if package_type == "vnf":
- if descriptor_json.get('vnfd-catalog', False):
- aux_dict = descriptor_json.get('vnfd-catalog', {}).get('vnfd', [{}])[0]
+ if descriptor_dict.get('vnfd-catalog', False):
+ aux_dict = descriptor_dict.get('vnfd-catalog', {}).get('vnfd', [{}])[0]
else:
- aux_dict = descriptor_json.get('vnfd:vnfd-catalog', {}).get('vnfd', [{}])[0]
+ aux_dict = descriptor_dict.get('vnfd:vnfd-catalog', {}).get('vnfd', [{}])[0]
images = []
for vdu in aux_dict.get('vdu', ()):
images.append(vdu.get('image'))
fields['images'] = images
if package_type == "ns":
- if descriptor_json.get('nsd-catalog', False):
- aux_dict = descriptor_json.get('nsd-catalog', {}).get('nsd', [{}])[0]
+ if descriptor_dict.get('nsd-catalog', False):
+ aux_dict = descriptor_dict.get('nsd-catalog', {}).get('nsd', [{}])[0]
else:
- aux_dict = descriptor_json.get('nsd:nsd-catalog', {}).get('nsd', [{}])[0]
+ aux_dict = descriptor_dict.get('nsd:nsd-catalog', {}).get('nsd', [{}])[0]
vnfs = []
fields['description'] = aux_dict.get('description')
fields['vendor'] = aux_dict.get('vendor')
fields['version'] = aux_dict.get('version', '1.0')
- fields['path'] = base_path + fields['id'] + '/' + fields['version'] + '/' + fields.get('id') + "-" + \
- fields.get('version') + '.tar.gz'
+ fields['path'] = "{}{}/{}/{}-{}.tar.gz".format(base_path, fields['id'], fields['version'], fields.get('id'),
+ fields.get('version'))
return fields
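+ # Illustrative sketch (hypothetical descriptor values): for a VNF descriptor
+ # declaring id 'hackfest_basic-vnf', version '1.0' and one VDU image, the
+ # returned fields would look roughly like:
+ #
+ #     {'id': 'hackfest_basic-vnf', 'name': 'hackfest_basic-vnf',
+ #      'description': 'A basic example VNF', 'vendor': 'OSM', 'version': '1.0',
+ #      'images': ['ubuntu16.04'],
+ #      'path': '/vnf/hackfest_basic-vnf/1.0/hackfest_basic-vnf-1.0.tar.gz'}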
- def zip_extraction(self, file):
+ def zip_extraction(self, file_name):
"""
- Validation of artifact.
- :param file: file path
- :return: status details, status, fields, package_type
+ Decompress a package file and locate its descriptor.
+ :param file_name: file path of the .tar.gz package
+ :return: extracted folder name, descriptor file path
"""
self._logger.debug("Decompressing package file")
- temp_file = '/tmp/' + file.split('/')[-1]
- if file != temp_file:
- copyfile(file, temp_file)
+ temp_file = '/tmp/{}'.format(file_name.split('/')[-1])
+ if file_name != temp_file:
+ copyfile(file_name, temp_file)
with tarfile.open(temp_file, "r:gz") as tar:
folder = tar.getnames()[0].split('/')[0]
tar.extractall()
remove(temp_file)
- descriptor_file = glob.glob(folder + "/*.y*ml")[0]
+ descriptor_file = glob.glob('{}/*.y*ml'.format(folder))[0]
return folder, descriptor_file
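+ # Illustrative sketch (assumed package layout): for a tarball whose top-level
+ # folder contains the descriptor, e.g. 'hackfest_basic_vnf/hackfest_basic_vnfd.yaml',
+ # the call would return that folder name and the descriptor path:
+ #
+ #     folder, descriptor_file = repo.zip_extraction('/tmp/hackfest_basic_vnf.tar.gz')
+ #     # folder == 'hackfest_basic_vnf'
+ #     # descriptor_file == 'hackfest_basic_vnf/hackfest_basic_vnfd.yaml'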
def validate_artifact(self, path, source):
:param path: file path
:return: status details, status, fields, package_type
"""
+ self._logger.debug("")
+ package_type = ''
+ folder = ''
try:
- package_type = ''
- folder = ''
if source == 'directory':
- descriptor_file = glob.glob(path + "/*.y*ml")[0]
+ descriptor_file = glob.glob('{}/*.y*ml'.format(path))[0]
else:
folder, descriptor_file = self.zip_extraction(path)
with open(descriptor_file, 'r') as f:
descriptor_data = f.read()
validation = validation_im()
- desc_type, descriptor_data = validation.yaml_validation(descriptor_data)
- validation_im.pyangbind_validation(self, desc_type, descriptor_data)
- if 'vnf' in list(descriptor_data.keys())[0]:
+ desc_type, descriptor_dict = validation.yaml_validation(descriptor_data)
+ validation_im.pyangbind_validation(self, desc_type, descriptor_dict)
+ if 'vnf' in list(descriptor_dict.keys())[0]:
package_type = 'vnf'
else:
# raise ClientException("Not VNF package")
package_type = 'ns'
- self._logger.debug("Descriptor: {}".format(descriptor_data))
- fields = self.fields_building(descriptor_data, path, package_type)
+ self._logger.debug("Descriptor: {}".format(descriptor_dict))
+ fields = self.fields_building(descriptor_dict, path, package_type)
self._logger.debug("Descriptor sucessfully validated")
return {"detail": "{}D successfully validated".format(package_type.upper()),
"code": "OK"}, True, fields, package_type
if folder:
rmtree(folder, ignore_errors=True)
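+ # Illustrative sketch (assuming `repo` is an initialised OSMRepo and the path
+ # points at a real package; any source value other than 'directory' is treated
+ # here as a packaged file):
+ #
+ #     details, valid, fields, package_type = repo.validate_artifact(
+ #         '/tmp/hackfest_basic_vnf.tar.gz', source='file')
+ #     if valid:
+ #         print(details['detail'])   # e.g. 'VNFD successfully validated'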
- def compress_artifact(self, path):
- """
- Compress a directory for building an artifact
- :param path: path of the directory
- :return: file path
- """
- if path[-1] == '/':
- path = path[:-1]
- file = path + '.tar.gz'
- with tarfile.open(file, "w:gz") as tar:
- tar.add(path)
-
- return file
-
def register_artifact_in_repository(self, path, destination, source):
"""
Registration of one artifact in a repository
file: VNF or NS
destination: path for index creation
"""
+ self._logger.debug("")
+ pt = PackageTool()
+ compressed = False
try:
- compresed = False
fields = {}
- res, valid, fields, package_type = self.validate_artifact(path, source)
+ _, valid, fields, package_type = self.validate_artifact(path, source)
if not valid:
raise Exception('{} {} Not well configured.'.format(package_type.upper(), str(path)))
else:
if source == 'directory':
- path = self.compress_artifact(path)
+ path = pt.build(path)
- compresed = True
+ compressed = True
fields['checksum'] = self.md5(path)
self.indexation(destination, path, package_type, fields)
except Exception as e:
- self._logger.debug(str(e))
+ self._logger.exception("Error registering artifact in Repository: {}".format(e))
finally:
- if source == 'directory' and compresed:
+ if source == 'directory' and compressed:
:param package_type: package type (vnf, ns)
:param fields: dict with the required values
"""
+ self._logger.debug("")
data_ind = {'name': fields.get('name'), 'description': fields.get('description'),
'vendor': fields.get('vendor'), 'path': fields.get('path')}
mkdir(final_path)
copyfile(path,
final_path + '/' + fields.get('id') + "-" + fields.get('version') + '.tar.gz')
- yaml.dump(fields, open(final_path + '/' + 'metadata.yaml', 'w'),
- Dumper=ruamel.yaml.RoundTripDumper)
- index = yaml.load(open(destination + '/index.yaml'))
+ yaml.safe_dump(fields, open(final_path + '/' + 'metadata.yaml', 'w'),
+ default_flow_style=False, width=80, indent=4)
+ index = yaml.safe_load(open(destination + '/index.yaml'))
index['{}_packages'.format(package_type)][fields.get('id')][fields.get('version')] = data_ind
if versioning.parse(index['{}_packages'.format(package_type)][fields.get('id')][
'latest']) < versioning.parse(fields.get('version')):
index['{}_packages'.format(package_type)][fields.get('id')]['latest'] = fields.get(
'version')
- yaml.dump(index, open(destination + '/index.yaml', 'w'), Dumper=ruamel.yaml.RoundTripDumper)
+ yaml.safe_dump(index, open(destination + '/index.yaml', 'w'),
+ default_flow_style=False, width=80, indent=4)
- self._logger.info('{} {} added in the repository'.format(package_type.upper(), str(path)))
+ self._logger.info('{} {} added to the repository'.format(package_type.upper(), str(path)))
else:
mkdir(destination + '/{}/'.format(package_type) + fields.get('id'))
mkdir(final_path)
copyfile(path,
final_path + '/' + fields.get('id') + "-" + fields.get('version') + '.tar.gz')
- yaml.dump(fields, open(join(final_path, 'metadata.yaml'), 'w'), Dumper=ruamel.yaml.RoundTripDumper)
- index = yaml.load(open(destination + '/index.yaml'))
+ yaml.safe_dump(fields, open(join(final_path, 'metadata.yaml'), 'w'),
+ default_flow_style=False, width=80, indent=4)
+ index = yaml.safe_load(open(destination + '/index.yaml'))
index['{}_packages'.format(package_type)][fields.get('id')] = {fields.get('version'): data_ind}
index['{}_packages'.format(package_type)][fields.get('id')]['latest'] = fields.get('version')
- yaml.dump(index, open(join(destination, 'index.yaml'), 'w'), Dumper=ruamel.yaml.RoundTripDumper)
+ yaml.safe_dump(index, open(join(destination, 'index.yaml'), 'w'),
+ default_flow_style=False, width=80, indent=4)
- self._logger.info('{} {} added in the repository'.format(package_type.upper(), str(path)))
+ self._logger.info('{} {} added to the repository'.format(package_type.upper(), str(path)))
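+ # Illustrative sketch of the index.yaml produced after indexing one hypothetical
+ # VNF package (all values below are made up):
+ #
+ #     apiVersion: v1
+ #     generated: '2020-04-29T08:41:07.681653Z'
+ #     vnf_packages:
+ #         hackfest_basic-vnf:
+ #             latest: '1.0'
+ #             '1.0':
+ #                 name: hackfest_basic-vnf
+ #                 description: A basic example VNF
+ #                 vendor: OSM
+ #                 path: /vnf/hackfest_basic-vnf/1.0/hackfest_basic-vnf-1.0.tar.gz
+ #     ns_packages: {}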
def current_datatime(self):
Datetime Generator
:return: Datetime as string with the following structure "2020-04-29T08:41:07.681653Z"
"""
+ self._logger.debug("")
- return time.strftime('%Y-%m-%dT%H:%M:%S.%sZ')
+ return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
def init_directory(self, destination):
:param destination:
:return:
"""
+ self._logger.debug("")
if not isdir(destination):
mkdir(destination)
if not isfile(join(destination, 'index.yaml')):
index_data = {'apiVersion': 'v1', 'generated': self.current_datatime(), 'vnf_packages': {},
'ns_packages': {}}
with open(join(destination, 'index.yaml'), 'w') as outfile:
- yaml.dump(index_data, outfile, default_flow_style=False)
+ yaml.safe_dump(index_data, outfile, default_flow_style=False, width=80, indent=4)
+
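+ # Illustrative end-to-end sketch (paths and the OSMRepo initialisation are
+ # assumptions, not part of this module): bootstrapping a local repository and
+ # registering a package directory in it.
+ #
+ #     repo.init_directory('./repository')
+ #     repo.register_artifact_in_repository('./hackfest_basic_vnf',
+ #                                          './repository', source='directory')
+ #
+ # The directory is packaged with PackageTool.build(), checksummed with md5()
+ # and copied under ./repository/vnf/<id>/<version>/, and index.yaml is updated.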