X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_ro%2Fvim_thread.py;fp=osm_ro%2Fvim_thread.py;h=69e7f8c9607c9543c5f662866b3a6af804e01d02;hb=3fcfdb7674436861d6ab0740972573293b9a355f;hp=2c30fb9046c19657da100144445a7c1e7afda7ea;hpb=5fd09ae8b82f73266da2d103ff6f3c88afb1cc9c;p=osm%2FRO.git diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index 2c30fb90..69e7f8c9 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -24,19 +24,19 @@ """" This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. The tasks are stored at database in table vim_actions -The task content are (M: stored at memory, D: stored at database): +The task content is (M: stored at memory, D: stored at database): MD instance_action_id: reference a global action over an instance-scenario: database instance_actions - MD task_index: index number of the task. This with the previous are a key + MD task_index: index number of the task. This together with the previous forms a unique key identifier MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread MD vim_id: id of the vm,net,etc at VIM MD action: CREATE, DELETE, FIND MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images - MD item_id: uuid of the referenced entry in the preious table + MD item_id: uuid of the referenced entry in the previous table MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED MD extra: text with yaml format at database, dict at memory with: params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params - depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net + depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation sdn_net_id: used for net. tries: interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database @@ -68,9 +68,6 @@ from lib_osm_openvim.ovim import ovimException __author__ = "Alfonso Tierno, Pablo Montes" __date__ = "$28-Sep-2017 12:07:15$" -# from logging import Logger -# import auxiliary_functions as af - def is_task_id(task_id): return task_id.startswith("TASK-") @@ -141,34 +138,57 @@ class vim_thread(threading.Thread): task_list = [] old_action_key = None - with self.db_lock: - vim_actions = self.db.get_rows(FROM="vim_actions", - WHERE={"datacenter_vim_id": self.datacenter_tenant_id }, - ORDER_BY=("item", "item_id", "created_at",)) - for task in vim_actions: - item = task["item"] - item_id = task["item_id"] - action_key = item + item_id - if old_action_key != action_key: - if not action_completed and task_list: - # This will fill needed task parameters into memory, and insert the task if needed in - # self.pending_tasks or self.refresh_tasks - self._insert_pending_tasks(task_list) - task_list = [] - old_action_key = action_key - action_completed = False - elif action_completed: - continue + old_item_id = "" + old_item = "" + old_created_at = 0.0 + database_limit = 200 + while True: + # get 200 (database_limit) entries each time + with self.db_lock: + vim_actions = self.db.get_rows(FROM="vim_actions", + WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + "item_id>=": old_item_id}, + ORDER_BY=("item_id", "item", "created_at",), + LIMIT=database_limit) + for task in vim_actions: + item = task["item"] + item_id = task["item_id"] + + # skip the first entries that are already processed in the previous pool of 200 + if old_item_id: + if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at: + old_item_id = False # next one will be a new un-processed task + continue - if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND": - task_list.append(task) - elif task["action"] == "DELETE": - # action completed because deleted and status is not SCHEDULED. Not needed anything - action_completed = True + action_key = item + item_id + if old_action_key != action_key: + if not action_completed and task_list: + # This will fill needed task parameters into memory, and insert the task if needed in + # self.pending_tasks or self.refresh_tasks + self._insert_pending_tasks(task_list) + task_list = [] + old_action_key = action_key + action_completed = False + elif action_completed: + continue + if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND": + task_list.append(task) + elif task["action"] == "DELETE": + # action completed because deleted and status is not SCHEDULED. Not needed anything + action_completed = True + if len(vim_actions) == database_limit: + # update variables for get the next database iteration + old_item_id = item_id + old_item = item + old_created_at = task["created_at"] + else: + break # Last actions group need to be inserted too if not action_completed and task_list: self._insert_pending_tasks(task_list) + self.logger.debug("reloaded vim actions pending:{} refresh:{}".format( + len(self.pending_tasks), len(self.refresh_tasks))) def _refres_elements(self): """Call VIM to get VMs and networks status until 10 elements""" @@ -347,13 +367,13 @@ class vim_thread(threading.Thread): if vim_info.get("error_msg"): vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"]) if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \ - task_vim_info != vim_info["vim_info"]: + task_vim_info != vim_info.get("vim_info"): task["extra"]["vim_status"] = vim_info["status"] task["error_msg"] = vim_info.get("error_msg") - task["vim_info"] = vim_info["vim_info"] + task["vim_info"] = vim_info.get("vim_info") temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg"), - "vim_info": vim_info["vim_info"]} + "vim_info": vim_info.get("vim_info")} with self.db_lock: self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) self.db.update_rows( @@ -416,45 +436,79 @@ class vim_thread(threading.Thread): while self.pending_tasks: task = self.pending_tasks.pop(0) nb_processed += 1 - if task["status"] == "SUPERSEDED": - # not needed to do anything but update database with the new status - result = True - database_update = None - elif not self.vim: - task["status"] == "ERROR" - task["error_msg"] = self.error_status - result = False - database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} - elif task["item"] == 'instance_vms': - if task["action"] == "CREATE": - result, database_update = self.new_vm(task) - nb_created += 1 - elif task["action"] == "DELETE": - result, database_update = self.del_vm(task) - else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) - elif task["item"] == 'instance_nets': - if task["action"] == "CREATE": - result, database_update = self.new_net(task) - nb_created += 1 - elif task["action"] == "DELETE": - result, database_update = self.del_net(task) - elif task["action"] == "FIND": - result, database_update = self.get_net(task) + try: + # check if tasks that this depends on have been completed + dependency_not_completed = False + for task_index in task["extra"].get("depends_on", ()): + task_dependency = task["depends"].get("TASK-" + str(task_index)) + if not task_dependency: + task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index)) + if not task_dependency: + raise VimThreadException( + "Cannot get depending net task trying to get depending task {}.{}".format( + task["instance_action_id"], task_index)) + # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again + if task_dependency["status"] == "SCHEDULED": + dependency_not_completed = True + break + elif task_dependency["status"] == "FAILED": + raise VimThreadException( + "Cannot {} {}, task {}.{} {} because depends on failed {} {}, task{}.{}" + "task {}.{}".format(task["action"], task["item"], + task["instance_action_id"], task["task_index"], + task_dependency["instance_action_id"], task_dependency["task_index"], + task_dependency["action"], task_dependency["item"] )) + if dependency_not_completed: + # Move this task to the end. + task["extra"]["tries"] = task["extra"].get("tries", 0) + 1 + if task["extra"]["tries"] <= 3: + self.pending_tasks.append(task) + continue + else: + raise VimThreadException( + "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, " + "(task {}.{})".format(task["action"], task["item"], + task["instance_action_id"], task["task_index"], + task_dependency["instance_action_id"], task_dependency["task_index"], + task_dependency["action"], task_dependency["item"])) + + if task["status"] == "SUPERSEDED": + # not needed to do anything but update database with the new status + result = True + database_update = None + elif not self.vim: + task["status"] == "ERROR" + task["error_msg"] = self.error_status + result = False + database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} + elif task["item"] == 'instance_vms': + if task["action"] == "CREATE": + result, database_update = self.new_vm(task) + nb_created += 1 + elif task["action"] == "DELETE": + result, database_update = self.del_vm(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + elif task["item"] == 'instance_nets': + if task["action"] == "CREATE": + result, database_update = self.new_net(task) + nb_created += 1 + elif task["action"] == "DELETE": + result, database_update = self.del_net(task) + elif task["action"] == "FIND": + result, database_update = self.get_net(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) - else: - raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"])) - # TODO + raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"])) + # TODO + except VimThreadException as e: + result = False + task["error_msg"] = str(e) + task["status"] = "FAILED" + database_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": task["error_msg"]} - if task["status"] == "SCHEDULED": - # This is because a depend task is not completed. Moved to the end. NOT USED YET - if task["extra"].get("tries", 0) > 3: - task["status"] == "FAILED" - else: - task["extra"]["tries"] = task["extra"].get("tries", 0) + 1 - self.pending_tasks.append(task) - elif task["action"] == "DELETE": + if task["action"] == "DELETE": action_key = task["item"] + task["item_id"] del self.grouped_tasks[action_key] elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"): @@ -469,7 +523,7 @@ class vim_thread(threading.Thread): with self.db_lock: self.db.update_rows( table="vim_actions", - UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now, + UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now, "error_msg": task["error_msg"], "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)}, WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]}) @@ -641,18 +695,18 @@ class vim_thread(threading.Thread): error_text = "" for net in net_list: if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id - if net["net_id"] in depends: - task_net = depends[net["net_id"]] - else: - task_net = self._look_for_task(task["instance_action_id"], net["net_id"]) - if not task_net: - raise VimThreadException( - "Error trying to get depending task from task_index={}".format(net["net_id"])) - network_id = task_net.get("vim_id") + task_dependency = task["depends"].get(net["net_id"]) + if not task_dependency: + task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"]) + if not task_dependency: + raise VimThreadException( + "Cannot get depending net task trying to get depending task {}.{}".format( + task["instance_action_id"], net["net_id"])) + network_id = task_dependency.get("vim_id") if not network_id: raise VimThreadException( "Cannot create VM because depends on a network not created or found: " + - str(task_net["error_msg"])) + str(depends[net["net_id"]]["error_msg"])) net["net_id"] = network_id vim_vm_id, created_items = self.vim.new_vminstance(*params)