X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_ro%2Fvim_thread.py;h=d57ec470cff7ab9d4a517e81496ad4eca915c78a;hb=a05b65a809f3680d38f162b1e980e4258b0e37c2;hp=8d44f4728905c1b5fd2c413d9d4ae45d2de759b1;hpb=d0597e0798d118f3e7722d634d168167dbb08b4b;p=osm%2FRO.git diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index 8d44f472..d57ec470 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -149,8 +149,7 @@ class vim_thread(threading.Thread): 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id', 'user', 'passwd', 'dt.config as dt_config') where_ = {"dt.uuid": self.datacenter_tenant_id} - with self.db_lock: - vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_) + vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_) vim = vims[0] vim_config = {} if vim["config"]: @@ -161,8 +160,9 @@ class vim_thread(threading.Thread): vim_config['datacenter_id'] = vim.get('datacenter_id') # get port_mapping - vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings( - db_filter={"region": vim_config['datacenter_id'], "pci": None}) + with self.db_lock: + vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings( + db_filter={"region": vim_config['datacenter_id'], "pci": None}) self.vim = vim_module[vim["type"]].vimconnector( uuid=vim['datacenter_id'], name=vim['datacenter_name'], @@ -193,12 +193,11 @@ class vim_thread(threading.Thread): database_limit = 200 while True: # get 200 (database_limit) entries each time - with self.db_lock: - vim_actions = self.db.get_rows(FROM="vim_wim_actions", - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, - "item_id>=": old_item_id}, - ORDER_BY=("item_id", "item", "created_at",), - LIMIT=database_limit) + vim_actions = self.db.get_rows(FROM="vim_wim_actions", + WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + "item_id>=": old_item_id}, + ORDER_BY=("item_id", "item", "created_at",), + LIMIT=database_limit) for task in vim_actions: item = task["item"] item_id = task["item_id"] @@ -342,12 +341,12 @@ class vim_thread(threading.Thread): with self.db_lock: sdn_port_id = self.ovim.new_external_port( {"compute_node": interface["compute_node"], - "pci": interface["pci"], - "vlan": interface.get("vlan"), - "net_id": sdn_net_id, - "region": self.vim["config"]["datacenter_id"], - "name": sdn_port_name, - "mac": interface.get("mac_address")}) + "pci": interface["pci"], + "vlan": interface.get("vlan"), + "net_id": sdn_net_id, + "region": self.vim["config"]["datacenter_id"], + "name": sdn_port_name, + "mac": interface.get("mac_address")}) task_interface["sdn_port_id"] = sdn_port_id task_need_update = True except (ovimException, Exception) as e: @@ -360,18 +359,17 @@ class vim_thread(threading.Thread): task_warning_msg += error_text # TODO Set error_msg at instance_nets instead of instance VMs - with self.db_lock: - self.db.update_rows( - 'instance_interfaces', - UPDATE={"mac_address": interface.get("mac_address"), - "ip_address": interface.get("ip_address"), - "vim_interface_id": interface.get("vim_interface_id"), - "vim_info": interface.get("vim_info"), - "sdn_port_id": task_interface.get("sdn_port_id"), - "compute_node": interface.get("compute_node"), - "pci": interface.get("pci"), - "vlan": interface.get("vlan")}, - WHERE={'uuid': task_interface["iface_id"]}) + self.db.update_rows( + 'instance_interfaces', + UPDATE={"mac_address": interface.get("mac_address"), + "ip_address": interface.get("ip_address"), + "vim_interface_id": interface.get("vim_interface_id"), + "vim_info": interface.get("vim_info"), + "sdn_port_id": task_interface.get("sdn_port_id"), + "compute_node": interface.get("compute_node"), + "pci": interface.get("pci"), + "vlan": interface.get("vlan")}, + WHERE={'uuid': task_interface["iface_id"]}) task["vim_interfaces"][vim_interface_id] = interface # check and update task and instance_vms database @@ -388,8 +386,7 @@ class vim_thread(threading.Thread): temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg} if vim_info.get("vim_info"): temp_dict["vim_info"] = vim_info["vim_info"] - with self.db_lock: - self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) + self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) task["extra"]["vim_status"] = vim_info["status"] task["error_msg"] = vim_info_error_msg if vim_info.get("vim_info"): @@ -397,13 +394,12 @@ class vim_thread(threading.Thread): task_need_update = True if task_need_update: - with self.db_lock: - self.db.update_rows( - 'vim_wim_actions', - UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), - "error_msg": task.get("error_msg"), "modified_at": now}, - WHERE={'instance_action_id': task['instance_action_id'], - 'task_index': task['task_index']}) + self.db.update_rows( + 'vim_wim_actions', + UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), + "error_msg": task.get("error_msg"), "modified_at": now}, + WHERE={'instance_action_id': task['instance_action_id'], + 'task_index': task['task_index']}) if task["extra"].get("vim_status") == "BUILD": self._insert_refresh(task, now + self.REFRESH_BUILD) else: @@ -466,14 +462,13 @@ class vim_thread(threading.Thread): temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg} if vim_info.get("vim_info"): temp_dict["vim_info"] = vim_info["vim_info"] - with self.db_lock: - self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) - self.db.update_rows( - 'vim_wim_actions', - UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), - "error_msg": task.get("error_msg"), "modified_at": now}, - WHERE={'instance_action_id': task['instance_action_id'], - 'task_index': task['task_index']}) + self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) + self.db.update_rows( + 'vim_wim_actions', + UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), + "error_msg": task.get("error_msg"), "modified_at": now}, + WHERE={'instance_action_id': task['instance_action_id'], + 'task_index': task['task_index']}) if task["extra"].get("vim_status") == "BUILD": self._insert_refresh(task, now + self.REFRESH_BUILD) else: @@ -648,23 +643,22 @@ class vim_thread(threading.Thread): task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"])) try: now = time.time() - with self.db_lock: + self.db.update_rows( + table="vim_wim_actions", + UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now, + "error_msg": task["error_msg"], + "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)}, + WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]}) + if result is not None: self.db.update_rows( - table="vim_wim_actions", - UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now, - "error_msg": task["error_msg"], - "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)}, - WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]}) - if result is not None: - self.db.update_rows( - table="instance_actions", - UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1}, - "modified_at": now}, - WHERE={"uuid": task["instance_action_id"]}) - if database_update: - self.db.update_rows(table=task["item"], - UPDATE=database_update, - WHERE={"uuid": task["item_id"]}) + table="instance_actions", + UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1}, + "modified_at": now}, + WHERE={"uuid": task["instance_action_id"]}) + if database_update: + self.db.update_rows(table=task["item"], + UPDATE=database_update, + WHERE={"uuid": task["item_id"]}) except db_base_Exception as e: self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True) @@ -816,9 +810,8 @@ class vim_thread(threading.Thread): if ins_action_id: instance_action_id = ins_action_id - with self.db_lock: - tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id, - "task_index": task_index}) + tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id, + "task_index": task_index}) if not tasks: return None task = tasks[0] @@ -868,11 +861,10 @@ class vim_thread(threading.Thread): task_interfaces = {} for iface in params_copy[5]: task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]} - with self.db_lock: - result = self.db.get_rows( - SELECT=('sdn_net_id', 'interface_id'), - FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid', - WHERE={'ii.uuid': iface["uuid"]}) + result = self.db.get_rows( + SELECT=('sdn_net_id', 'interface_id'), + FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid', + WHERE={'ii.uuid': iface["uuid"]}) if result: task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id'] task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id'] @@ -946,11 +938,10 @@ class vim_thread(threading.Thread): # Discover if this network is managed by a sdn controller sdn_net_id = None - with self.db_lock: - result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets', - WHERE={'vim_net_id': vim_net_id, - 'datacenter_tenant_id': self.datacenter_tenant_id}, - ORDER="instance_scenario_id") + result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets', + WHERE={'vim_net_id': vim_net_id, + 'datacenter_tenant_id': self.datacenter_tenant_id}, + ORDER="instance_scenario_id") if result: sdn_net_id = result[0]['sdn_net_id'] @@ -999,7 +990,7 @@ class vim_thread(threading.Thread): # CREATE params = task["params"] action_text = "creating VIM" - vim_net_id = self.vim.new_network(*params[0:2]) + vim_net_id = self.vim.new_network(*params[0:3]) net_name = params[0] net_type = params[1] @@ -1035,10 +1026,12 @@ class vim_thread(threading.Thread): "name": sdn_port_name, } try: - sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) + with self.db_lock: + sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) except ovimException: sdn_port_data["compute_node"] = "__WIM" - sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) + with self.db_lock: + sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id, sdn_net_id)) @@ -1229,26 +1222,31 @@ class vim_thread(threading.Thread): ip_proto = int(params.get("ip_proto")) source_ip = params.get("source_ip") destination_ip = params.get("destination_ip") - if ip_proto == 1: - ip_proto = 'icmp' - elif ip_proto == 6: - ip_proto = 'tcp' - elif ip_proto == 17: - ip_proto = 'udp' - if '/' not in source_ip: - source_ip += '/32' - if '/' not in destination_ip: - destination_ip += '/32' - definition = { - "logical_source_port": interfaces[0], - "protocol": ip_proto, - "source_ip_prefix": source_ip, - "destination_ip_prefix": destination_ip, - "source_port_range_min": params.get("source_port"), - "source_port_range_max": params.get("source_port"), - "destination_port_range_min": params.get("destination_port"), - "destination_port_range_max": params.get("destination_port"), - } + source_port = params.get("source_port") + destination_port = params.get("destination_port") + definition = {"logical_source_port": interfaces[0]} + if ip_proto: + if ip_proto == 1: + ip_proto = 'icmp' + elif ip_proto == 6: + ip_proto = 'tcp' + elif ip_proto == 17: + ip_proto = 'udp' + definition["protocol"] = ip_proto + if source_ip: + if '/' not in source_ip: + source_ip += '/32' + definition["source_ip_prefix"] = source_ip + if source_port: + definition["source_port_range_min"] = source_port + definition["source_port_range_max"] = source_port + if destination_port: + definition["destination_port_range_min"] = destination_port + definition["destination_port_range_max"] = destination_port + if destination_ip: + if '/' not in destination_ip: + destination_ip += '/32' + definition["destination_ip_prefix"] = destination_ip vim_classification_id = self.vim.new_classification( name, 'legacy_flow_classifier', definition)