X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns_thread.py;h=bca21e0cbdd814217ce2276ea46c6cc8c33e8785;hp=4a7de944c83aa833135023a1c7ee34c29865ec93;hb=df48655ed39c3b5202289bcc84cd1618ce5464af;hpb=06e914fba9a8f3b8861a98f175c6a6975882be00 diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py index 4a7de944..bca21e0c 100644 --- a/NG-RO/osm_ng_ro/ns_thread.py +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -337,6 +337,399 @@ class VimInteractionNet(VimInteractionBase): return "DONE", ro_vim_item_update_ok +class VimInteractionClassification(VimInteractionBase): + def new(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + target_vim = self.my_vims[ro_task["target_id"]] + + try: + created = True + params = task["params"] + params_copy = deepcopy(params) + + name = params_copy.pop("name") + logical_source_port_index = int( + params_copy.pop("logical_source_port_index") + ) + logical_source_port = params_copy["logical_source_port"] + + if logical_source_port.startswith("TASK-"): + vm_id = task_depends[logical_source_port] + params_copy["logical_source_port"] = target_vim.refresh_vms_status( + [vm_id] + )[vm_id]["interfaces"][logical_source_port_index]["vim_interface_id"] + + vim_classification_id = target_vim.new_classification( + name, "legacy_flow_classifier", params_copy + ) + + ro_vim_item_update = { + "vim_id": vim_classification_id, + "vim_status": "DONE", + "created": created, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} created={}".format(task_id, ro_task["target_id"], created) + ) + + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.debug(traceback.format_exc()) + self.logger.error( + "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update + + def delete(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + classification_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_message": "DELETED", + "vim_id": None, + } + + try: + if classification_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_classification(classification_vim_id) + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_message"] = "already deleted" + except vimconn.VimConnException as e: + self.logger.error( + "ro_task={} vim={} del-classification={}: {}".format( + ro_task["_id"], ro_task["target_id"], classification_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_message": "Error while deleting: {}".format(e), + } + + return "FAILED", ro_vim_item_update + + self.logger.debug( + "task={} {} del-classification={} {}".format( + task_id, + ro_task["target_id"], + classification_vim_id, + ro_vim_item_update_ok.get("vim_message", ""), + ) + ) + + return "DONE", ro_vim_item_update_ok + + +class VimInteractionSfi(VimInteractionBase): + def new(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + target_vim = self.my_vims[ro_task["target_id"]] + + try: + created = True + params = task["params"] + params_copy = deepcopy(params) + name = params_copy["name"] + ingress_port = params_copy["ingress_port"] + egress_port = params_copy["egress_port"] + ingress_port_index = params_copy["ingress_port_index"] + egress_port_index = params_copy["egress_port_index"] + + ingress_port_id = ingress_port + egress_port_id = egress_port + + vm_id = task_depends[ingress_port] + + if ingress_port.startswith("TASK-"): + ingress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][ + "interfaces" + ][ingress_port_index]["vim_interface_id"] + + if ingress_port == egress_port: + egress_port_id = ingress_port_id + else: + if egress_port.startswith("TASK-"): + egress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][ + "interfaces" + ][egress_port_index]["vim_interface_id"] + + ingress_port_id_list = [ingress_port_id] + egress_port_id_list = [egress_port_id] + + vim_sfi_id = target_vim.new_sfi( + name, ingress_port_id_list, egress_port_id_list, sfc_encap=False + ) + + ro_vim_item_update = { + "vim_id": vim_sfi_id, + "vim_status": "DONE", + "created": created, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} created={}".format(task_id, ro_task["target_id"], created) + ) + + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.debug(traceback.format_exc()) + self.logger.error( + "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update + + def delete(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + sfi_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_message": "DELETED", + "vim_id": None, + } + + try: + if sfi_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_sfi(sfi_vim_id) + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_message"] = "already deleted" + except vimconn.VimConnException as e: + self.logger.error( + "ro_task={} vim={} del-sfi={}: {}".format( + ro_task["_id"], ro_task["target_id"], sfi_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_message": "Error while deleting: {}".format(e), + } + + return "FAILED", ro_vim_item_update + + self.logger.debug( + "task={} {} del-sfi={} {}".format( + task_id, + ro_task["target_id"], + sfi_vim_id, + ro_vim_item_update_ok.get("vim_message", ""), + ) + ) + + return "DONE", ro_vim_item_update_ok + + +class VimInteractionSf(VimInteractionBase): + def new(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + target_vim = self.my_vims[ro_task["target_id"]] + + try: + created = True + params = task["params"] + params_copy = deepcopy(params) + name = params_copy["name"] + sfi_list = params_copy["sfis"] + sfi_id_list = [] + + for sfi in sfi_list: + sfi_id = task_depends[sfi] if sfi.startswith("TASK-") else sfi + sfi_id_list.append(sfi_id) + + vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False) + + ro_vim_item_update = { + "vim_id": vim_sf_id, + "vim_status": "DONE", + "created": created, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} created={}".format(task_id, ro_task["target_id"], created) + ) + + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.debug(traceback.format_exc()) + self.logger.error( + "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update + + def delete(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + sf_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_message": "DELETED", + "vim_id": None, + } + + try: + if sf_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_sf(sf_vim_id) + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_message"] = "already deleted" + except vimconn.VimConnException as e: + self.logger.error( + "ro_task={} vim={} del-sf={}: {}".format( + ro_task["_id"], ro_task["target_id"], sf_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_message": "Error while deleting: {}".format(e), + } + + return "FAILED", ro_vim_item_update + + self.logger.debug( + "task={} {} del-sf={} {}".format( + task_id, + ro_task["target_id"], + sf_vim_id, + ro_vim_item_update_ok.get("vim_message", ""), + ) + ) + + return "DONE", ro_vim_item_update_ok + + +class VimInteractionSfp(VimInteractionBase): + def new(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + target_vim = self.my_vims[ro_task["target_id"]] + + try: + created = True + params = task["params"] + params_copy = deepcopy(params) + name = params_copy["name"] + sf_list = params_copy["sfs"] + classification_list = params_copy["classifications"] + + classification_id_list = [] + sf_id_list = [] + + for classification in classification_list: + classi_id = ( + task_depends[classification] + if classification.startswith("TASK-") + else classification + ) + classification_id_list.append(classi_id) + + for sf in sf_list: + sf_id = task_depends[sf] if sf.startswith("TASK-") else sf + sf_id_list.append(sf_id) + + vim_sfp_id = target_vim.new_sfp( + name, classification_id_list, sf_id_list, sfc_encap=False + ) + + ro_vim_item_update = { + "vim_id": vim_sfp_id, + "vim_status": "DONE", + "created": created, + "vim_details": None, + "vim_message": None, + } + self.logger.debug( + "task={} {} created={}".format(task_id, ro_task["target_id"], created) + ) + + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.debug(traceback.format_exc()) + self.logger.error( + "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "created": created, + "vim_message": str(e), + } + + return "FAILED", ro_vim_item_update + + def delete(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + sfp_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = { + "vim_status": "DELETED", + "created": False, + "vim_message": "DELETED", + "vim_id": None, + } + + try: + if sfp_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_sfp(sfp_vim_id) + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_message"] = "already deleted" + except vimconn.VimConnException as e: + self.logger.error( + "ro_task={} vim={} del-sfp={}: {}".format( + ro_task["_id"], ro_task["target_id"], sfp_vim_id, e + ) + ) + ro_vim_item_update = { + "vim_status": "VIM_ERROR", + "vim_message": "Error while deleting: {}".format(e), + } + + return "FAILED", ro_vim_item_update + + self.logger.debug( + "task={} {} del-sfp={} {}".format( + task_id, + ro_task["target_id"], + sfp_vim_id, + ro_vim_item_update_ok.get("vim_message", ""), + ) + ) + + return "DONE", ro_vim_item_update_ok + + class VimInteractionVdu(VimInteractionBase): max_retries_inject_ssh_key = 20 # 20 times time_retries_inject_ssh_key = 30 # wevery 30 seconds @@ -1647,6 +2040,12 @@ class NsWorker(threading.Thread): "shared-volumes": VimInteractionSharedVolume( self.db, self.my_vims, self.db_vims, self.logger ), + "classification": VimInteractionClassification( + self.db, self.my_vims, self.db_vims, self.logger + ), + "sfi": VimInteractionSfi(self.db, self.my_vims, self.db_vims, self.logger), + "sf": VimInteractionSf(self.db, self.my_vims, self.db_vims, self.logger), + "sfp": VimInteractionSfp(self.db, self.my_vims, self.db_vims, self.logger), "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger), "image": VimInteractionImage( self.db, self.my_vims, self.db_vims, self.logger