Avoid database growth by cleaning old vim_actions
[osm/RO.git] / osm_ro / vim_thread.py
index 2c30fb9..69e7f8c 100644 (file)
 """"
 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
 The tasks are stored at database in table vim_actions
-The task content are (M: stored at memory, D: stored at database):
+The task content is (M: stored at memory, D: stored at database):
     MD  instance_action_id:  reference a global action over an instance-scenario: database instance_actions
-    MD  task_index:     index number of the task. This with the previous are a key
+    MD  task_index:     index number of the task. This together with the previous forms a unique key identifier
     MD  datacenter_vim_id:  should contain the uuid of the VIM managed by this thread
     MD  vim_id:     id of the vm,net,etc at VIM
     MD  action:     CREATE, DELETE, FIND
     MD  item:       database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
-    MD  item_id:    uuid of the referenced entry in the preious table
+    MD  item_id:    uuid of the referenced entry in the previous table
     MD  status:     SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
     MD  extra:      text with yaml format at database, dict at memory with:
             params:     list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
             find:       (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
-            depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net
+            depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
             sdn_net_id: used for net.
             tries:
             interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
@@ -68,9 +68,6 @@ from lib_osm_openvim.ovim import ovimException
 __author__ = "Alfonso Tierno, Pablo Montes"
 __date__ = "$28-Sep-2017 12:07:15$"
 
-# from logging import Logger
-# import auxiliary_functions as af
-
 
 def is_task_id(task_id):
     return task_id.startswith("TASK-")
@@ -141,34 +138,57 @@ class vim_thread(threading.Thread):
         task_list = []
         old_action_key = None
 
-        with self.db_lock:
-            vim_actions = self.db.get_rows(FROM="vim_actions",
-                                           WHERE={"datacenter_vim_id":  self.datacenter_tenant_id },
-                                           ORDER_BY=("item", "item_id", "created_at",))
-        for task in vim_actions:
-            item = task["item"]
-            item_id = task["item_id"]
-            action_key = item + item_id
-            if old_action_key != action_key:
-                if not action_completed and task_list:
-                    # This will fill needed task parameters into memory, and insert the task if needed in
-                    # self.pending_tasks or self.refresh_tasks
-                    self._insert_pending_tasks(task_list)
-                task_list = []
-                old_action_key = action_key
-                action_completed = False
-            elif action_completed:
-                continue
+        old_item_id = ""
+        old_item = ""
+        old_created_at = 0.0
+        database_limit = 200
+        while True:
+            # get 200 (database_limit) entries each time
+            with self.db_lock:
+                vim_actions = self.db.get_rows(FROM="vim_actions",
+                                               WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+                                                      "item_id>=": old_item_id},
+                                               ORDER_BY=("item_id", "item", "created_at",),
+                                               LIMIT=database_limit)
+            for task in vim_actions:
+                item = task["item"]
+                item_id = task["item_id"]
+
+                # skip the first entries that are already processed in the previous pool of 200
+                if old_item_id:
+                    if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
+                        old_item_id = False  # next one will be a new un-processed task
+                    continue
 
-            if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
-                task_list.append(task)
-            elif task["action"] == "DELETE":
-                # action completed because deleted and status is not SCHEDULED. Not needed anything
-                action_completed = True
+                action_key = item + item_id
+                if old_action_key != action_key:
+                    if not action_completed and task_list:
+                        # This will fill needed task parameters into memory, and insert the task if needed in
+                        # self.pending_tasks or self.refresh_tasks
+                        self._insert_pending_tasks(task_list)
+                    task_list = []
+                    old_action_key = action_key
+                    action_completed = False
+                elif action_completed:
+                    continue
 
+                if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
+                    task_list.append(task)
+                elif task["action"] == "DELETE":
+                    # action completed because deleted and status is not SCHEDULED. Not needed anything
+                    action_completed = True
+            if len(vim_actions) == database_limit:
+                # update variables for get the next database iteration
+                old_item_id = item_id
+                old_item = item
+                old_created_at = task["created_at"]
+            else:
+                break
         # Last actions group need to be inserted too
         if not action_completed and task_list:
             self._insert_pending_tasks(task_list)
+        self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
+            len(self.pending_tasks), len(self.refresh_tasks)))
 
     def _refres_elements(self):
         """Call VIM to get VMs and networks status until 10 elements"""
@@ -347,13 +367,13 @@ class vim_thread(threading.Thread):
                     if vim_info.get("error_msg"):
                         vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
                     if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
-                            task_vim_info != vim_info["vim_info"]:
+                            task_vim_info != vim_info.get("vim_info"):
                         task["extra"]["vim_status"] = vim_info["status"]
                         task["error_msg"] = vim_info.get("error_msg")
-                        task["vim_info"] = vim_info["vim_info"]
+                        task["vim_info"] = vim_info.get("vim_info")
                         temp_dict = {"status": vim_info["status"],
                                          "error_msg": vim_info.get("error_msg"),
-                                         "vim_info": vim_info["vim_info"]}
+                                         "vim_info": vim_info.get("vim_info")}
                         with self.db_lock:
                             self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                             self.db.update_rows(
@@ -416,45 +436,79 @@ class vim_thread(threading.Thread):
         while self.pending_tasks:
             task = self.pending_tasks.pop(0)
             nb_processed += 1
-            if task["status"] == "SUPERSEDED":
-                # not needed to do anything but update database with the new status
-                result = True
-                database_update = None
-            elif not self.vim:
-                task["status"] == "ERROR"
-                task["error_msg"] = self.error_status
-                result = False
-                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
-            elif task["item"] == 'instance_vms':
-                if task["action"] == "CREATE":
-                    result, database_update = self.new_vm(task)
-                    nb_created += 1
-                elif task["action"] == "DELETE":
-                    result, database_update = self.del_vm(task)
-                else:
-                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
-            elif task["item"] == 'instance_nets':
-                if task["action"] == "CREATE":
-                    result, database_update = self.new_net(task)
-                    nb_created += 1
-                elif task["action"] == "DELETE":
-                    result, database_update = self.del_net(task)
-                elif task["action"] == "FIND":
-                    result, database_update = self.get_net(task)
+            try:
+                # check if tasks that this depends on have been completed
+                dependency_not_completed = False
+                for task_index in task["extra"].get("depends_on", ()):
+                    task_dependency = task["depends"].get("TASK-" + str(task_index))
+                    if not task_dependency:
+                        task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index))
+                        if not task_dependency:
+                            raise VimThreadException(
+                                "Cannot get depending net task trying to get depending task {}.{}".format(
+                                    task["instance_action_id"], task_index))
+                        # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
+                    if task_dependency["status"] == "SCHEDULED":
+                        dependency_not_completed = True
+                        break
+                    elif task_dependency["status"] == "FAILED":
+                        raise VimThreadException(
+                            "Cannot {} {}, task {}.{} {} because depends on failed {} {}, task{}.{}"
+                            "task {}.{}".format(task["action"], task["item"],
+                                                task["instance_action_id"], task["task_index"],
+                                                task_dependency["instance_action_id"], task_dependency["task_index"],
+                                                task_dependency["action"], task_dependency["item"] ))
+                if dependency_not_completed:
+                    # Move this task to the end.
+                    task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
+                    if task["extra"]["tries"] <= 3:
+                        self.pending_tasks.append(task)
+                        continue
+                    else:
+                        raise VimThreadException(
+                            "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
+                            "(task {}.{})".format(task["action"], task["item"],
+                                                  task["instance_action_id"], task["task_index"],
+                                                  task_dependency["instance_action_id"], task_dependency["task_index"],
+                                                  task_dependency["action"], task_dependency["item"]))
+
+                if task["status"] == "SUPERSEDED":
+                    # not needed to do anything but update database with the new status
+                    result = True
+                    database_update = None
+                elif not self.vim:
+                    task["status"] == "ERROR"
+                    task["error_msg"] = self.error_status
+                    result = False
+                    database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
+                elif task["item"] == 'instance_vms':
+                    if task["action"] == "CREATE":
+                        result, database_update = self.new_vm(task)
+                        nb_created += 1
+                    elif task["action"] == "DELETE":
+                        result, database_update = self.del_vm(task)
+                    else:
+                        raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+                elif task["item"] == 'instance_nets':
+                    if task["action"] == "CREATE":
+                        result, database_update = self.new_net(task)
+                        nb_created += 1
+                    elif task["action"] == "DELETE":
+                        result, database_update = self.del_net(task)
+                    elif task["action"] == "FIND":
+                        result, database_update = self.get_net(task)
+                    else:
+                        raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
                 else:
-                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
-            else:
-                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
-                # TODO
+                    raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
+                    # TODO
+            except VimThreadException as e:
+                result = False
+                task["error_msg"] = str(e)
+                task["status"] = "FAILED"
+                database_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": task["error_msg"]}
 
-            if task["status"] == "SCHEDULED":
-                # This is because a depend task is not completed. Moved to the end. NOT USED YET
-                if task["extra"].get("tries", 0) > 3:
-                    task["status"] == "FAILED"
-                else:
-                    task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
-                    self.pending_tasks.append(task)
-            elif task["action"] == "DELETE":
+            if task["action"] == "DELETE":
                 action_key = task["item"] + task["item_id"]
                 del self.grouped_tasks[action_key]
             elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
@@ -469,7 +523,7 @@ class vim_thread(threading.Thread):
                 with self.db_lock:
                     self.db.update_rows(
                         table="vim_actions",
-                        UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now,
+                        UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                                 "error_msg": task["error_msg"],
                                 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                         WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
@@ -641,18 +695,18 @@ class vim_thread(threading.Thread):
             error_text = ""
             for net in net_list:
                 if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
-                    if net["net_id"] in depends:
-                        task_net = depends[net["net_id"]]
-                    else:
-                        task_net = self._look_for_task(task["instance_action_id"], net["net_id"])
-                    if not task_net:
-                        raise VimThreadException(
-                            "Error trying to get depending task from task_index={}".format(net["net_id"]))
-                    network_id = task_net.get("vim_id")
+                    task_dependency = task["depends"].get(net["net_id"])
+                    if not task_dependency:
+                        task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
+                        if not task_dependency:
+                            raise VimThreadException(
+                                "Cannot get depending net task trying to get depending task {}.{}".format(
+                                    task["instance_action_id"], net["net_id"]))
+                    network_id = task_dependency.get("vim_id")
                     if not network_id:
                         raise VimThreadException(
                             "Cannot create VM because depends on a network not created or found: " +
-                            str(task_net["error_msg"]))
+                            str(depends[net["net_id"]]["error_msg"]))
                     net["net_id"] = network_id
             vim_vm_id, created_items = self.vim.new_vminstance(*params)