X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fmonitor.py;fp=NG-RO%2Fosm_ng_ro%2Fmonitor.py;h=b9a590857b1c4feafe13132a99e5ba760a0d55af;hp=0000000000000000000000000000000000000000;hb=d586d89bde00acaf22debd7f657d605c9d095571;hpb=4c1dd54ae02e82f11a60058a1b7c7b0137ac572e diff --git a/NG-RO/osm_ng_ro/monitor.py b/NG-RO/osm_ng_ro/monitor.py new file mode 100644 index 00000000..b9a59085 --- /dev/null +++ b/NG-RO/osm_ng_ro/monitor.py @@ -0,0 +1,899 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +from copy import deepcopy +from dataclasses import dataclass +import logging +from os import makedirs, path +from pprint import pformat +import random +import threading +from typing import Optional + +from importlib_metadata import entry_points +from osm_common import dbmemory, dbmongo +from osm_common.dbbase import DbException +from osm_ng_ro.ns_thread import ConfigValidate +from osm_ro_plugin import vimconn +import yaml +from yaml.representer import RepresenterError + + +openStackvmStatusOk = [ + "ACTIVE", + "PAUSED", + "SUSPENDED", + "SHUTOFF", + "BUILD", +] + +openStacknetStatusOk = [ + "ACTIVE", + "PAUSED", + "BUILD", +] + +db_vim_collection = "vim_accounts" +vim_type = "openstack" +ro_task_collection = "ro_tasks" +plugin_name = "rovim_openstack" +monitoring_task = None + + +@dataclass +class VmToMonitor: + vm_id: str + target_record: str + + +@dataclass +class VimToMonitor: + vim_id: str + vms: list + + +class MonitorVmsException(Exception): + def __init__(self, message): + super(Exception, self).__init__(message) + + +class MonitorDbException(Exception): + def __init__(self, message): + super(Exception, self).__init__(message) + + +class MonitorVimException(Exception): + def __init__(self, message): + super(Exception, self).__init__(message) + + +class SafeDumper(yaml.SafeDumper): + def represent_data(self, data): + if isinstance(data, dict) and data.__class__ != dict: + # A solution to convert subclasses of dict to dicts which is not handled by pyyaml. + data = dict(data.items()) + return super(SafeDumper, self).represent_data(data) + + +class MonitorVms: + def __init__(self, config: dict): + self.config = config + self.db = None + self.refresh_config = ConfigValidate(config) + self.my_vims = {} + self.plugins = {} + self.logger = logging.getLogger("ro.monitor") + self.connect_db() + self.db_vims = self.get_db_vims() + self.load_vims() + + def load_vims(self) -> None: + for vim in self.db_vims: + if vim["_id"] not in self.my_vims: + self._load_vim(vim["_id"]) + + def connect_db(self) -> None: + """Connect to the Database. + + Raises: + MonitorDbException + """ + try: + if not self.db: + if self.config["database"]["driver"] == "mongo": + self.db = dbmongo.DbMongo() + self.db.db_connect(self.config["database"]) + elif self.config["database"]["driver"] == "memory": + self.db = dbmemory.DbMemory() + self.db.db_connect(self.config["database"]) + else: + raise MonitorDbException( + "Invalid configuration param '{}' at '[database]':'driver'".format( + self.config["database"]["driver"] + ) + ) + except (DbException, MonitorDbException, ValueError) as e: + raise MonitorDbException(str(e)) + + def get_db_vims(self) -> list: + """Get all VIM accounts which types are Openstack.""" + return self.db.get_list(db_vim_collection, {"vim_type": vim_type}) + + def find_ro_tasks_to_monitor(self) -> list: + """Get the ro_tasks which belongs to vdu and status DONE.""" + return self.db.get_list( + ro_task_collection, + q_filter={ + "tasks.status": ["DONE"], + "tasks.item": ["vdu"], + }, + ) + + @staticmethod + def _initialize_target_vim(vim_module_conn, vim: dict) -> object: + """Create the VIM connector object with given vim details. + + Args: + vim_module_conn (class): VIM connector class + vim (dict): VIM details to initialize VIM connecter object + + Returns: + VIM connector (object): VIM connector object + """ + return vim_module_conn( + uuid=vim["_id"], + name=vim["name"], + tenant_id=vim.get("vim_tenant_id"), + tenant_name=vim.get("vim_tenant_name"), + url=vim["vim_url"], + url_admin=None, + user=vim["vim_user"], + passwd=vim["vim_password"], + config=vim.get("config") or {}, + persistent_info={}, + ) + + def _load_vim(self, target_id) -> None: + """Load or reload a vim_account. + Read content from database, load the plugin if not loaded, then it fills my_vims dictionary. + + Args: + target_id (str): ID of vim account + + Raises: + MonitorVimException + """ + try: + vim = self.db.get_one(db_vim_collection, {"_id": target_id}) + schema_version = vim.get("schema_version") + self.db.encrypt_decrypt_fields( + vim, + "decrypt", + fields=("password", "secret"), + schema_version=schema_version, + salt=target_id, + ) + self._process_vim_config(target_id, vim) + vim_module_conn = self._load_plugin(plugin_name) + self.my_vims[target_id] = self._initialize_target_vim(vim_module_conn, vim) + self.logger.debug( + "Connector loaded for {}, plugin={}".format(target_id, plugin_name) + ) + except ( + DbException, + IOError, + AttributeError, + MonitorDbException, + MonitorVimException, + TypeError, + ) as e: + raise MonitorVimException( + "Cannot load {} plugin={}: {}".format(target_id, plugin_name, str(e)) + ) + + @staticmethod + def _process_vim_config(target_id: str, db_vim: dict) -> None: + """ + Process vim config, creating vim configuration files as ca_cert + Args: + target_id (str): vim id + db_vim (dict): Vim dictionary obtained from database + + Raises: + MonitorVimException + """ + if not db_vim.get("config"): + return + file_name = "" + work_dir = "/app/osm_ro/certs" + try: + if db_vim["config"].get("ca_cert_content"): + file_name = f"{work_dir}/{target_id}:{random.randint(0, 99999)}" + + if not path.isdir(file_name): + makedirs(file_name) + + file_name = file_name + "/ca_cert" + + with open(file_name, "w") as f: + f.write(db_vim["config"]["ca_cert_content"]) + del db_vim["config"]["ca_cert_content"] + db_vim["config"]["ca_cert"] = file_name + + except (FileNotFoundError, IOError, OSError) as e: + raise MonitorVimException( + "Error writing to file '{}': {}".format(file_name, e) + ) + + def _load_plugin(self, name: str = "rovim_openstack", type: str = "vim"): + """Finds the proper VIM connector and returns VIM connector class name. + Args: + name (str): rovim_openstack + type (str): vim + + Returns: + VIM connector class name (class) + + Raises: + MonitorVimException + """ + try: + if name in self.plugins: + return self.plugins[name] + + for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name): + self.plugins[name] = ep.load() + return self.plugins[name] + + except Exception as e: + raise MonitorVimException("Cannot load plugin osm_{}: {}".format(name, e)) + + @staticmethod + def create_vm_to_monitor(ro_task: dict) -> Optional[object]: + """Create VM using dataclass with ro task details. + + Args: + ro_task (dict): Details of ro_task + + Returns: + VmToMonitor (object) + """ + if not ro_task: + return + return VmToMonitor( + ro_task["vim_info"]["vim_id"], ro_task["tasks"][0]["target_record"] + ) + + @staticmethod + def add_vm_to_existing_vim( + vims_to_monitor: list, ro_task: dict, target_vim: str + ) -> bool: + """Add VmToMonitor to existing VIM list. + + Args: + vims_to_monitor (list): List of VIMs to monitor + ro_task (dict): ro_task details + target_vim (str): ID of target VIM + + Returns: + Boolean If VM is added to VIM list, it returns True else False. + """ + for vim in vims_to_monitor: + if target_vim == vim.vim_id: + vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task) + vim.vms.append(vm_to_monitor) + return True + return False + + @staticmethod + def add_new_vim_for_monitoring( + vims_to_monitor: list, ro_task: dict, target_vim: str + ) -> None: + """Create a new VIM object and add to vims_to_monitor list. + + Args: + vims_to_monitor (list): List of VIMs to monitor + ro_task (dict): ro_task details + target_vim (str): ID of target VIM + """ + vim_to_monitor = VimToMonitor(target_vim, []) + vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task) + vim_to_monitor.vms.append(vm_to_monitor) + vims_to_monitor.append(vim_to_monitor) + + @staticmethod + def prepare_vims_to_monitor( + vims_to_monitor: list, ro_task: dict, target_vim: str + ) -> None: + """If the required VIM exists in the vims_to_monitor list, add VM under related VIM, + otherwise create a new VIM object and add VM to this new created VIM. + + Args: + vims_to_monitor (list): List of VIMs to monitor + ro_task (dict): ro_task details + target_vim (str): ID of target VIM + """ + if not MonitorVms.add_vm_to_existing_vim(vims_to_monitor, ro_task, target_vim): + MonitorVms.add_new_vim_for_monitoring(vims_to_monitor, ro_task, target_vim) + + def _get_db_paths(self, target_record: str) -> tuple: + """Get the database paths and info of target VDU and VIM. + + Args: + target_record (str): A string which includes vnfr_id, vdur_id, vim_id + + Returns: + (vim_info_path: str, vim_id: str, vnfr_id: str, vdur_path:str, vdur_index: int, db_vnfr: dict) tuple + + Raises: + MonitorVmsException + """ + try: + [_, vnfr_id, vdur_info, vim_id] = target_record.split(":") + vim_info_path = vdur_info + ":" + vim_id + vdur_path = vim_info_path.split(".vim_info.")[0] + vdur_index = int(vdur_path.split(".")[1]) + db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}, fail_on_empty=False) + return vim_info_path, vim_id, vnfr_id, vdur_path, vdur_index, db_vnfr + except (DbException, ValueError) as e: + raise MonitorVmsException(str(e)) + + @staticmethod + def _check_if_vdur_vim_info_exists( + db_vnfr: dict, vdur_index: int + ) -> Optional[bool]: + """Check if VNF record and vdur vim_info record exists. + + Args: + db_vnfr (dict): VNF record + vdur_index (int): index of vdur under db_vnfr["vdur"] + + Returns: + Boolean True if VNF record and vdur vim_info record exists. + """ + try: + if db_vnfr and db_vnfr.get("vdur") and isinstance(vdur_index, int): + if db_vnfr["vdur"][vdur_index] and db_vnfr["vdur"][vdur_index].get( + "vim_info" + ): + return True + except IndexError: + return + + def _get_vm_data_from_db(self, vm_to_monitor: object) -> Optional[tuple]: + """Get the required DB path and VIM info data from database. + + Args: + vm_to_monitor (object): Includes vm_id and target record in DB. + + Returns: + (vdur_path: str, vdur_vim_info_update: dict, db_vnfr: dict, existing_vim_info: dict, vnfr_id,vim_info_path: str) (Tuple): + Required VM info if _check_if_vdur_vim_info_exists else None + """ + ( + vim_info_path, + vim_id, + vnfr_id, + vdur_path, + vdur_index, + db_vnfr, + ) = self._get_db_paths(vm_to_monitor.target_record) + if not self._check_if_vdur_vim_info_exists(db_vnfr, vdur_index): + return + + existing_vim_info = db_vnfr["vdur"][vdur_index]["vim_info"].get("vim:" + vim_id) + if not existing_vim_info: + return + + vdur_vim_info_update = deepcopy(existing_vim_info) + return ( + vdur_path, + vdur_vim_info_update, + db_vnfr, + existing_vim_info, + vnfr_id, + vim_info_path, + ) + + @staticmethod + def update_vim_info_for_deleted_vm(vdur_vim_info_update: dict) -> None: + """Updates the vdur_vim_info_update to report that VM is deleted. + + Args: + vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later. + """ + vdur_vim_info_update.update( + { + "vim_status": "DELETED", + "vim_message": "Deleted externally", + "vim_details": None, + "vim_id": None, + "vim_name": None, + "interfaces": None, + } + ) + + def report_deleted_vdur(self, vm_to_monitor: object) -> None: + """VM does not exist in the Openstack Cloud so update the VNFR to report VM deletion. + + Args: + vm_to_monitor (object): VM needs to be reported as deleted. + """ + vm_data = self._get_vm_data_from_db(vm_to_monitor) + if not vm_data: + return + ( + vdur_path, + vdur_vim_info_update, + _, + existing_vim_info, + vnfr_id, + vim_info_path, + ) = vm_data + self.update_vim_info_for_deleted_vm(vdur_vim_info_update) + vdur_update = { + vdur_path + ".status": "DELETED", + } + + if existing_vim_info != vdur_vim_info_update: + # VNFR record is updated one time upon VM deletion. + self.logger.info(f"Reporting deletion of VM: {vm_to_monitor.vm_id}") + self.backup_vdu_interfaces(vdur_vim_info_update) + all_updates = [vdur_update, {vim_info_path: vdur_vim_info_update}] + self.update_in_database(all_updates, vnfr_id) + self.logger.info(f"Updated vnfr for vm_id: {vm_to_monitor.vm_id}.") + + def update_vnfrs(self, servers: list, ports: dict, vms_to_monitor: list) -> None: + """Update the VDURs according to the latest information provided by servers list. + + Args: + servers (list): List of existing VMs comes from single Openstack VIM account + ports (dict): List of all ports comes from single Openstack VIM account + vms_to_monitor (list): List of VMs to be monitored and updated. + """ + for vm_to_monitor in vms_to_monitor: + server = next( + filter(lambda server: server.id == vm_to_monitor.vm_id, servers), None + ) + if server: + self.report_vdur_updates(server, vm_to_monitor, ports) + else: + self.report_deleted_vdur(vm_to_monitor) + + def serialize(self, value: dict) -> Optional[str]: + """Serialization of python basic types. + In the case value is not serializable a message will be logged. + + Args: + value (dict/str): Data to serialize + + Returns: + serialized_value (str, yaml) + """ + if isinstance(value, str): + return value + try: + return yaml.dump( + value, Dumper=SafeDumper, default_flow_style=True, width=256 + ) + except RepresenterError: + self.logger.info( + "The following entity cannot be serialized in YAML:\n\n%s\n\n", + pformat(value), + exc_info=True, + ) + return str(value) + + def _get_server_info(self, server: object) -> str: + """Get the server info, extract some fields and returns info as string. + + Args: + server (object): VM info object + + Returns: + server_info (string) + """ + server_info = server.to_dict() + server_info.pop("OS-EXT-SRV-ATTR:user_data", None) + server_info.pop("user_data", None) + return self.serialize(server_info) + + def check_vm_status_updates( + self, + vdur_vim_info_update: dict, + vdur_update: dict, + server: object, + vdur_path: str, + ) -> None: + """Fills up dictionaries to update VDUR according to server.status. + + Args: + vdur_vim_info_update (dict): Dictionary which keeps the differences of vdur_vim_info + vdur_update (dict): Dictionary which keeps the differences of vdur + server (server): VM info + vdur_path (str): Path of VDUR in DB + """ + if server.status in openStackvmStatusOk: + vdur_vim_info_update["vim_status"] = vdur_update[ + vdur_path + ".status" + ] = server.status + + else: + vdur_vim_info_update["vim_status"] = vdur_update[ + vdur_path + ".status" + ] = server.status + vdur_vim_info_update["vim_message"] = "VIM status reported " + server.status + + vdur_vim_info_update["vim_details"] = self._get_server_info(server) + vdur_vim_info_update["vim_id"] = server.id + vdur_vim_info_update["vim_name"] = vdur_update[ + vdur_path + ".name" + ] = server.name + + @staticmethod + def get_interface_info( + ports: dict, interface: dict, server: object + ) -> Optional[dict]: + """Get the updated port info regarding with existing interface of server. + + Args: + ports (dict): List of all ports belong to single VIM account + interface (dict): Existing interface info which is taken from DB + server (object): Server info + + Returns: + port (dict): The updated port info related to existing interface of server + """ + return next( + filter( + lambda port: port.get("id") == interface.get("vim_interface_id") + and port.get("device_id") == server.id, + ports["ports"], + ), + None, + ) + + @staticmethod + def check_vlan_pci_updates( + interface_info: dict, index: int, vdur_vim_info_update: dict + ) -> None: + """If interface has pci and vlan, update vdur_vim_info dictionary with the refreshed data. + + Args: + interface_info (dict): Refreshed interface info + index (int): Index of interface in VDUR + vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later. + """ + if interface_info.get("binding:profile") and interface_info[ + "binding:profile" + ].get("pci_slot"): + pci = interface_info["binding:profile"]["pci_slot"] + vdur_vim_info_update["interfaces"][index]["pci"] = pci + + if interface_info.get("binding:vif_details"): + vdur_vim_info_update["interfaces"][index]["vlan"] = interface_info[ + "binding:vif_details" + ].get("vlan") + + @staticmethod + def check_vdur_interface_updates( + vdur_update: dict, + vdur_path: str, + index: int, + interface_info: dict, + old_interface: dict, + vnfr_update: dict, + vnfr_id: str, + ) -> None: + """Updates the vdur_update dictionary which stores differences between the latest interface data and data in DB. + + Args: + vdur_update (dict): Dictionary used to store vdur updates + vdur_path (str): VDUR record path in DB + index (int): Index of interface in VDUR + interface_info (dict): Refreshed interface info + old_interface (dict): The previous interface info comes from DB + vnfr_update (dict): VDUR record path in DB + vnfr_id (str): VNFR ID + """ + current_ip_address = MonitorVms._get_current_ip_address(interface_info) + if current_ip_address: + vdur_update[ + vdur_path + ".interfaces." + str(index) + ".ip-address" + ] = current_ip_address + + if old_interface.get("mgmt_vdu_interface"): + vdur_update[vdur_path + ".ip-address"] = current_ip_address + + if old_interface.get("mgmt_vnf_interface"): + vnfr_update[vnfr_id + ".ip-address"] = current_ip_address + + vdur_update[ + vdur_path + ".interfaces." + str(index) + ".mac-address" + ] = interface_info.get("mac_address") + + @staticmethod + def _get_current_ip_address(interface_info: dict) -> Optional[str]: + if interface_info.get("fixed_ips") and interface_info["fixed_ips"][0]: + return interface_info["fixed_ips"][0].get("ip_address") + + @staticmethod + def backup_vdu_interfaces(vdur_vim_info_update: dict) -> None: + """Backup VDU interfaces as interfaces_backup. + + Args: + vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates + """ + if vdur_vim_info_update.get("interfaces") and not vdur_vim_info_update.get( + "vim_message" + ): + vdur_vim_info_update["interfaces_backup"] = vdur_vim_info_update[ + "interfaces" + ] + + def update_vdur_vim_info_interfaces( + self, + vdur_vim_info_update: dict, + index: int, + interface_info: dict, + server: object, + ) -> None: + """Update the vdur_vim_info dictionary with the latest interface info. + + Args: + vdur_vim_info_update (dict): The dictionary which is used to store vdur_vim_info updates + index (int): Interface index + interface_info (dict): The latest interface info + server (object): The latest VM info + """ + if not ( + vdur_vim_info_update.get("interfaces") + and vdur_vim_info_update["interfaces"][index] + ): + raise MonitorVmsException("Existing interfaces info could not found.") + + vdur_vim_info_update["interfaces"][index].update( + { + "mac_address": interface_info["mac_address"], + "ip_address": interface_info["fixed_ips"][0].get("ip_address") + if interface_info.get("fixed_ips") + else None, + "vim_net_id": interface_info["network_id"], + "vim_info": self.serialize(interface_info), + "compute_node": server.to_dict()["OS-EXT-SRV-ATTR:host"] + if server.to_dict().get("OS-EXT-SRV-ATTR:host") + else None, + } + ) + + def prepare_interface_updates( + self, + vdur_vim_info_update: dict, + index: int, + interface_info: dict, + server: object, + vdur_path: str, + vnfr_update: dict, + old_interface: dict, + vdur_update: dict, + vnfr_id: str, + ) -> None: + """Updates network related info in vdur_vim_info and vdur by using the latest interface info. + + Args: + vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates + index (int): Interface index + interface_info (dict): The latest interface info + server (object): The latest VM info + vdur_path (str): VDUR record path in DB + vnfr_update (dict): VDUR record path in DB + old_interface (dict): The previous interface info comes from DB + vdur_update (dict): Dictionary used to store vdur updates + vnfr_id (str): VNFR ID + """ + self.update_vdur_vim_info_interfaces( + vdur_vim_info_update, index, interface_info, server + ) + self.check_vlan_pci_updates(interface_info, index, vdur_vim_info_update) + self.check_vdur_interface_updates( + vdur_update, + vdur_path, + index, + interface_info, + old_interface, + vnfr_update, + vnfr_id, + ) + + def check_vm_interface_updates( + self, + server: object, + existing_vim_info: dict, + ports: dict, + vdur_vim_info_update: dict, + vdur_update: dict, + vdur_path: str, + vnfr_update: dict, + vnfr_id: str, + ) -> None: + """Gets the refreshed interfaces info of server and updates the VDUR if interfaces exist, + otherwise reports that interfaces are deleted. + + Args: + server (object): The latest VM info + existing_vim_info (dict): VM info details comes from DB + ports (dict): All ports info belongs to single VIM account + vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates + vdur_update (dict): Dictionary used to store vdur updates + vdur_path (str): VDUR record path in DB + vnfr_update (dict): VDUR record path in DB + vnfr_id (str): VNFR ID + """ + for index, old_interface in enumerate(existing_vim_info["interfaces"]): + interface_info = self.get_interface_info(ports, old_interface, server) + if not interface_info: + vdur_vim_info_update[ + "vim_message" + ] = f"Interface {old_interface['vim_interface_id']} deleted externally." + + else: + if interface_info.get("status") in openStacknetStatusOk: + self.prepare_interface_updates( + vdur_vim_info_update, + index, + interface_info, + server, + vdur_path, + vnfr_update, + old_interface, + vdur_update, + vnfr_id, + ) + + else: + vdur_vim_info_update["vim_message"] = ( + f"Interface {old_interface['vim_interface_id']} status: " + + interface_info.get("status") + ) + + def update_in_database(self, all_updates: list, vnfr_id: str) -> None: + """Update differences in VNFR. + + Args: + all_updates (list): List of dictionaries which includes differences + vnfr_id (str): VNF record ID + + Raises: + MonitorDbException + """ + try: + for updated_dict in all_updates: + if updated_dict: + self.db.set_list( + "vnfrs", + update_dict=updated_dict, + q_filter={"_id": vnfr_id}, + ) + except DbException as e: + raise MonitorDbException( + f"Error while updating differences in VNFR {str(e)}" + ) + + def report_vdur_updates( + self, server: object, vm_to_monitor: object, ports: dict + ) -> None: + """Report VDU updates by changing the VDUR records in DB. + + Args: + server (object): Refreshed VM info + vm_to_monitor (object): VM to be monitored + ports (dict): Ports dict includes all ports details regarding with single VIM account + """ + vm_data = self._get_vm_data_from_db(vm_to_monitor) + if not vm_data: + return + ( + vdur_path, + vdur_vim_info_update, + _, + existing_vim_info, + vnfr_id, + vim_info_path, + ) = vm_data + vdur_update, vnfr_update = {}, {} + + self.check_vm_status_updates( + vdur_vim_info_update, vdur_update, server, vdur_path + ) + + self.check_vm_interface_updates( + server, + existing_vim_info, + ports, + vdur_vim_info_update, + vdur_update, + vdur_path, + vnfr_update, + vnfr_id, + ) + # Update vnfr in MongoDB if there are differences + if existing_vim_info != vdur_vim_info_update: + self.logger.info(f"Reporting status updates of VM: {vm_to_monitor.vm_id}.") + self.backup_vdu_interfaces(vdur_vim_info_update) + all_updates = [ + vdur_update, + {vim_info_path: vdur_vim_info_update}, + vnfr_update, + ] + self.update_in_database(all_updates, vnfr_id) + self.logger.info(f"Updated vnfr for vm_id: {server.id}.") + + def run(self) -> None: + """Perfoms the periodic updates of Openstack VMs by sending only two requests to Openstack APIs + for each VIM account (in order to get details of all servers, all ports). + + Raises: + MonitorVmsException + """ + try: + # If there is not any Openstack type VIM account in DB or VM status updates are disabled by config, + # Openstack VMs will not be monitored. + if not self.db_vims or self.refresh_config.active == -1: + return + + ro_tasks_to_monitor = self.find_ro_tasks_to_monitor() + db_vims = [vim["_id"] for vim in self.db_vims] + vims_to_monitor = [] + + for ro_task in ro_tasks_to_monitor: + _, _, target_vim = ro_task["target_id"].partition(":") + if target_vim in db_vims: + self.prepare_vims_to_monitor(vims_to_monitor, ro_task, target_vim) + + for vim in vims_to_monitor: + all_servers, all_ports = self.my_vims[vim.vim_id].get_monitoring_data() + self.update_vnfrs(all_servers, all_ports, vim.vms) + except ( + DbException, + MonitorDbException, + MonitorVimException, + MonitorVmsException, + ValueError, + KeyError, + TypeError, + AttributeError, + vimconn.VimConnException, + ) as e: + raise MonitorVmsException( + f"Exception while monitoring Openstack VMs: {str(e)}" + ) + + +def start_monitoring(config: dict): + global monitoring_task + if not (config and config.get("period")): + raise MonitorVmsException("Wrong configuration format is provided.") + instance = MonitorVms(config) + period = instance.refresh_config.active + instance.run() + monitoring_task = threading.Timer(period, start_monitoring, args=(config,)) + monitoring_task.start() + + +def stop_monitoring(): + global monitoring_task + if monitoring_task: + monitoring_task.cancel()