From: Eduardo Sousa Date: Fri, 30 Nov 2018 15:33:35 +0000 (+0000) Subject: Adding support to different ingress and egress ports (SFC) X-Git-Tag: v5.0.3~12 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F12%2F7012%2F16;p=osm%2FRO.git Adding support to different ingress and egress ports (SFC) TODO: update VNFFG example. Change-Id: I26cf6cc8760516203f8a4fa147bdcd2e00887d89 Signed-off-by: Eduardo Sousa --- diff --git a/database_utils/migrate_mano_db.sh b/database_utils/migrate_mano_db.sh index aa2e7186..4819db4f 100755 --- a/database_utils/migrate_mano_db.sh +++ b/database_utils/migrate_mano_db.sh @@ -34,7 +34,7 @@ DBPORT="3306" DBNAME="mano_db" QUIET_MODE="" #TODO update it with the last database version -LAST_DB_VERSION=34 +LAST_DB_VERSION=35 # Detect paths MYSQL=$(which mysql) @@ -203,6 +203,7 @@ fi #[ $OPENMANO_VER_NUM -ge 5070 ] && DB_VERSION=32 #0.5.70 => 32 #[ $OPENMANO_VER_NUM -ge 5082 ] && DB_VERSION=33 #0.5.82 => 33 #[ $OPENMANO_VER_NUM -ge 6000 ] && DB_VERSION=34 #0.6.00 => 34 +#[ $OPENMANO_VER_NUM -ge 6001 ] && DB_VERSION=35 #0.6.01 => 35 #TODO ... put next versions here function upgrade_to_1(){ @@ -1296,20 +1297,18 @@ function downgrade_from_32(){ } function upgrade_to_33(){ - echo " Add PDU information to 'vms" + echo " Add PDU information to 'vms'" sql "ALTER TABLE vms ADD COLUMN pdu_type VARCHAR(255) NULL DEFAULT NULL AFTER osm_id;" sql "ALTER TABLE instance_nets ADD COLUMN vim_name VARCHAR(255) NULL DEFAULT NULL AFTER vim_net_id;" sql "INSERT INTO schema_version (version_int, version, openmano_ver, comments, date) "\ "VALUES (33, '0.33', '0.5.82', 'Add pdu information to vms', '2018-11-13');" } function downgrade_from_33(){ - echo " Remove back PDU information from' vms'" + echo " Remove back PDU information from 'vms'" sql "ALTER TABLE vms DROP COLUMN pdu_type;" sql "ALTER TABLE instance_nets DROP COLUMN vim_name;" sql "DELETE FROM schema_version WHERE version_int='33';" } - - function upgrade_to_X(){ echo " change 'datacenter_nets'" sql "ALTER TABLE datacenter_nets ADD COLUMN vim_tenant_id VARCHAR(36) NOT NULL AFTER datacenter_id, DROP INDEX name_datacenter_id, ADD UNIQUE INDEX name_datacenter_id (name, datacenter_id, vim_tenant_id);" @@ -1318,19 +1317,26 @@ function downgrade_from_X(){ echo " Change back 'datacenter_nets'" sql "ALTER TABLE datacenter_nets DROP COLUMN vim_tenant_id, DROP INDEX name_datacenter_id, ADD UNIQUE INDEX name_datacenter_id (name, datacenter_id);" } - function upgrade_to_34() { echo " Create databases required for WIM features" script="$(find "${DBUTILS}/migrations/up" -iname "34*.sql" | tail -1)" sql "source ${script}" } - function downgrade_from_34() { echo " Drop databases required for WIM features" script="$(find "${DBUTILS}/migrations/down" -iname "34*.sql" | tail -1)" sql "source ${script}" } - +function upgrade_to_35(){ + echo " Create databases required for WIM features" + script="$(find "${DBUTILS}/migrations/up" -iname "35*.sql" | tail -1)" + sql "source ${script}" +} +function downgrade_from_35(){ + echo " Drop databases required for WIM features" + script="$(find "${DBUTILS}/migrations/down" -iname "35*.sql" | tail -1)" + sql "source ${script}" +} #TODO ... put functions here # echo "db version = "${DATABASE_VER_NUM} diff --git a/database_utils/migrations/down/35_remove_sfc_ingress_and_egress.sql b/database_utils/migrations/down/35_remove_sfc_ingress_and_egress.sql new file mode 100644 index 00000000..01f38f4d --- /dev/null +++ b/database_utils/migrations/down/35_remove_sfc_ingress_and_egress.sql @@ -0,0 +1,16 @@ +-- +-- Removing ingress and egress ports for SFC purposes. +-- Inserting only one port for ingress and egress. +-- + +ALTER TABLE sce_rsp_hops + DROP FOREIGN KEY FK_interfaces_rsp_hop_ingress, + CHANGE COLUMN ingress_interface_id interface_id VARCHAR(36) NOT NULL + AFTER if_order, + ADD CONSTRAINT FK_interfaces_rsp_hop + FOREIGN KEY (interface_id) + REFERENCES interfaces (uuid) ON UPDATE CASCADE ON DELETE CASCADE, + DROP FOREIGN KEY FK_interfaces_rsp_hop_egress, + DROP COLUMN egress_interface_id; + +DELETE FROM schema_version WHERE version_int='35'; diff --git a/database_utils/migrations/up/35_add_sfc_ingress_and_egress.sql b/database_utils/migrations/up/35_add_sfc_ingress_and_egress.sql new file mode 100644 index 00000000..b528c6da --- /dev/null +++ b/database_utils/migrations/up/35_add_sfc_ingress_and_egress.sql @@ -0,0 +1,29 @@ +-- +-- Adding different ingress and egress ports for SFC. +-- + +ALTER TABLE sce_rsp_hops + DROP FOREIGN KEY FK_interfaces_rsp_hop, + CHANGE COLUMN interface_id ingress_interface_id VARCHAR(36) NOT NULL + AFTER if_order, + ADD CONSTRAINT FK_interfaces_rsp_hop_ingress + FOREIGN KEY (ingress_interface_id) + REFERENCES interfaces (uuid) ON UPDATE CASCADE ON DELETE CASCADE, + ADD COLUMN egress_interface_id VARCHAR(36) NULL DEFAULT NULL + AFTER ingress_interface_id; + +UPDATE sce_rsp_hops + SET egress_interface_id = ingress_interface_id; + +ALTER TABLE sce_rsp_hops + ALTER COLUMN egress_interface_id DROP DEFAULT; + +ALTER TABLE sce_rsp_hops + MODIFY COLUMN egress_interface_id VARCHAR(36) NOT NULL + AFTER ingress_interface_id, + ADD CONSTRAINT FK_interfaces_rsp_hop_egress + FOREIGN KEY (egress_interface_id) + REFERENCES interfaces (uuid) ON UPDATE CASCADE ON DELETE CASCADE; + +INSERT INTO schema_version (version_int, version, openmano_ver, comments, date) + VALUES (35, '0.35', '0.6.02', 'Adding ingress and egress ports for RSPs', '2018-12-11'); diff --git a/openmanod b/openmanod index 4ba00fd0..f16a54cd 100755 --- a/openmanod +++ b/openmanod @@ -50,9 +50,9 @@ import osm_ro __author__ = "Alfonso Tierno, Gerardo Garcia, Pablo Montes" __date__ = "$26-aug-2014 11:09:29$" -__version__ = "0.6.01" +__version__ = "0.6.02" version_date = "Dec 2018" -database_version = 34 # expected database schema version +database_version = 35 # expected database schema version global global_config global logger diff --git a/osm_ro/nfvo.py b/osm_ro/nfvo.py index 19c8b7d3..56062c70 100644 --- a/osm_ro/nfvo.py +++ b/osm_ro/nfvo.py @@ -2459,25 +2459,41 @@ def new_nsd_v3(mydb, tenant_id, nsd_descriptor): str(nsd["id"]), str(rsp["id"]), str(iface["member-vnf-index-ref"])), httperrors.Bad_Request) - existing_ifaces = mydb.get_rows(SELECT=('i.uuid as uuid',), - FROM="interfaces as i join vms on i.vm_id=vms.uuid", - WHERE={'vnf_id': vnf_index2vnf_uuid[vnf_index], - 'external_name': get_str(iface, "vnfd-connection-point-ref", - 255)}) - if not existing_ifaces: + ingress_existing_ifaces = mydb.get_rows(SELECT=('i.uuid as uuid',), + FROM="interfaces as i join vms on i.vm_id=vms.uuid", + WHERE={ + 'vnf_id': vnf_index2vnf_uuid[vnf_index], + 'external_name': get_str(iface, "vnfd-ingress-connection-point-ref", + 255)}) + if not ingress_existing_ifaces: raise NfvoException("Error. Invalid NS descriptor at 'nsd[{}]':'rsp[{}]':'vnfd-connection-point" - "-ref':'vnfd-connection-point-ref':'{}'. Reference to a non-existing " + "-ref':'vnfd-ingress-connection-point-ref':'{}'. Reference to a non-existing " "connection-point name at VNFD '{}'".format( - str(nsd["id"]), str(rsp["id"]), str(iface["vnfd-connection-point-ref"]), - str(iface.get("vnfd-id-ref"))[:255]), - httperrors.Bad_Request) - interface_uuid = existing_ifaces[0]["uuid"] + str(nsd["id"]), str(rsp["id"]), str(iface["vnfd-ingress-connection-point-ref"]), + str(iface.get("vnfd-id-ref"))[:255]), httperrors.Bad_Request) + + egress_existing_ifaces = mydb.get_rows(SELECT=('i.uuid as uuid',), + FROM="interfaces as i join vms on i.vm_id=vms.uuid", + WHERE={ + 'vnf_id': vnf_index2vnf_uuid[vnf_index], + 'external_name': get_str(iface, "vnfd-egress-connection-point-ref", + 255)}) + if not egress_existing_ifaces: + raise NfvoException("Error. Invalid NS descriptor at 'nsd[{}]':'rsp[{}]':'vnfd-connection-point" + "-ref':'vnfd-egress-connection-point-ref':'{}'. Reference to a non-existing " + "connection-point name at VNFD '{}'".format( + str(nsd["id"]), str(rsp["id"]), str(iface["vnfd-egress-connection-point-ref"]), + str(iface.get("vnfd-id-ref"))[:255]), HTTP_Bad_Request) + + ingress_interface_uuid = ingress_existing_ifaces[0]["uuid"] + egress_interface_uuid = egress_existing_ifaces[0]["uuid"] sce_rsp_hop_uuid = str(uuid4()) uuid_list.append(sce_rsp_hop_uuid) db_sce_rsp_hop = { "uuid": sce_rsp_hop_uuid, "if_order": if_order, - "interface_id": interface_uuid, + "ingress_interface_id": ingress_interface_uuid, + "egress_interface_id": egress_interface_uuid, "sce_vnf_id": vnf_index2scevnf_uuid[vnf_index], "sce_rsp_id": sce_rsp_uuid, } @@ -2958,8 +2974,8 @@ def get_datacenter_by_name_uuid(mydb, tenant_id, datacenter_id_name=None, **extr def update(d, u): - '''Takes dict d and updates it with the values in dict u.''' - '''It merges all depth levels''' + """Takes dict d and updates it with the values in dict u. + It merges all depth levels""" for k, v in u.iteritems(): if isinstance(v, collections.Mapping): r = update(d.get(k, {}), v) @@ -3352,8 +3368,9 @@ def create_instance(mydb, tenant_id, instance_dict): sfs_created = [] for cp in rsp['connection_points']: count = mydb.get_rows( - SELECT=('vms.count'), - FROM="vms join interfaces on vms.uuid=interfaces.vm_id join sce_rsp_hops as h on interfaces.uuid=h.interface_id", + SELECT='vms.count', + FROM="vms join interfaces on vms.uuid=interfaces.vm_id join sce_rsp_hops as h " + "on interfaces.uuid=h.ingress_interface_id", WHERE={'h.uuid': cp['uuid']})[0]['count'] instance_vnf = next((item for item in db_instance_vnfs if item['sce_vnf_id'] == cp['sce_vnf_id']), None) instance_vms = [item for item in db_instance_vms if item['instance_vnf_id'] == instance_vnf['uuid']] @@ -3368,6 +3385,10 @@ def create_instance(mydb, tenant_id, instance_dict): for i in range(count): # create sfis sfi_uuid = str(uuid4()) + extra_params = { + "ingress_interface_id": cp["ingress_interface_id"], + "egress_interface_id": cp["egress_interface_id"] + } uuid_list.append(sfi_uuid) db_sfi = { "uuid": sfi_uuid, @@ -3386,7 +3407,7 @@ def create_instance(mydb, tenant_id, instance_dict): "status": "SCHEDULED", "item": "instance_sfis", "item_id": sfi_uuid, - "extra": yaml.safe_dump({"params": "", "depends_on": [dependencies[i]]}, + "extra": yaml.safe_dump({"params": extra_params, "depends_on": [dependencies[i]]}, default_flow_style=True, width=256) } sfis_created.append(task_index) diff --git a/osm_ro/nfvo_db.py b/osm_ro/nfvo_db.py index 9d528030..99562e0e 100644 --- a/osm_ro/nfvo_db.py +++ b/osm_ro/nfvo_db.py @@ -21,9 +21,9 @@ # contact with: nfvlabs@tid.es ## -''' +""" NFVO DB engine. It implements all the methods to interact with the Openmano Database -''' +""" __author__="Alfonso Tierno, Gerardo Garcia, Pablo Montes" __date__ ="$28-aug-2014 10:05:01$" @@ -724,7 +724,8 @@ class nfvo_db(db_base.db_base): self.cur.execute(cmd) vnffg['rsps'] = self.cur.fetchall() for rsp in vnffg['rsps']: - cmd = "SELECT uuid,if_order,interface_id,sce_vnf_id FROM sce_rsp_hops WHERE sce_rsp_id='{}' "\ + cmd = "SELECT uuid,if_order,ingress_interface_id,egress_interface_id,sce_vnf_id " \ + "FROM sce_rsp_hops WHERE sce_rsp_id='{}' "\ "ORDER BY created_at".format(rsp['uuid']) self.logger.debug(cmd) self.cur.execute(cmd) diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index e611c5e7..f7bc40ba 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -83,6 +83,7 @@ vim_module = { "vmware": vimconn_vmware, } + def is_task_id(task_id): return task_id.startswith("TASK-") @@ -96,8 +97,8 @@ class VimThreadExceptionNotFound(VimThreadException): class vim_thread(threading.Thread): - REFRESH_BUILD = 5 # 5 seconds - REFRESH_ACTIVE = 60 # 1 minute + REFRESH_BUILD = 5 # 5 seconds + REFRESH_ACTIVE = 60 # 1 minute def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None, db=None, db_lock=None, ovim=None): @@ -120,7 +121,7 @@ class vim_thread(threading.Thread): self.name = name self.vim_persistent_info = {} - self.logger = logging.getLogger('openmano.vim.'+self.name) + self.logger = logging.getLogger('openmano.vim.' + self.name) self.db = db self.db_lock = db_lock @@ -142,7 +143,7 @@ class vim_thread(threading.Thread): def get_vimconnector(self): try: - from_= "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid" + from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid" select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin', 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id', 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id', @@ -346,11 +347,11 @@ class vim_thread(threading.Thread): task_interface["sdn_port_id"] = sdn_port_id task_need_update = True except (ovimException, Exception) as e: - error_text = "ovimException creating new_external_port compute_node={}"\ + error_text = "ovimException creating new_external_port compute_node={}" \ " pci={} vlan={} {}".format( - interface["compute_node"], - interface["pci"], - interface.get("vlan"), e) + interface["compute_node"], + interface["pci"], + interface.get("vlan"), e) self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) task_warning_msg += error_text # TODO Set error_msg at instance_nets instead of instance VMs @@ -360,6 +361,7 @@ class vim_thread(threading.Thread): 'instance_interfaces', UPDATE={"mac_address": interface.get("mac_address"), "ip_address": interface.get("ip_address"), + "vim_interface_id": interface.get("vim_interface_id"), "vim_info": interface.get("vim_info"), "sdn_port_id": task_interface.get("sdn_port_id"), "compute_node": interface.get("compute_node"), @@ -441,8 +443,8 @@ class vim_thread(threading.Thread): vim_info_error_msg = str(sdn_net.get("last_error")) else: vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format( - self._format_vim_error_msg(vim_info_error_msg, 1024//2-14), - self._format_vim_error_msg(sdn_net["last_error"], 1024//2-14)) + self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14), + self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14)) vim_info_status = "ERROR" elif sdn_net["status"] == "BUILD": if vim_info_status == "ACTIVE": @@ -628,7 +630,7 @@ class vim_thread(threading.Thread): database_update["vim_net_id"] = None no_refresh_tasks = ['instance_sfis', 'instance_sfs', - 'instance_classifications', 'instance_sfps'] + 'instance_classifications', 'instance_sfps'] if task["action"] == "DELETE": action_key = task["item"] + task["item_id"] del self.grouped_tasks[action_key] @@ -692,10 +694,10 @@ class vim_thread(threading.Thread): continue index = int(task_id) - if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\ - vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]: - task["depends"]["TASK-" + str(index)] = vim_actions_list[index] - task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index] + if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and \ + vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]: + task["depends"]["TASK-" + str(index)] = vim_actions_list[index] + task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index] if extra.get("interfaces"): task["vim_interfaces"] = {} else: @@ -754,7 +756,7 @@ class vim_thread(threading.Thread): if task["status"] == "SCHEDULED": task["status"] = "SUPERSEDED" return True - else: # task["status"] == "processing" + else: # task["status"] == "processing" self.task_lock.release() return False @@ -831,7 +833,7 @@ class vim_thread(threading.Thread): @staticmethod def _format_vim_error_msg(error_text, max_length=1024): if error_text and len(error_text) >= max_length: - return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:] + return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:] return error_text def new_vm(self, task): @@ -864,14 +866,15 @@ class vim_thread(threading.Thread): task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]} with self.db_lock: result = self.db.get_rows( - SELECT=('sdn_net_id',), + SELECT=('sdn_net_id', 'interface_id'), FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid', WHERE={'ii.uuid': iface["uuid"]}) if result: task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id'] + task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id'] else: self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id, - iface["uuid"]), exc_info=True) + iface["uuid"]), exc_info=True) task["vim_info"] = {} task["vim_interfaces"] = {} @@ -1064,17 +1067,39 @@ class vim_thread(threading.Thread): def new_sfi(self, task): vim_sfi_id = None try: + # Waits for interfaces to be ready (avoids failure) + time.sleep(1) dep_id = "TASK-" + str(task["extra"]["depends_on"][0]) task_id = task["instance_action_id"] + "." + str(task["task_index"]) error_text = "" - interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces").keys() + interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces") + ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id") + egress_interface_id = task.get("extra").get("params").get("egress_interface_id") + ingress_vim_interface_id = None + egress_vim_interface_id = None + for vim_interface, interface_data in interfaces.iteritems(): + if interface_data.get("interface_id") == ingress_interface_id: + ingress_vim_interface_id = vim_interface + break + if ingress_interface_id != egress_interface_id: + for vim_interface, interface_data in interfaces.iteritems(): + if interface_data.get("interface_id") == egress_interface_id: + egress_vim_interface_id = vim_interface + break + else: + egress_vim_interface_id = ingress_vim_interface_id + if not ingress_vim_interface_id or not egress_vim_interface_id: + self.logger.error("Error creating Service Function Instance, Ingress: %s, Egress: %s", + ingress_vim_interface_id, egress_vim_interface_id) + return False, None # At the moment, every port associated with the VM will be used both as ingress and egress ports. # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the # first ingress and first egress ports will be used to create the SFI (Port Pair). - port_id_list = [interfaces[0]] + ingress_port_id_list = [ingress_vim_interface_id] + egress_port_id_list = [egress_vim_interface_id] name = "sfi-%s" % task["item_id"][:8] # By default no form of IETF SFC Encapsulation will be used - vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False) + vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False) task["extra"]["created"] = True task["error_msg"] = None @@ -1114,8 +1139,8 @@ class vim_thread(threading.Thread): try: task_id = task["instance_action_id"] + "." + str(task["task_index"]) error_text = "" - depending_tasks = [ "TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]] - #sfis = task.get("depends").values()[0].get("extra").get("params")[5] + depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]] + # sfis = task.get("depends").values()[0].get("extra").get("params")[5] sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks] sfi_id_list = [] for sfi in sfis: @@ -1186,14 +1211,14 @@ class vim_thread(threading.Thread): if '/' not in destination_ip: destination_ip += '/32' definition = { - "logical_source_port": interfaces[0], - "protocol": ip_proto, - "source_ip_prefix": source_ip, - "destination_ip_prefix": destination_ip, - "source_port_range_min": params.get("source_port"), - "source_port_range_max": params.get("source_port"), - "destination_port_range_min": params.get("destination_port"), - "destination_port_range_max": params.get("destination_port"), + "logical_source_port": interfaces[0], + "protocol": ip_proto, + "source_ip_prefix": source_ip, + "destination_ip_prefix": destination_ip, + "source_port_range_min": params.get("source_port"), + "source_port_range_max": params.get("source_port"), + "destination_port_range_min": params.get("destination_port"), + "destination_port_range_max": params.get("destination_port"), } vim_classification_id = self.vim.new_classification(