Ubuntu 22.04 and Python 3.10 preparation
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
index 6e9f104..03e8b30 100644 (file)
@@ -27,9 +27,9 @@ A ro_task can contain several 'tasks', each one with a target, where to store th
 from copy import deepcopy
 from http import HTTPStatus
 import logging
-from os import mkdir
+from os import makedirs
+from os import path
 import queue
-from shutil import rmtree
 import threading
 import time
 import traceback
@@ -39,7 +39,8 @@ from unittest.mock import Mock
 from importlib_metadata import entry_points
 from osm_common.dbbase import DbException
 from osm_ng_ro.vim_admin import LockRenew
-from osm_ro_plugin import sdnconn, vimconn
+from osm_ro_plugin import sdnconn
+from osm_ro_plugin import vimconn
 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
 from osm_ro_plugin.vim_dummy import VimDummyConnector
 import yaml
@@ -747,8 +748,10 @@ class VimInteractionFlavor(VimInteractionBase):
                 try:
                     flavor_data = task["find_params"]["flavor_data"]
                     vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
-                except vimconn.VimConnNotFoundException:
-                    self.logger.warning("VimConnNotFoundException occured.")
+                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
+                    self.logger.warning(
+                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
+                    )
 
             if not vim_flavor_id and task.get("params"):
                 # CREATE
@@ -1037,7 +1040,6 @@ class VimInteractionSdnNet(VimInteractionBase):
         return self.new(ro_task, task_create_index, None)
 
     def new(self, ro_task, task_index, task_depends):
-
         task = ro_task["tasks"][task_index]
         task_id = task["task_id"]
         target_vim = self.my_vims[ro_task["target_id"]]
@@ -1587,7 +1589,7 @@ class NsWorker(threading.Thread):
                 self.task_lock.release()
                 return False
 
-    def _process_vim_config(self, target_id, db_vim):
+    def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
         """
         Process vim config, creating vim configuration files as ca_cert
         :param target_id: vim/sdn/wim + id
@@ -1598,17 +1600,14 @@ class NsWorker(threading.Thread):
             return
 
         file_name = ""
+        work_dir = "/app/osm_ro/certs"
 
         try:
             if db_vim["config"].get("ca_cert_content"):
-                file_name = "{}:{}".format(target_id, self.worker_index)
+                file_name = f"{work_dir}/{target_id}:{self.worker_index}"
 
-                try:
-                    mkdir(file_name)
-                except FileExistsError:
-                    self.logger.exception(
-                        "FileExistsError occured while processing vim_config."
-                    )
+                if not path.isdir(file_name):
+                    makedirs(file_name)
 
                 file_name = file_name + "/ca_cert"
 
@@ -1659,10 +1658,6 @@ class NsWorker(threading.Thread):
                 self.vim_targets.remove(target_id)
 
             self.logger.info("Unloaded {}".format(target_id))
-            rmtree("{}:{}".format(target_id, self.worker_index))
-        except FileNotFoundError:
-            # This is raised by rmtree if folder does not exist.
-            self.logger.exception("FileNotFoundError occured while unloading VIM.")
         except Exception as e:
             self.logger.error("Cannot unload {}: {}".format(target_id, e))
 
@@ -1795,9 +1790,9 @@ class NsWorker(threading.Thread):
         )
         plugin_name = ""
         vim = None
+        step = "Getting {}={} from db".format(target, _id)
 
         try:
-            step = "Getting {}={} from db".format(target, _id)
             # TODO process for wim, sdnc, ...
             vim = self.db.get_one(target_database, {"_id": _id})
 
@@ -1947,128 +1942,6 @@ class NsWorker(threading.Thread):
 
         return None
 
-    def _get_db_all_tasks(self):
-        """
-        Read all content of table ro_tasks to log it
-        :return: None
-        """
-        try:
-            # Checking the content of the BD:
-
-            # read and return
-            ro_task = self.db.get_list("ro_tasks")
-            for rt in ro_task:
-                self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
-            return ro_task
-
-        except DbException as e:
-            self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
-        except Exception as e:
-            self.logger.critical(
-                "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
-            )
-
-        return None
-
-    def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
-        """
-        Generate a log with the following format:
-
-        Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
-        target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
-        task_array_index;task_id;task_action;task_item;task_args
-
-        Example:
-
-        TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
-        1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
-        ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
-        'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
-        'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
-        888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
-        CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
-        """
-        try:
-            line = []
-            i = 0
-            if ro_task is not None and isinstance(ro_task, dict):
-                for t in ro_task["tasks"]:
-                    line.clear()
-                    line.append(mark)
-                    line.append(event)
-                    line.append(ro_task.get("_id", ""))
-                    line.append(str(ro_task.get("locked_at", "")))
-                    line.append(str(ro_task.get("modified_at", "")))
-                    line.append(str(ro_task.get("created_at", "")))
-                    line.append(str(ro_task.get("to_check_at", "")))
-                    line.append(str(ro_task.get("locked_by", "")))
-                    line.append(str(ro_task.get("target_id", "")))
-                    line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
-                    line.append(str(ro_task.get("vim_info", "")))
-                    line.append(str(ro_task.get("tasks", "")))
-                    if isinstance(t, dict):
-                        line.append(str(t.get("status", "")))
-                        line.append(str(t.get("action_id", "")))
-                        line.append(str(i))
-                        line.append(str(t.get("task_id", "")))
-                        line.append(str(t.get("action", "")))
-                        line.append(str(t.get("item", "")))
-                        line.append(str(t.get("find_params", "")))
-                        line.append(str(t.get("params", "")))
-                    else:
-                        line.extend([""] * 2)
-                        line.append(str(i))
-                        line.extend([""] * 5)
-
-                    i += 1
-                    self.logger.debug(";".join(line))
-            elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
-                i = 0
-                while True:
-                    st = "tasks.{}.status".format(i)
-                    if st not in db_ro_task_update:
-                        break
-                    line.clear()
-                    line.append(mark)
-                    line.append(event)
-                    line.append(db_ro_task_update.get("_id", ""))
-                    line.append(str(db_ro_task_update.get("locked_at", "")))
-                    line.append(str(db_ro_task_update.get("modified_at", "")))
-                    line.append("")
-                    line.append(str(db_ro_task_update.get("to_check_at", "")))
-                    line.append(str(db_ro_task_update.get("locked_by", "")))
-                    line.append("")
-                    line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
-                    line.append("")
-                    line.append(str(db_ro_task_update.get("vim_info", "")))
-                    line.append(str(str(db_ro_task_update).count(".status")))
-                    line.append(db_ro_task_update.get(st, ""))
-                    line.append("")
-                    line.append(str(i))
-                    line.extend([""] * 3)
-                    i += 1
-                    self.logger.debug(";".join(line))
-
-            elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
-                line.clear()
-                line.append(mark)
-                line.append(event)
-                line.append(db_ro_task_delete.get("_id", ""))
-                line.append("")
-                line.append(db_ro_task_delete.get("modified_at", ""))
-                line.extend([""] * 13)
-                self.logger.debug(";".join(line))
-
-            else:
-                line.clear()
-                line.append(mark)
-                line.append(event)
-                line.extend([""] * 16)
-                self.logger.debug(";".join(line))
-
-        except Exception as e:
-            self.logger.error("Error logging ro_task: {}".format(e))
-
     def _delete_task(self, ro_task, task_index, task_depends, db_update):
         """
         Determine if this task need to be done or superseded
