Feature 10960 Performance optimizations for the polling of VM status in RO
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
index cb52d77..d278957 100644 (file)
@@ -27,9 +27,9 @@ A ro_task can contain several 'tasks', each one with a target, where to store th
 from copy import deepcopy
 from http import HTTPStatus
 import logging
-from os import mkdir
+from os import makedirs
+from os import path
 import queue
-from shutil import rmtree
 import threading
 import time
 import traceback
@@ -39,7 +39,8 @@ from unittest.mock import Mock
 from importlib_metadata import entry_points
 from osm_common.dbbase import DbException
 from osm_ng_ro.vim_admin import LockRenew
-from osm_ro_plugin import sdnconn, vimconn
+from osm_ro_plugin import sdnconn
+from osm_ro_plugin import vimconn
 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
 from osm_ro_plugin.vim_dummy import VimDummyConnector
 import yaml
@@ -747,8 +748,10 @@ class VimInteractionFlavor(VimInteractionBase):
                 try:
                     flavor_data = task["find_params"]["flavor_data"]
                     vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
-                except vimconn.VimConnNotFoundException:
-                    self.logger.exception("VimConnNotFoundException occured.")
+                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
+                    self.logger.warning(
+                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
+                    )
 
             if not vim_flavor_id and task.get("params"):
                 # CREATE
@@ -1037,7 +1040,6 @@ class VimInteractionSdnNet(VimInteractionBase):
         return self.new(ro_task, task_create_index, None)
 
     def new(self, ro_task, task_index, task_depends):
-
         task = ro_task["tasks"][task_index]
         task_id = task["task_id"]
         target_vim = self.my_vims[ro_task["target_id"]]
@@ -1055,11 +1057,15 @@ class VimInteractionSdnNet(VimInteractionBase):
         try:
             # CREATE
             params = task["params"]
-            vlds_to_connect = params["vlds"]
-            associated_vim = params["target_vim"]
+            vlds_to_connect = params.get("vlds", [])
+            associated_vim = params.get("target_vim")
             # external additional ports
             additional_ports = params.get("sdn-ports") or ()
-            _, _, vim_account_id = associated_vim.partition(":")
+            _, _, vim_account_id = (
+                (None, None, None)
+                if associated_vim is None
+                else associated_vim.partition(":")
+            )
 
             if associated_vim:
                 # get associated VIM
@@ -1583,7 +1589,7 @@ class NsWorker(threading.Thread):
                 self.task_lock.release()
                 return False
 
-    def _process_vim_config(self, target_id, db_vim):
+    def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
         """
         Process vim config, creating vim configuration files as ca_cert
         :param target_id: vim/sdn/wim + id
@@ -1594,17 +1600,14 @@ class NsWorker(threading.Thread):
             return
 
         file_name = ""
+        work_dir = "/app/osm_ro/certs"
 
         try:
             if db_vim["config"].get("ca_cert_content"):
-                file_name = "{}:{}".format(target_id, self.worker_index)
+                file_name = f"{work_dir}/{target_id}:{self.worker_index}"
 
-                try:
-                    mkdir(file_name)
-                except FileExistsError:
-                    self.logger.exception(
-                        "FileExistsError occured while processing vim_config."
-                    )
+                if not path.isdir(file_name):
+                    makedirs(file_name)
 
                 file_name = file_name + "/ca_cert"
 
@@ -1655,10 +1658,6 @@ class NsWorker(threading.Thread):
                 self.vim_targets.remove(target_id)
 
             self.logger.info("Unloaded {}".format(target_id))
-            rmtree("{}:{}".format(target_id, self.worker_index))
-        except FileNotFoundError:
-            # This is raised by rmtree if folder does not exist.
-            self.logger.exception("FileNotFoundError occured while unloading VIM.")
         except Exception as e:
             self.logger.error("Cannot unload {}: {}".format(target_id, e))
 
@@ -1830,14 +1829,17 @@ class NsWorker(threading.Thread):
                     persistent_info={},
                 )
             else:  # sdn
-                plugin_name = "rosdn_" + vim["type"]
+                plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
                 step = "Loading plugin '{}'".format(plugin_name)
                 vim_module_conn = self._load_plugin(plugin_name, "sdn")
                 step = "Loading {}'".format(target_id)
                 wim = deepcopy(vim)
                 wim_config = wim.pop("config", {}) or {}
                 wim["uuid"] = wim["_id"]
-                wim["wim_url"] = wim["url"]
+                if "url" in wim and "wim_url" not in wim:
+                    wim["wim_url"] = wim["url"]
+                elif "url" not in wim and "wim_url" in wim:
+                    wim["url"] = wim["wim_url"]
 
                 if wim.get("dpid"):
                     wim_config["dpid"] = wim.pop("dpid")
@@ -2073,7 +2075,7 @@ class NsWorker(threading.Thread):
             "created_items", False
         )
 
-        self.logger.warning("Needed delete: {}".format(needed_delete))
+        self.logger.debug("Needed delete: {}".format(needed_delete))
         if my_task["status"] == "FAILED":
             return None, None  # TODO need to be retry??
 
@@ -2097,7 +2099,7 @@ class NsWorker(threading.Thread):
                     needed_delete = False
 
             if needed_delete:
-                self.logger.warning(
+                self.logger.debug(
                     "Deleting ro_task={} task_index={}".format(ro_task, task_index)
                 )
                 return self.item2class[my_task["item"]].delete(ro_task, task_index)
@@ -2202,25 +2204,22 @@ class NsWorker(threading.Thread):
                 fail_on_empty=False,
             )
 
-            self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
+            self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
             if ro_task_dependency:
                 for task_index, task in enumerate(ro_task_dependency["tasks"]):
                     if task["task_id"] == task_id:
                         return ro_task_dependency, task_index
         raise NsWorkerException("Cannot get depending task {}".format(task_id))
 
-    def update_vm_refresh(self):
+    def update_vm_refresh(self, ro_task):
         """Enables the VM status updates if self.refresh_config.active parameter
