X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm_utils.py;h=5817b16a701d8d8cedbb4dcce69f65d87701bfdb;hb=4c0e6805c44f9ed1d0bb35161bf69645f5b84151;hp=a6ce5bddde57c219737e079a7ae77b8f637e2765;hpb=9f9c6f2e8b8e978deaa4fb6e1482c0656ce4bd45;p=osm%2FLCM.git diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index a6ce5bd..5817b16 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -17,7 +17,20 @@ ## import asyncio +import checksumdir from collections import OrderedDict +import hashlib +import os +import shutil +import traceback +from time import time + +from osm_common.fsbase import FsException +from osm_lcm.data_utils.database.database import Database +from osm_lcm.data_utils.filesystem.filesystem import Filesystem +import yaml +from zipfile import ZipFile, BadZipfile + # from osm_common.dbbase import DbException __author__ = "Alfonso Tierno" @@ -27,10 +40,6 @@ class LcmException(Exception): pass -class LcmExceptionNoMgmtIP(LcmException): - pass - - class LcmExceptionExit(LcmException): pass @@ -48,34 +57,397 @@ def versiontuple(v): return tuple(filled) -# LcmBase must be listed before TaskRegistry, as it is a dependency. -class LcmBase: +def deep_get(target_dict, key_list, default_value=None): + """ + Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None + Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None + :param target_dict: dictionary to be read + :param key_list: list of keys to read from target_dict + :param default_value: value to return if key is not present in the nested dictionary + :return: The wanted value if exist, None otherwise + """ + for key in key_list: + if not isinstance(target_dict, dict) or key not in target_dict: + return default_value + target_dict = target_dict[key] + return target_dict + + +def get_iterable(in_dict, in_key): + """ + Similar to .get(), but if value is None, False, ..., An empty tuple is returned instead + :param in_dict: a dictionary + :param in_key: the key to look for at in_dict + :return: in_dict[in_var] or () if it is None or not present + """ + if not in_dict.get(in_key): + return () + return in_dict[in_key] + + +def check_juju_bundle_existence(vnfd: dict) -> str: + """Checks the existence of juju-bundle in the descriptor + + Args: + vnfd: Descriptor as a dictionary + + Returns: + Juju bundle if dictionary has juju-bundle else None + + """ + if vnfd.get("vnfd"): + vnfd = vnfd["vnfd"] + + for kdu in vnfd.get("kdu", []): + return kdu.get("juju-bundle", None) + + +def get_charm_artifact_path(base_folder, charm_name, charm_type, revision=str()) -> str: + """Finds the charm artifact paths - def __init__(self, db, msg, fs, logger): + Args: + base_folder: Main folder which will be looked up for charm + charm_name: Charm name + charm_type: Type of charm native_charm, lxc_proxy_charm or k8s_proxy_charm + revision: vnf package revision number if there is + + Returns: + artifact_path: (str) + + """ + extension = "" + if revision: + extension = ":" + str(revision) + + if base_folder.get("pkg-dir"): + artifact_path = "{}/{}/{}/{}".format( + base_folder["folder"].split(":")[0] + extension, + base_folder["pkg-dir"], + "charms" + if charm_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") + else "helm-charts", + charm_name, + ) + + else: + # For SOL004 packages + artifact_path = "{}/Scripts/{}/{}".format( + base_folder["folder"].split(":")[0] + extension, + "charms" + if charm_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") + else "helm-charts", + charm_name, + ) + + return artifact_path + + +def populate_dict(target_dict, key_list, value): + """ + Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value. + Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}} + :param target_dict: dictionary to be changed + :param key_list: list of keys to insert at target_dict + :param value: + :return: None + """ + for key in key_list[0:-1]: + if key not in target_dict: + target_dict[key] = {} + target_dict = target_dict[key] + target_dict[key_list[-1]] = value + + +def get_ee_id_parts(ee_id): + """ + Parses ee_id stored at database that can be either 'version:namespace.helm_id' or only + namespace.helm_id for backward compatibility + If exists helm version can be helm-v3 or helm (helm-v2 old version) + """ + version, _, part_id = ee_id.rpartition(":") + namespace, _, helm_id = part_id.rpartition(".") + return version, namespace, helm_id + + +def vld_to_ro_ip_profile(source_data): + if source_data: + return { + "ip_version": "IPv4" + if "v4" in source_data.get("ip-version", "ipv4") + else "IPv6", + "subnet_address": source_data.get("cidr") + or source_data.get("subnet-address"), + "gateway_address": source_data.get("gateway-ip") + or source_data.get("gateway-address"), + "dns_address": ";".join( + [v["address"] for v in source_data["dns-server"] if v.get("address")] + ) + if source_data.get("dns-server") + else None, + "dhcp_enabled": source_data.get("dhcp-params", {}).get("enabled", False) + or source_data.get("dhcp-enabled", False), + "dhcp_start_address": source_data["dhcp-params"].get("start-address") + if source_data.get("dhcp-params") + else None, + "dhcp_count": source_data["dhcp-params"].get("count") + if source_data.get("dhcp-params") + else None, + "ipv6_address_mode": source_data["ipv6-address-mode"] + if "ipv6-address-mode" in source_data + else None, + } + + +class LcmBase: + def __init__(self, msg, logger): """ :param db: database connection """ - self.db = db + self.db = Database().instance.db self.msg = msg - self.fs = fs + self.fs = Filesystem().instance.fs self.logger = logger def update_db_2(self, item, _id, _desc): """ Updates database with _desc information. If success _desc is cleared - :param item: - :param _id: + :param item: collection + :param _id: the _id to use in the query filter :param _desc: dictionary with the content to update. Keys are dot separated keys for :return: None. Exception is raised on error """ if not _desc: return + now = time() + _desc["_admin.modified"] = now self.db.set_one(item, {"_id": _id}, _desc) _desc.clear() # except DbException as e: # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e)) + @staticmethod + def calculate_charm_hash(zipped_file): + """Calculate the hash of charm files which ends with .charm + + Args: + zipped_file (str): Existing charm package full path + + Returns: + hex digest (str): The hash of the charm file + """ + filehash = hashlib.sha256() + with open(zipped_file, mode="rb") as file: + contents = file.read() + filehash.update(contents) + return filehash.hexdigest() + + @staticmethod + def compare_charm_hash(current_charm, target_charm): + """Compare the existing charm and the target charm if the charms + are given as zip files ends with .charm + + Args: + current_charm (str): Existing charm package full path + target_charm (str): Target charm package full path + + Returns: + True/False (bool): if charm has changed it returns True + """ + return LcmBase.calculate_charm_hash( + current_charm + ) != LcmBase.calculate_charm_hash(target_charm) + + @staticmethod + def compare_charmdir_hash(current_charm_dir, target_charm_dir): + """Compare the existing charm and the target charm if the charms + are given as directories + + Args: + current_charm_dir (str): Existing charm package directory path + target_charm_dir (str): Target charm package directory path + + Returns: + True/False (bool): if charm has changed it returns True + """ + return checksumdir.dirhash(current_charm_dir) != checksumdir.dirhash( + target_charm_dir + ) + + def check_charm_hash_changed( + self, current_charm_path: str, target_charm_path: str + ) -> bool: + """Find the target charm has changed or not by checking the hash of + old and new charm packages + + Args: + current_charm_path (str): Existing charm package artifact path + target_charm_path (str): Target charm package artifact path + + Returns: + True/False (bool): if charm has changed it returns True + + """ + try: + # Check if the charm artifacts are available + current_charm = self.fs.path + current_charm_path + target_charm = self.fs.path + target_charm_path + + if os.path.exists(current_charm) and os.path.exists(target_charm): + # Compare the hash of .charm files + if current_charm.endswith(".charm"): + return LcmBase.compare_charm_hash(current_charm, target_charm) + + # Compare the hash of charm folders + return LcmBase.compare_charmdir_hash(current_charm, target_charm) + + else: + raise LcmException( + "Charm artifact {} does not exist in the VNF Package".format( + self.fs.path + target_charm_path + ) + ) + except (IOError, OSError, TypeError) as error: + self.logger.debug(traceback.format_exc()) + self.logger.error(f"{error} occured while checking the charm hashes") + raise LcmException(error) + + @staticmethod + def get_charm_name(charm_metadata_file: str) -> str: + """Get the charm name from metadata file. + + Args: + charm_metadata_file (str): charm metadata file full path + + Returns: + charm_name (str): charm name + + """ + # Read charm metadata.yaml to get the charm name + with open(charm_metadata_file, "r") as metadata_file: + content = yaml.safe_load(metadata_file) + charm_name = content["name"] + return str(charm_name) + + def _get_charm_path( + self, nsd_package_path: str, nsd_package_name: str, charm_folder_name: str + ) -> str: + """Get the full path of charm folder. + + Args: + nsd_package_path (str): NSD package full path + nsd_package_name (str): NSD package name + charm_folder_name (str): folder name + + Returns: + charm_path (str): charm folder full path + """ + charm_path = ( + self.fs.path + + nsd_package_path + + "/" + + nsd_package_name + + "/charms/" + + charm_folder_name + ) + return charm_path + + def _get_charm_metadata_file( + self, + charm_folder_name: str, + nsd_package_path: str, + nsd_package_name: str, + charm_path: str = None, + ) -> str: + """Get the path of charm metadata file. + + Args: + charm_folder_name (str): folder name + nsd_package_path (str): NSD package full path + nsd_package_name (str): NSD package name + charm_path (str): Charm full path + + Returns: + charm_metadata_file_path (str): charm metadata file full path + + """ + # Locate the charm metadata.yaml + if charm_folder_name.endswith(".charm"): + extract_path = ( + self.fs.path + + nsd_package_path + + "/" + + nsd_package_name + + "/charms/" + + charm_folder_name.replace(".charm", "") + ) + # Extract .charm to extract path + with ZipFile(charm_path, "r") as zipfile: + zipfile.extractall(extract_path) + return extract_path + "/metadata.yaml" + else: + return charm_path + "/metadata.yaml" + + def find_charm_name(self, db_nsr: dict, charm_folder_name: str) -> str: + """Get the charm name from metadata.yaml of charm package. + + Args: + db_nsr (dict): NS record as a dictionary + charm_folder_name (str): charm folder name + + Returns: + charm_name (str): charm name + """ + try: + if not charm_folder_name: + raise LcmException("charm_folder_name should be provided.") + + # Find nsd_package details: path, name + revision = db_nsr.get("revision", "") + + # Get the NSD package path + if revision: + nsd_package_path = db_nsr["nsd-id"] + ":" + str(revision) + db_nsd = self.db.get_one("nsds_revisions", {"_id": nsd_package_path}) + + else: + nsd_package_path = db_nsr["nsd-id"] + + db_nsd = self.db.get_one("nsds", {"_id": nsd_package_path}) + + # Get the NSD package name + nsd_package_name = db_nsd["_admin"]["storage"]["pkg-dir"] + + # Remove the existing nsd package and sync from FsMongo + shutil.rmtree(self.fs.path + nsd_package_path, ignore_errors=True) + self.fs.sync(from_path=nsd_package_path) + + # Get the charm path + charm_path = self._get_charm_path( + nsd_package_path, nsd_package_name, charm_folder_name + ) + + # Find charm metadata file full path + charm_metadata_file = self._get_charm_metadata_file( + charm_folder_name, nsd_package_path, nsd_package_name, charm_path + ) + + # Return charm name + return self.get_charm_name(charm_metadata_file) + + except ( + yaml.YAMLError, + IOError, + FsException, + KeyError, + TypeError, + FileNotFoundError, + BadZipfile, + ) as error: + self.logger.debug(traceback.format_exc()) + self.logger.error(f"{error} occured while getting the charm name") + raise LcmException(error) + class TaskRegistry(LcmBase): """ @@ -96,25 +468,25 @@ class TaskRegistry(LcmBase): """ # NS/NSI: "services" VIM/WIM/SDN: "accounts" - topic_service_list = ['ns', 'nsi'] - topic_account_list = ['vim', 'wim', 'sdn', 'k8scluster', 'k8srepo'] + topic_service_list = ["ns", "nsi"] + topic_account_list = ["vim", "wim", "sdn", "k8scluster", "vca", "k8srepo"] # Map topic to InstanceID - topic2instid_dict = { - 'ns': 'nsInstanceId', - 'nsi': 'netsliceInstanceId'} + topic2instid_dict = {"ns": "nsInstanceId", "nsi": "netsliceInstanceId"} # Map topic to DB table name topic2dbtable_dict = { - 'ns': 'nslcmops', - 'nsi': 'nsilcmops', - 'vim': 'vim_accounts', - 'wim': 'wim_accounts', - 'sdn': 'sdns', - 'k8scluster': 'k8sclusters', - 'k8srepo': 'k8srepos'} - - def __init__(self, worker_id=None, db=None, logger=None): + "ns": "nslcmops", + "nsi": "nsilcmops", + "vim": "vim_accounts", + "wim": "wim_accounts", + "sdn": "sdns", + "k8scluster": "k8sclusters", + "vca": "vca", + "k8srepo": "k8srepos", + } + + def __init__(self, worker_id=None, logger=None): self.task_registry = { "ns": {}, "nsi": {}, @@ -122,10 +494,11 @@ class TaskRegistry(LcmBase): "wim_account": {}, "sdn": {}, "k8scluster": {}, + "vca": {}, "k8srepo": {}, } self.worker_id = worker_id - self.db = db + self.db = Database().instance.db self.logger = logger def register(self, topic, _id, op_id, task_name, task): @@ -219,19 +592,17 @@ class TaskRegistry(LcmBase): # Input: op_id, example: 'abc123def:3' Output: account_id='abc123def', op_index=3 def _get_account_and_op_HA(self, op_id): if not op_id: - return (None, None) - account_id, _, op_index = op_id.rpartition(':') - if not account_id: - return (None, None) - if not op_index.isdigit(): - return (None, None) + return None, None + account_id, _, op_index = op_id.rpartition(":") + if not account_id or not op_index.isdigit(): + return None, None return account_id, op_index # Get '_id' for any topic and operation def _get_instance_id_HA(self, topic, op_type, op_id): _id = None # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id' - if op_type == 'ANY': + if op_type == "ANY": _id = op_id # NS/NSI: Use op_id as '_id' elif self._is_service_type_HA(topic): @@ -246,26 +617,33 @@ class TaskRegistry(LcmBase): _filter = {} # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id' # In this special case, the timestamp is ignored - if op_type == 'ANY': - _filter = {'operationState': 'PROCESSING'} + if op_type == "ANY": + _filter = {"operationState": "PROCESSING"} # Otherwise, get 'startTime' timestamp for this operation else: # NS/NSI if self._is_service_type_HA(topic): + now = time() starttime_this_op = db_lcmop.get("startTime") instance_id_label = self.topic2instid_dict.get(topic) instance_id = db_lcmop.get(instance_id_label) - _filter = {instance_id_label: instance_id, - 'operationState': 'PROCESSING', - 'startTime.lt': starttime_this_op} + _filter = { + instance_id_label: instance_id, + "operationState": "PROCESSING", + "startTime.lt": starttime_this_op, + "_admin.modified.gt": now + - 2 * 3600, # ignore if tow hours of inactivity + } # VIM/WIM/SDN/K8scluster elif self._is_account_type_HA(topic): _, op_index = self._get_account_and_op_HA(op_id) - _ops = db_lcmop['_admin']['operations'] + _ops = db_lcmop["_admin"]["operations"] _this_op = _ops[int(op_index)] - starttime_this_op = _this_op.get('startTime', None) - _filter = {'operationState': 'PROCESSING', - 'startTime.lt': starttime_this_op} + starttime_this_op = _this_op.get("startTime", None) + _filter = { + "operationState": "PROCESSING", + "startTime.lt": starttime_this_op, + } return _filter # Get DB params for any topic and operation @@ -274,19 +652,24 @@ class TaskRegistry(LcmBase): update_dict = {} # NS/NSI if self._is_service_type_HA(topic): - q_filter = {'_id': op_id, '_admin.worker': None} - update_dict = {'_admin.worker': self.worker_id} + q_filter = {"_id": op_id, "_admin.worker": None} + update_dict = {"_admin.worker": self.worker_id} # VIM/WIM/SDN elif self._is_account_type_HA(topic): account_id, op_index = self._get_account_and_op_HA(op_id) if not account_id: return None, None - if op_type == 'create': + if op_type == "create": # Creating a VIM/WIM/SDN account implies setting '_admin.current_operation' = 0 op_index = 0 - q_filter = {'_id': account_id, "_admin.operations.{}.worker".format(op_index): None} - update_dict = {'_admin.operations.{}.worker'.format(op_index): self.worker_id, - '_admin.current_operation': op_index} + q_filter = { + "_id": account_id, + "_admin.operations.{}.worker".format(op_index): None, + } + update_dict = { + "_admin.operations.{}.worker".format(op_index): self.worker_id, + "_admin.current_operation": op_index, + } return q_filter, update_dict def lock_HA(self, topic, op_type, op_id): @@ -312,29 +695,41 @@ class TaskRegistry(LcmBase): return True # Try to lock this task - db_table_name = self.topic2dbtable_dict.get(topic) + db_table_name = self.topic2dbtable_dict[topic] q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id) - db_lock_task = self.db.set_one(db_table_name, - q_filter=q_filter, - update_dict=update_dict, - fail_on_empty=False) + db_lock_task = self.db.set_one( + db_table_name, + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + ) if db_lock_task is None: - self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id)) + self.logger.debug( + "Task {} operation={} already locked by another worker".format( + topic, op_id + ) + ) return False else: # Set 'detailed-status' to 'In progress' for VIM/WIM/SDN operations if self._is_account_type_HA(topic): - detailed_status = 'In progress' + detailed_status = "In progress" account_id, op_index = self._get_account_and_op_HA(op_id) - q_filter = {'_id': account_id} - update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): detailed_status} - self.db.set_one(db_table_name, - q_filter=q_filter, - update_dict=update_dict, - fail_on_empty=False) + q_filter = {"_id": account_id} + update_dict = { + "_admin.operations.{}.detailed-status".format( + op_index + ): detailed_status + } + self.db.set_one( + db_table_name, + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + ) return True - def register_HA(self, topic, op_type, op_id, operationState, detailed_status): + def unlock_HA(self, topic, op_type, op_id, operationState, detailed_status): """ Register a task, done when finished a VIM/WIM/SDN 'create' operation. :param topic: Can be "vim", "wim", or "sdn" @@ -344,23 +739,29 @@ class TaskRegistry(LcmBase): """ # Backward compatibility - if not self._is_account_type_HA(topic) or (self._is_account_type_HA(topic) and op_id is None): + if not self._is_account_type_HA(topic) or not op_id: return # Get Account ID and Operation Index account_id, op_index = self._get_account_and_op_HA(op_id) - db_table_name = self.topic2dbtable_dict.get(topic) + db_table_name = self.topic2dbtable_dict[topic] # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED) # If the account exist, register the HA task. # Update DB for HA tasks - q_filter = {'_id': account_id} - update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState, - '_admin.operations.{}.detailed-status'.format(op_index): detailed_status} - self.db.set_one(db_table_name, - q_filter=q_filter, - update_dict=update_dict, - fail_on_empty=False) + q_filter = {"_id": account_id} + update_dict = { + "_admin.operations.{}.operationState".format(op_index): operationState, + "_admin.operations.{}.detailed-status".format(op_index): detailed_status, + "_admin.operations.{}.worker".format(op_index): None, + "_admin.current_operation": None, + } + self.db.set_one( + db_table_name, + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + ) return async def waitfor_related_HA(self, topic, op_type, op_id=None): @@ -369,7 +770,9 @@ class TaskRegistry(LcmBase): """ # Backward compatibility - if not (self._is_service_type_HA(topic) or self._is_account_type_HA(topic)) and (op_id is None): + if not ( + self._is_service_type_HA(topic) or self._is_account_type_HA(topic) + ) and (op_id is None): return # Get DB table name @@ -378,9 +781,7 @@ class TaskRegistry(LcmBase): # Get instance ID _id = self._get_instance_id_HA(topic, op_type, op_id) _filter = {"_id": _id} - db_lcmop = self.db.get_one(db_table_name, - _filter, - fail_on_empty=False) + db_lcmop = self.db.get_one(db_table_name, _filter, fail_on_empty=False) if not db_lcmop: return @@ -388,17 +789,18 @@ class TaskRegistry(LcmBase): _filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id) # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable. - timeout_wait_for_task = 3600 # Max time (seconds) to wait for a related task to finish + timeout_wait_for_task = ( + 3600 # Max time (seconds) to wait for a related task to finish + ) # interval_wait_for_task = 30 # A too long polling interval slows things down considerably - interval_wait_for_task = 10 # Interval in seconds for polling related tasks + interval_wait_for_task = 10 # Interval in seconds for polling related tasks time_left = timeout_wait_for_task old_num_related_tasks = 0 while True: # Get related tasks (operations within the same instance as this) which are # still running (operationState='PROCESSING') and which were started before this task. # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps. - db_waitfor_related_task = self.db.get_list(db_table_name, - q_filter=_filter) + db_waitfor_related_task = self.db.get_list(db_table_name, q_filter=_filter) new_num_related_tasks = len(db_waitfor_related_task) # If there are no related tasks, there is nothing to wait for, so return. if not new_num_related_tasks: @@ -406,28 +808,39 @@ class TaskRegistry(LcmBase): # If number of pending related tasks have changed, # update the 'detailed-status' field and log the change. # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY'). - if (op_type != 'ANY') and (new_num_related_tasks != old_num_related_tasks): - step = "Waiting for {} related tasks to be completed.".format(new_num_related_tasks) + if (op_type != "ANY") and (new_num_related_tasks != old_num_related_tasks): + step = "Waiting for {} related tasks to be completed.".format( + new_num_related_tasks + ) update_dict = {} - q_filter = {'_id': _id} + q_filter = {"_id": _id} # NS/NSI if self._is_service_type_HA(topic): - update_dict = {'detailed-status': step} + update_dict = { + "detailed-status": step, + "queuePosition": new_num_related_tasks, + } # VIM/WIM/SDN elif self._is_account_type_HA(topic): _, op_index = self._get_account_and_op_HA(op_id) - update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): step} + update_dict = { + "_admin.operations.{}.detailed-status".format(op_index): step + } self.logger.debug("Task {} operation={} {}".format(topic, _id, step)) - self.db.set_one(db_table_name, - q_filter=q_filter, - update_dict=update_dict, - fail_on_empty=False) + self.db.set_one( + db_table_name, + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + ) old_num_related_tasks = new_num_related_tasks time_left -= interval_wait_for_task if time_left < 0: raise LcmException( "Timeout ({}) when waiting for related tasks to be completed".format( - timeout_wait_for_task)) + timeout_wait_for_task + ) + ) await asyncio.sleep(interval_wait_for_task) return