@@ -2216,18 +2089,15 @@ class NsWorker(threading.Thread):
                         return ro_task_dependency, task_index
         raise NsWorkerException("Cannot get depending task {}".format(task_id))
 
-    def update_vm_refresh(self):
+    def update_vm_refresh(self, ro_task):
         """Enables the VM status updates if self.refresh_config.active parameter
-        is not -1 and than updates the DB accordingly
+        is not -1 and then updates the DB accordingly
 
         """
         try:
             self.logger.debug("Checking if VM status update config")
             next_refresh = time.time()
-            if self.refresh_config.active == -1:
-                next_refresh = -1
-            else:
-                next_refresh += self.refresh_config.active
+            next_refresh = self._get_next_refresh(ro_task, next_refresh)
 
             if next_refresh != -1:
                 db_ro_task_update = {}
@@ -2262,6 +2132,23 @@ class NsWorker(threading.Thread):
         except Exception as e:
             self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
 
+    def _get_next_refresh(self, ro_task: dict, next_refresh: float):
+        """Decide the next_refresh according to vim type and refresh config period.
+        Args:
+            ro_task (dict):             ro_task details
+            next_refresh    (float):    next refresh time as epoch format
+
+        Returns:
+            next_refresh    (float)     -1 if vm updates are disabled or vim type is openstack.
+        """
+        target_vim = ro_task["target_id"]
+        vim_type = self.db_vims[target_vim]["vim_type"]
+        if self.refresh_config.active == -1 or vim_type == "openstack":
+            next_refresh = -1
+        else:
+            next_refresh += self.refresh_config.active
+        return next_refresh
+
     def _process_pending_tasks(self, ro_task):
         ro_task_id = ro_task["_id"]
         now = time.time()
@@ -2283,10 +2170,7 @@ class NsWorker(threading.Thread):
             elif new_status == "BUILD":
                 next_refresh += self.refresh_config.build
             elif new_status == "DONE":
-                if self.refresh_config.active == -1:
-                    next_refresh = -1
-                else:
-                    next_refresh += self.refresh_config.active
+                next_refresh = self._get_next_refresh(ro_task, next_refresh)
             else:
                 next_refresh += self.refresh_config.error
 
@@ -2301,7 +2185,7 @@ class NsWorker(threading.Thread):
                 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
             """
             # Check if vim status refresh is enabled again
-            self.update_vm_refresh()
+            self.update_vm_refresh(ro_task)
             # 0: get task_status_create
             lock_object = None
             task_status_create = None
@@ -2419,7 +2303,10 @@ class NsWorker(threading.Thread):
                             )
 
                         if task["action"] == "DELETE":
-                            (new_status, db_vim_info_update,) = self._delete_task(
+                            (
+                                new_status,
+                                db_vim_info_update,
+                            ) = self._delete_task(
                                 ro_task, task_index, task_depends, db_ro_task_update
                             )
                             new_status = (
@@ -2462,12 +2349,14 @@ class NsWorker(threading.Thread):
                                     new_status, db_vim_info_update = self.item2class[
                                         task["item"]
                                     ].new(ro_task, task_index, task_depends)
-                                    # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                     _update_refresh(new_status)
                             else:
                                 refresh_at = ro_task["vim_info"]["refresh_at"]
                                 if refresh_at and refresh_at != -1 and now > refresh_at:
-                                    (new_status, db_vim_info_update,) = self.item2class[
+                                    (
+                                        new_status,
+                                        db_vim_info_update,
+                                    ) = self.item2class[
                                         task["item"]
                                     ].refresh(ro_task)
                                     _update_refresh(new_status)