-        is not -1 and than updates the DB accordingly
+        is not -1 and then updates the DB accordingly
 
         """
         try:
             self.logger.debug("Checking if VM status update config")
             next_refresh = time.time()
-            if self.refresh_config.active == -1:
-                next_refresh = -1
-            else:
-                next_refresh += self.refresh_config.active
+            next_refresh = self._get_next_refresh(ro_task, next_refresh)
 
             if next_refresh != -1:
                 db_ro_task_update = {}
@@ -2255,6 +2254,23 @@ class NsWorker(threading.Thread):
         except Exception as e:
             self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
 
+    def _get_next_refresh(self, ro_task: dict, next_refresh: float):
+        """Decide the next_refresh according to vim type and refresh config period.
+        Args:
+            ro_task (dict):             ro_task details
+            next_refresh    (float):    next refresh time as epoch format
+
+        Returns:
+            next_refresh    (float)     -1 if vm updates are disabled or vim type is openstack.
+        """
+        target_vim = ro_task["target_id"]
+        vim_type = self.db_vims[target_vim]["vim_type"]
+        if self.refresh_config.active == -1 or vim_type == "openstack":
+            next_refresh = -1
+        else:
+            next_refresh += self.refresh_config.active
+        return next_refresh
+
     def _process_pending_tasks(self, ro_task):
         ro_task_id = ro_task["_id"]
         now = time.time()
@@ -2276,10 +2292,7 @@ class NsWorker(threading.Thread):
             elif new_status == "BUILD":
                 next_refresh += self.refresh_config.build
             elif new_status == "DONE":
-                if self.refresh_config.active == -1:
-                    next_refresh = -1
-                else:
-                    next_refresh += self.refresh_config.active
+                next_refresh = self._get_next_refresh(ro_task, next_refresh)
             else:
                 next_refresh += self.refresh_config.error
 
@@ -2294,7 +2307,7 @@ class NsWorker(threading.Thread):
                 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
             """
             # Check if vim status refresh is enabled again
-            self.update_vm_refresh()
+            self.update_vm_refresh(ro_task)
             # 0: get task_status_create
             lock_object = None
             task_status_create = None
@@ -2355,7 +2368,7 @@ class NsWorker(threading.Thread):
                                 dependency_task = dependency_ro_task["tasks"][
                                     dependency_task_index
                                 ]
-                                self.logger.warning(
+                                self.logger.debug(
                                     "dependency_ro_task={} dependency_task_index={}".format(
                                         dependency_ro_task, dependency_task_index
                                     )
@@ -2412,7 +2425,10 @@ class NsWorker(threading.Thread):
                             )
 
                         if task["action"] == "DELETE":
-                            (new_status, db_vim_info_update,) = self._delete_task(
+                            (
+                                new_status,
+                                db_vim_info_update,
+                            ) = self._delete_task(
                                 ro_task, task_index, task_depends, db_ro_task_update
                             )
                             new_status = (
@@ -2460,7 +2476,10 @@ class NsWorker(threading.Thread):
                             else:
                                 refresh_at = ro_task["vim_info"]["refresh_at"]
                                 if refresh_at and refresh_at != -1 and now > refresh_at:
-                                    (new_status, db_vim_info_update,) = self.item2class[
+                                    (
+                                        new_status,
+                                        db_vim_info_update,
+                                    ) = self.item2class[
                                         task["item"]
                                     ].refresh(ro_task)
                                     _update_refresh(new_status)
@@ -2824,7 +2843,7 @@ class NsWorker(threading.Thread):
                 """
                 ro_task = self._get_db_task()
                 if ro_task:
-                    self.logger.warning("Task to process: {}".format(ro_task))
+                    self.logger.debug("Task to process: {}".format(ro_task))
                     time.sleep(1)
                     self._process_pending_tasks(ro_task)
                     busy = True