+class NsWorker(threading.Thread):
+ REFRESH_BUILD = 5 # 5 seconds
+ REFRESH_ACTIVE = 60 # 1 minute
+ REFRESH_ERROR = 600
+ REFRESH_IMAGE = 3600 * 10
+ REFRESH_DELETE = 3600 * 10
+ QUEUE_SIZE = 100
+ terminate = False
+
+ def __init__(self, worker_index, config, plugins, db):
+ """
+
+ :param worker_index: thread index
+ :param config: general configuration of RO, among others the process_id with the docker id where it runs
+ :param plugins: global shared dict with the loaded plugins
+ :param db: database class instance to use
+ """
+ threading.Thread.__init__(self)
+ self.config = config
+ self.plugins = plugins
+ self.plugin_name = "unknown"
+ self.logger = logging.getLogger('ro.worker{}'.format(worker_index))
+ self.worker_index = worker_index
+ self.task_queue = queue.Queue(self.QUEUE_SIZE)
+ self.my_vims = {} # targetvim: vimplugin class
+ self.db_vims = {} # targetvim: vim information from database
+ self.vim_targets = [] # targetvim list
+ self.my_id = config["process_id"] + ":" + str(worker_index)
+ self.db = db
+ self.item2class = {
+ "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
+ "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
+ "image": VimInteractionImage(self.db, self.my_vims, self.db_vims, self.logger),
+ "flavor": VimInteractionFlavor(self.db, self.my_vims, self.db_vims, self.logger),
+ "sdn_net": VimInteractionSdnNet(self.db, self.my_vims, self.db_vims, self.logger),
+ }
+ self.time_last_task_processed = None
+ self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db
+ self.idle = True # it is idle when there are not vim_targets associated
+ self.task_locked_time = config["global"]["task_locked_time"]
+
+ def insert_task(self, task):
+ try:
+ self.task_queue.put(task, False)
+ return None
+ except queue.Full:
+ raise NsWorkerException("timeout inserting a task")
+
+ def terminate(self):
+ self.insert_task("exit")
+
+ def del_task(self, task):
+ with self.task_lock:
+ if task["status"] == "SCHEDULED":
+ task["status"] = "SUPERSEDED"
+ return True
+ else: # task["status"] == "processing"
+ self.task_lock.release()
+ return False
+
+ def _process_vim_config(self, target_id, db_vim):
+ """
+ Process vim config, creating vim configuration files as ca_cert
+ :param target_id: vim/sdn/wim + id
+ :param db_vim: Vim dictionary obtained from database
+ :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
+ """
+ if not db_vim.get("config"):
+ return
+ file_name = ""
+ try:
+ if db_vim["config"].get("ca_cert_content"):
+ file_name = "{}:{}".format(target_id, self.worker_index)
+ try:
+ mkdir(file_name)
+ except FileExistsError:
+ pass
+ file_name = file_name + "/ca_cert"
+ with open(file_name, "w") as f:
+ f.write(db_vim["config"]["ca_cert_content"])
+ del db_vim["config"]["ca_cert_content"]
+ db_vim["config"]["ca_cert"] = file_name
+ except Exception as e:
+ raise NsWorkerException("Error writing to file '{}': {}".format(file_name, e))
+
+ def _load_plugin(self, name, type="vim"):
+ # type can be vim or sdn
+ if "rovim_dummy" not in self.plugins:
+ self.plugins["rovim_dummy"] = VimDummyConnector
+ if "rosdn_dummy" not in self.plugins:
+ self.plugins["rosdn_dummy"] = SdnDummyConnector
+ if name in self.plugins:
+ return self.plugins[name]
+ try:
+ for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
+ self.plugins[name] = v.load()
+ except Exception as e:
+ raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
+ if name and name not in self.plugins:
+ raise NsWorkerException("Plugin 'osm_{n}' has not been installed".format(n=name))
+ return self.plugins[name]
+
+ def _unload_vim(self, target_id):
+ """
+ Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
+ :param target_id: Contains type:_id; where type can be 'vim', ...
+ :return: None.
+ """
+ try:
+ self.db_vims.pop(target_id, None)
+ self.my_vims.pop(target_id, None)
+ if target_id in self.vim_targets:
+ self.vim_targets.remove(target_id)
+ self.logger.info("Unloaded {}".format(target_id))
+ rmtree("{}:{}".format(target_id, self.worker_index))
+ except FileNotFoundError:
+ pass # this is raised by rmtree if folder does not exist
+ except Exception as e:
+ self.logger.error("Cannot unload {}: {}".format(target_id, e))
+
+ def _check_vim(self, target_id):
+ """
+ Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
+ :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
+ :return: None.
+ """
+ target, _, _id = target_id.partition(":")
+ now = time.time()
+ update_dict = {}
+ unset_dict = {}
+ op_text = ""
+ step = ""
+ loaded = target_id in self.vim_targets
+ target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
+ try:
+ step = "Getting {} from db".format(target_id)
+ db_vim = self.db.get_one(target_database, {"_id": _id})
+ for op_index, operation in enumerate(db_vim["_admin"].get("operations", ())):
+ if operation["operationState"] != "PROCESSING":
+ continue
+ locked_at = operation.get("locked_at")
+ if locked_at is not None and locked_at >= now - self.task_locked_time:
+ # some other thread is doing this operation
+ return
+ # lock
+ op_text = "_admin.operations.{}.".format(op_index)
+ if not self.db.set_one(target_database,
+ q_filter={"_id": _id,
+ op_text + "operationState": "PROCESSING",
+ op_text + "locked_at": locked_at
+ },
+ update_dict={op_text + "locked_at": now,
+ "admin.current_operation": op_index},
+ fail_on_empty=False):
+ return
+ unset_dict[op_text + "locked_at"] = None
+ unset_dict["current_operation"] = None
+ step = "Loading " + target_id
+ error_text = self._load_vim(target_id)
+ if not error_text:
+ step = "Checking connectivity"
+ if target == 'vim':
+ self.my_vims[target_id].check_vim_connectivity()
+ else:
+ self.my_vims[target_id].check_credentials()
+ update_dict["_admin.operationalState"] = "ENABLED"
+ update_dict["_admin.detailed-status"] = ""
+ unset_dict[op_text + "detailed-status"] = None
+ update_dict[op_text + "operationState"] = "COMPLETED"
+ return
+
+ except Exception as e:
+ error_text = "{}: {}".format(step, e)
+ self.logger.error("{} for {}: {}".format(step, target_id, e))
+
+ finally:
+ if update_dict or unset_dict:
+ if error_text:
+ update_dict[op_text + "operationState"] = "FAILED"
+ update_dict[op_text + "detailed-status"] = error_text
+ unset_dict.pop(op_text + "detailed-status", None)
+ update_dict["_admin.operationalState"] = "ERROR"
+ update_dict["_admin.detailed-status"] = error_text
+ if op_text:
+ update_dict[op_text + "statusEnteredTime"] = now
+ self.db.set_one(target_database, q_filter={"_id": _id}, update_dict=update_dict, unset=unset_dict,
+ fail_on_empty=False)
+ if not loaded:
+ self._unload_vim(target_id)
+
+ def _reload_vim(self, target_id):
+ if target_id in self.vim_targets:
+ self._load_vim(target_id)
+ else:
+ # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
+ # just remove it to force load again next time it is needed
+ self.db_vims.pop(target_id, None)
+
+ def _load_vim(self, target_id):
+ """
+ Load or reload a vim_account, sdn_controller or wim_account.
+ Read content from database, load the plugin if not loaded.
+ In case of error loading the plugin, it load a failing VIM_connector
+ It fills self db_vims dictionary, my_vims dictionary and vim_targets list
+ :param target_id: Contains type:_id; where type can be 'vim', ...
+ :return: None if ok, descriptive text if error
+ """
+ target, _, _id = target_id.partition(":")
+ target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
+ plugin_name = ""
+ vim = None
+ try:
+ step = "Getting {}={} from db".format(target, _id)
+ # TODO process for wim, sdnc, ...
+ vim = self.db.get_one(target_database, {"_id": _id})
+
+ # if deep_get(vim, "config", "sdn-controller"):
+ # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
+ # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
+
+ step = "Decrypting password"
+ schema_version = vim.get("schema_version")
+ self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
+ schema_version=schema_version, salt=_id)
+ self._process_vim_config(target_id, vim)
+ if target == "vim":
+ plugin_name = "rovim_" + vim["vim_type"]
+ step = "Loading plugin '{}'".format(plugin_name)
+ vim_module_conn = self._load_plugin(plugin_name)
+ step = "Loading {}'".format(target_id)
+ self.my_vims[target_id] = vim_module_conn(
+ uuid=vim['_id'], name=vim['name'],
+ tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
+ url=vim['vim_url'], url_admin=None,
+ user=vim['vim_user'], passwd=vim['vim_password'],
+ config=vim.get('config') or {}, persistent_info={}
+ )
+ else: # sdn
+ plugin_name = "rosdn_" + vim["type"]
+ step = "Loading plugin '{}'".format(plugin_name)
+ vim_module_conn = self._load_plugin(plugin_name, "sdn")
+ step = "Loading {}'".format(target_id)
+ wim = deepcopy(vim)
+ wim_config = wim.pop("config", {}) or {}
+ wim["uuid"] = wim["_id"]
+ wim["wim_url"] = wim["url"]
+ if wim.get("dpid"):
+ wim_config["dpid"] = wim.pop("dpid")
+ if wim.get("switch_id"):
+ wim_config["switch_id"] = wim.pop("switch_id")
+ self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) # wim, wim_account, config
+ self.db_vims[target_id] = vim
+ self.error_status = None
+ self.logger.info("Connector loaded for {}, plugin={}".format(target_id, plugin_name))
+ except Exception as e:
+ self.logger.error("Cannot load {} plugin={}: {} {}".format(
+ target_id, plugin_name, step, e))
+ self.db_vims[target_id] = vim or {}
+ self.db_vims[target_id] = FailingConnector(str(e))
+ error_status = "{} Error: {}".format(step, e)
+ return error_status
+ finally:
+ if target_id not in self.vim_targets:
+ self.vim_targets.append(target_id)
+
+ def _get_db_task(self):
+ """
+ Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
+ :return: None
+ """
+ now = time.time()
+ if not self.time_last_task_processed:
+ self.time_last_task_processed = now
+ try:
+ while True:
+ locked = self.db.set_one(
+ "ro_tasks",
+ q_filter={"target_id": self.vim_targets,
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "locked_at.lt": now - self.task_locked_time,
+ "to_check_at.lt": self.time_last_task_processed},
+ update_dict={"locked_by": self.my_id, "locked_at": now},
+ fail_on_empty=False)
+ if locked:
+ # read and return
+ ro_task = self.db.get_one(
+ "ro_tasks",
+ q_filter={"target_id": self.vim_targets,
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "locked_at": now})
+ return ro_task
+ if self.time_last_task_processed == now:
+ self.time_last_task_processed = None
+ return None
+ else:
+ self.time_last_task_processed = now
+ # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
+
+ except DbException as e:
+ self.logger.error("Database exception at _get_db_task: {}".format(e))
+ except Exception as e:
+ self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
+ return None
+
+ def _delete_task(self, ro_task, task_index, task_depends, db_update):
+ """
+ Determine if this task need to be done or superseded
+ :return: None
+ """
+ my_task = ro_task["tasks"][task_index]
+ task_id = my_task["task_id"]
+ needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
+ if my_task["status"] == "FAILED":
+ return None, None # TODO need to be retry??
+ try:
+ for index, task in enumerate(ro_task["tasks"]):
+ if index == task_index or not task:
+ continue # own task
+ if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
+ # set to finished
+ db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
+ elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
+ needed_delete = False
+ if needed_delete:
+ return self.item2class[my_task["item"]].delete(ro_task, task_index)
+ else:
+ return "SUPERSEDED", None
+ except Exception as e:
+ if not isinstance(e, NsWorkerException):
+ self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
+ exc_info=True)
+ return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+
+ def _create_task(self, ro_task, task_index, task_depends, db_update):
+ """
+ Determine if this task need to create something at VIM
+ :return: None
+ """
+ my_task = ro_task["tasks"][task_index]
+ task_id = my_task["task_id"]
+ task_status = None
+ if my_task["status"] == "FAILED":
+ return None, None # TODO need to be retry??
+ elif my_task["status"] == "SCHEDULED":
+ # check if already created by another task
+ for index, task in enumerate(ro_task["tasks"]):
+ if index == task_index or not task:
+ continue # own task
+ if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
+ return task["status"], "COPY_VIM_INFO"
+
+ try:
+ task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
+ ro_task, task_index, task_depends)
+ # TODO update other CREATE tasks
+ except Exception as e:
+ if not isinstance(e, NsWorkerException):
+ self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
+ task_status = "FAILED"
+ ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+ # TODO update ro_vim_item_update
+ return task_status, ro_vim_item_update
+ else:
+ return None, None
+
+ def _get_dependency(self, task_id, ro_task=None, target_id=None):
+ """
+ Look for dependency task
+ :param task_id: Can be one of
+ 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
+ 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
+ 3. task.task_id: "<action_id>:number"
+ :param ro_task:
+ :param target_id:
+ :return: database ro_task plus index of task
+ """
+ if task_id.startswith("vim:") or task_id.startswith("sdn:") or task_id.startswith("wim:"):
+ target_id, _, task_id = task_id.partition(" ")
+
+ if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
+ ro_task_dependency = self.db.get_one(
+ "ro_tasks",
+ q_filter={"target_id": target_id,
+ "tasks.target_record_id": task_id
+ },
+ fail_on_empty=False)
+ if ro_task_dependency:
+ for task_index, task in enumerate(ro_task_dependency["tasks"]):
+ if task["target_record_id"] == task_id:
+ return ro_task_dependency, task_index
+
+ else:
+ if ro_task:
+ for task_index, task in enumerate(ro_task["tasks"]):
+ if task and task["task_id"] == task_id:
+ return ro_task, task_index
+ ro_task_dependency = self.db.get_one(
+ "ro_tasks",
+ q_filter={"tasks.ANYINDEX.task_id": task_id,
+ "tasks.ANYINDEX.target_record.ne": None
+ },
+ fail_on_empty=False)
+ if ro_task_dependency:
+ for task_index, task in ro_task_dependency["tasks"]:
+ if task["task_id"] == task_id:
+ return ro_task_dependency, task_index
+ raise NsWorkerException("Cannot get depending task {}".format(task_id))
+
+ def _process_pending_tasks(self, ro_task):
+ ro_task_id = ro_task["_id"]
+ now = time.time()
+ next_check_at = now + (24*60*60) # one day
+ db_ro_task_update = {}
+
+ def _update_refresh(new_status):
+ # compute next_refresh
+ nonlocal task
+ nonlocal next_check_at
+ nonlocal db_ro_task_update
+ nonlocal ro_task