From: mirabal Date: Wed, 1 Mar 2017 15:17:10 +0000 (+0100) Subject: Handle ofcs thread creation form db and openvim.cfg X-Git-Tag: v2.0.0~45 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fopenvim.git;a=commitdiff_plain;h=580435e27aa04a40b8893145d181855d37965027 Handle ofcs thread creation form db and openvim.cfg - start_service create a ofc thread per sdn in db, during ofc creation and default openvimd.cfg - stop_service kill ofcs thread - OFC creation/delete via REST interface, handel creation/delete of thread Change-Id: I0c1869b870b296bfb459c2f678bc7afe4d1938bb Signed-off-by: mirabal --- diff --git a/openflow_thread.py b/openflow_thread.py index 265b167..638ab19 100644 --- a/openflow_thread.py +++ b/openflow_thread.py @@ -84,16 +84,18 @@ def change_db2of(flow): flow['actions'] = actions - class of_test_connector(): '''This is a fake openflow connector for testing. It does nothing and it is used for running openvim without an openflow controller ''' def __init__(self, params): - self.name = "ofc_test" - self.rules={} + name = params.get("name", "test-ofc") + self.name = name + self.dpid = params.get("dpid") + self.rules= {} self.logger = logging.getLogger('vim.OF.TEST') - self.logger.setLevel( getattr(logging, params.get("of_debug", "ERROR") ) ) + self.logger.setLevel(getattr(logging, params.get("of_debug", "ERROR"))) + def get_of_switches(self): return 0, () def obtain_port_correspondence(self): @@ -121,18 +123,18 @@ class of_test_connector(): class openflow_thread(threading.Thread): - def __init__(self, OF_connector, db, db_lock, of_test, pmp_with_same_vlan, debug='ERROR'): + def __init__(self, of_uuid, OF_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'): threading.Thread.__init__(self) - + self.of_uuid = of_uuid self.db = db self.pmp_with_same_vlan = pmp_with_same_vlan self.name = "openflow" self.test = of_test self.db_lock = db_lock self.OF_connector = OF_connector - self.logger = logging.getLogger('vim.OF') + self.logger = logging.getLogger('vim.OF-' + of_uuid) self.logger.setLevel( getattr(logging, debug) ) - + self.logger.name = OF_connector.name + " " + self.OF_connector.dpid self.queueLock = threading.Lock() self.taskQueue = Queue.Queue(2000) @@ -146,6 +148,7 @@ class openflow_thread(threading.Thread): return -1, "timeout inserting a task over openflow thread " + self.name def run(self): + self.logger.debug("Start openflow thread") while True: self.queueLock.acquire() if not self.taskQueue.empty(): diff --git a/openvimd.cfg b/openvimd.cfg index 148e992..9641938 100644 --- a/openvimd.cfg +++ b/openvimd.cfg @@ -35,7 +35,7 @@ mode: test #Openflow controller information -of_controller: floodlight # Type of controller to be used. +of_controller: floodlight # Type of controller to be used. # Valid controllers are 'opendaylight', 'floodlight' or #of_controller_module: module # Only needed for . Python module that implement # this controller. By default a file with the name .py is used diff --git a/ovim.py b/ovim.py index fc8b126..02114a1 100644 --- a/ovim.py +++ b/ovim.py @@ -86,11 +86,16 @@ class ovimException(Exception): class ovim(): running_info = {} #TODO OVIM move the info of running threads from config_dic to this static variable + of_module = {} + def __init__(self, configuration): self.config = configuration self.logger = logging.getLogger(configuration["logger_name"]) self.db = None self.db = self._create_database_connection() + self.db_lock = None + self.db_of = None + self.of_test_mode = False def _create_database_connection(self): db = vim_db.vim_db((self.config["network_vlan_range_start"], self.config["network_vlan_range_end"]), @@ -140,27 +145,34 @@ class ovim(): return False def start_service(self): - #if self.running_info: + """ + Start ovim services + :return: + """ + # if self.running_info: # return #TODO service can be checked and rebuild broken threads r = self.db.get_db_version() - if r[0]<0: + if r[0] < 0: raise ovimException("DATABASE is not a VIM one or it is a '0.0' version. Try to upgrade to version '{}' with "\ "'./database_utils/migrate_vim_db.sh'".format(self.config["database_version"]) ) - elif r[1]!=self.config["database_version"]: + elif r[1] != self.config["database_version"]: raise ovimException("DATABASE wrong version '{}'. Try to upgrade/downgrade to version '{}' with "\ "'./database_utils/migrate_vim_db.sh'".format(r[1], self.config["database_version"]) ) # create database connection for openflow threads - db_of = self._create_database_connection() - self.config["db"] = db_of - db_lock = threading.Lock() - self.config["db_lock"] = db_lock + self.db_of = self._create_database_connection() + self.config["db"] = self.db_of + self.db_lock = threading.Lock() + self.config["db_lock"] = self.db_lock + + self.of_test_mode = False if self.config['mode'] == 'normal' or self.config['mode'] == "OF only" else True + # precreate interfaces; [bridge:, VLAN used at Host, uuid of network camping in this bridge, + # speed in Gbit/s - # precreate interfaces; [bridge:, VLAN used at Host, uuid of network camping in this bridge, speed in Gbit/s self.config['dhcp_nets'] = [] self.config['bridge_nets'] = [] for bridge, vlan_speed in self.config["bridge_ifaces"].items(): - # skip 'development_bridge' + # skip 'development_bridge' if self.config['mode'] == 'development' and self.config['development_bridge'] == bridge: continue self.config['bridge_nets'].append([bridge, vlan_speed[0], vlan_speed[1], None]) @@ -168,7 +180,7 @@ class ovim(): # check if this bridge is already used (present at database) for a network) used_bridge_nets = [] for brnet in self.config['bridge_nets']: - r, nets = db_of.get_table(SELECT=('uuid',), FROM='nets', WHERE={'provider': "bridge:" + brnet[0]}) + r, nets = self.db_of.get_table(SELECT=('uuid',), FROM='nets', WHERE={'provider': "bridge:" + brnet[0]}) if r > 0: brnet[3] = nets[0]['uuid'] used_bridge_nets.append(brnet[0]) @@ -180,73 +192,22 @@ class ovim(): # get nets used by dhcp if self.config.get("dhcp_server"): for net in self.config["dhcp_server"].get("nets", ()): - r, nets = db_of.get_table(SELECT=('uuid',), FROM='nets', WHERE={'name': net}) + r, nets = self.db_of.get_table(SELECT=('uuid',), FROM='nets', WHERE={'name': net}) if r > 0: self.config['dhcp_nets'].append(nets[0]['uuid']) - # get host list from data base before starting threads - r, hosts = db_of.get_table(SELECT=('name', 'ip_name', 'user', 'uuid'), FROM='hosts', WHERE={'status': 'ok'}) - if r < 0: - raise ovimException("Cannot get hosts from database {}".format(hosts)) - # create connector to the openflow controller - of_test_mode = False if self.config['mode'] == 'normal' or self.config['mode'] == "OF only" else True - - if of_test_mode: - OF_conn = oft.of_test_connector({"of_debug": self.config['log_level_of']}) - else: - # load other parameters starting by of_ from config dict in a temporal dict - temp_dict = {"of_ip": self.config['of_controller_ip'], - "of_port": self.config['of_controller_port'], - "of_dpid": self.config['of_controller_dpid'], - "of_debug": self.config['log_level_of'] - } - for k, v in self.config.iteritems(): - if type(k) is str and k[0:3] == "of_" and k[0:13] != "of_controller": - temp_dict[k] = v - if self.config['of_controller'] == 'opendaylight': - module = "ODL" - elif "of_controller_module" in self.config: - module = self.config["of_controller_module"] - else: - module = self.config['of_controller'] - module_info = None - try: - module_info = imp.find_module(module) + # OFC default + self._start_ofc_default_task() - OF_conn = imp.load_module("OF_conn", *module_info) - try: - OF_conn = OF_conn.OF_conn(temp_dict) - except Exception as e: - self.logger.error("Cannot open the Openflow controller '%s': %s", type(e).__name__, str(e)) - if module_info and module_info[0]: - file.close(module_info[0]) - exit(-1) - except (IOError, ImportError) as e: - if module_info and module_info[0]: - file.close(module_info[0]) - self.logger.error( - "Cannot open openflow controller module '%s'; %s: %s; revise 'of_controller' field of configuration file.", - module, type(e).__name__, str(e)) - raise ovimException("Cannot open openflow controller module '{}'; {}: {}; revise 'of_controller' field of configuration file.".fromat( - module, type(e).__name__, str(e))) - - - # create openflow thread - thread = oft.openflow_thread(OF_conn, of_test=of_test_mode, db=db_of, db_lock=db_lock, - pmp_with_same_vlan=self.config['of_controller_nets_with_same_vlan'], - debug=self.config['log_level_of']) - r, c = thread.OF_connector.obtain_port_correspondence() - if r < 0: - raise ovimException("Cannot get openflow information %s", c) - thread.start() - self.config['of_thread'] = thread + # OFC per tenant in DB + self._start_of_db_tasks() # create dhcp_server thread host_test_mode = True if self.config['mode'] == 'test' or self.config['mode'] == "OF only" else False dhcp_params = self.config.get("dhcp_server") if dhcp_params: thread = dt.dhcp_thread(dhcp_params=dhcp_params, test=host_test_mode, dhcp_nets=self.config["dhcp_nets"], - db=db_of, db_lock=db_lock, debug=self.config['log_level_of']) + db=self.db_of, db_lock=self.db_lock, debug=self.config['log_level_of']) thread.start() self.config['dhcp_thread'] = thread @@ -254,12 +215,18 @@ class ovim(): host_test_mode = True if self.config['mode'] == 'test' or self.config['mode'] == "OF only" else False host_develop_mode = True if self.config['mode'] == 'development' else False host_develop_bridge_iface = self.config.get('development_bridge', None) + + # get host list from data base before starting threads + r, hosts = self.db_of.get_table(SELECT=('name', 'ip_name', 'user', 'uuid'), FROM='hosts', WHERE={'status': 'ok'}) + if r < 0: + raise ovimException("Cannot get hosts from database {}".format(hosts)) + self.config['host_threads'] = {} for host in hosts: host['image_path'] = '/opt/VNF/images/openvim' - thread = ht.host_thread(name=host['name'], user=host['user'], host=host['ip_name'], db=db_of, db_lock=db_lock, - test=host_test_mode, image_path=self.config['image_path'], version=self.config['version'], - host_id=host['uuid'], develop_mode=host_develop_mode, + thread = ht.host_thread(name=host['name'], user=host['user'], host=host['ip_name'], db=self.db_of, + db_lock=self.db_lock, test=host_test_mode, image_path=self.config['image_path'], + version=self.config['version'], host_id=host['uuid'], develop_mode=host_develop_mode, develop_bridge_iface=host_develop_bridge_iface) thread.start() self.config['host_threads'][host['uuid']] = thread @@ -280,10 +247,155 @@ class ovim(): net['cidr'], net['gateway_ip']) + def _start_of_db_tasks(self): + """ + Start ofc task for existing ofcs in database + :param db_of: + :param db_lock: + :return: + """ + ofcs = self.get_of_controllers() + + for ofc in ofcs: + of_conn = self._load_of_module(ofc) + # create ofc thread per of controller + self._create_ofc_task(ofc['uuid'], ofc['dpid'], of_conn) + + def _create_ofc_task(self, ofc_uuid, dpid, of_conn): + """ + Create an ofc thread for handle each sdn controllers + :param ofc_uuid: sdn controller uuid + :param dpid: sdn controller dpid + :param of_conn: OF_conn module + :return: + """ + if 'ofcs_thread' not in self.config and 'ofcs_thread_dpid' not in self.config: + ofcs_threads = {} + ofcs_thread_dpid = [] + else: + ofcs_threads = self.config['ofcs_thread'] + ofcs_thread_dpid = self.config['ofcs_thread_dpid'] + + if ofc_uuid not in ofcs_threads: + ofc_thread = self._create_ofc_thread(of_conn, ofc_uuid) + if ofc_uuid == "Default": + self.config['of_thread'] = ofc_thread + + ofcs_threads[ofc_uuid] = ofc_thread + self.config['ofcs_thread'] = ofcs_threads + + ofcs_thread_dpid.append({dpid: ofc_thread}) + self.config['ofcs_thread_dpid'] = ofcs_thread_dpid + + def _start_ofc_default_task(self): + """ + Create default ofc thread + """ + if 'of_controller' not in self.config \ + and 'of_controller_ip' not in self.config \ + and 'of_controller_port' not in self.config \ + and 'of_controller_dpid' not in self.config: + return + + # OF THREAD + db_config = {} + db_config['ip'] = self.config.get('of_controller_ip') + db_config['port'] = self.config.get('of_controller_port') + db_config['dpid'] = self.config.get('of_controller_dpid') + db_config['type'] = self.config.get('of_controller') + db_config['user'] = self.config.get('of_user') + db_config['password'] = self.config.get('of_password') + + # create connector to the openflow controller + # load other parameters starting by of_ from config dict in a temporal dict + + of_conn = self._load_of_module(db_config) + # create openflow thread + self._create_ofc_task("Default", db_config['dpid'], of_conn) + + def _load_of_module(self, db_config): + """ + import python module for each SDN controller supported + :param default: SDN dn information + :return: Module + """ + if not db_config: + raise ovimException("No module found it", HTTP_Internal_Server_Error) + + module_info = None + + try: + if self.of_test_mode: + return oft.of_test_connector({"name": db_config['type'], "dpid": db_config['dpid'], + "of_debug": self.config['log_level_of']}) + temp_dict = {} + + if db_config: + temp_dict['of_ip'] = db_config['ip'] + temp_dict['of_port'] = db_config['port'] + temp_dict['of_dpid'] = db_config['dpid'] + temp_dict['of_controller'] = db_config['type'] + + temp_dict['of_debug'] = self.config['log_level_of'] + + if temp_dict['of_controller'] == 'opendaylight': + module = "ODL" + else: + module = temp_dict['of_controller'] + + if module not in ovim.of_module: + module_info = imp.find_module(module) + of_conn_module = imp.load_module("OF_conn", *module_info) + ovim.of_module[module] = of_conn_module + else: + of_conn_module = ovim.of_module[module] + + try: + return of_conn_module.OF_conn(temp_dict) + except Exception as e: + self.logger.error("Cannot open the Openflow controller '%s': %s", type(e).__name__, str(e)) + if module_info and module_info[0]: + file.close(module_info[0]) + raise ovimException("Cannot open the Openflow controller '{}': '{}'".format(type(e).__name__, str(e)), + HTTP_Internal_Server_Error) + except (IOError, ImportError) as e: + if module_info and module_info[0]: + file.close(module_info[0]) + self.logger.error("Cannot open openflow controller module '%s'; %s: %s; revise 'of_controller' " + "field of configuration file.", module, type(e).__name__, str(e)) + raise ovimException("Cannot open openflow controller module '{}'; {}: {}; revise 'of_controller' " + "field of configuration file.".format(module, type(e).__name__, str(e)), + HTTP_Internal_Server_Error) + + def _create_ofc_thread(self, of_conn, ofc_uuid="Default"): + """ + Create and launch a of thread + :return: thread obj + """ + # create openflow thread + + if 'of_controller_nets_with_same_vlan' in self.config: + ofc_net_same_vlan = self.config['of_controller_nets_with_same_vlan'] + else: + ofc_net_same_vlan = False + + thread = oft.openflow_thread(ofc_uuid, of_conn, of_test=self.of_test_mode, db=self.db_of, db_lock=self.db_lock, + pmp_with_same_vlan=ofc_net_same_vlan, debug=self.config['log_level_of']) + #r, c = thread.OF_connector.obtain_port_correspondence() + #if r < 0: + # raise ovimException("Cannot get openflow information %s", c) + thread.start() + return thread + def stop_service(self): threads = self.config.get('host_threads', {}) if 'of_thread' in self.config: threads['of'] = (self.config['of_thread']) + if 'ofcs_thread' in self.config: + ofcs_thread = self.config['ofcs_thread'] + for ofc in ofcs_thread: + threads[ofc] = ofcs_thread[ofc] + if 'dhcp_thread' in self.config: threads['dhcp'] = (self.config['dhcp_thread']) @@ -774,10 +886,15 @@ class ovim(): :return: openflow controller dpid """ - result, content = self.db.new_row('ofcs', ofc_data, True, True) + result, ofc_uuid = self.db.new_row('ofcs', ofc_data, True, True) if result < 0: - raise ovimException("New ofc Error %s" % content, HTTP_Internal_Server_Error) - return content + raise ovimException("New ofc Error %s" % ofc_uuid, HTTP_Internal_Server_Error) + + ofc_data['uuid'] = ofc_uuid + of_conn = self._load_of_module(ofc_data) + self._create_ofc_task(ofc_uuid, ofc_data['dpid'], of_conn) + + return ofc_uuid def edit_of_controller(self, of_id, ofc_data): """ @@ -807,11 +924,23 @@ class ovim(): :return: """ + ofc = self.show_of_controller(of_id) + result, content = self.db.delete_row("ofcs", of_id) if result < 0: raise ovimException("Cannot delete ofc from database: {}".format(content), http_code=-result) elif result == 0: raise ovimException("ofc {} not found ".format(content), http_code=HTTP_Not_Found) + + ofc_thread = self.config['ofcs_thread'][of_id] + del self.config['ofcs_thread'][of_id] + for ofc_th in self.config['ofcs_thread_dpid']: + if ofc['dpid'] in ofc_th: + self.config['ofcs_thread_dpid'].remove(ofc_th) + + ofc_thread.insert_task("exit") + #ofc_thread.join() + return content def show_of_controller(self, uuid): diff --git a/vim_schema.py b/vim_schema.py index 7da3a48..1dbd53c 100644 --- a/vim_schema.py +++ b/vim_schema.py @@ -73,8 +73,8 @@ config_schema = { "of_controller_nets_with_same_vlan": {"type" : "boolean"}, "of_controller": nameshort_schema, #{"type":"string", "enum":["floodlight", "opendaylight"]}, "of_controller_module": {"type":"string"}, - #"of_user": nameshort_schema, - #"of_password": nameshort_schema, + "of_user": nameshort_schema, + "of_password": nameshort_schema, "test_mode": {"type": "boolean"}, #leave for backward compatibility "mode": {"type":"string", "enum":["normal", "host only", "OF only", "development", "test"] }, "development_bridge": {"type":"string"}, @@ -126,8 +126,7 @@ config_schema = { "patternProperties": { "of_*" : {"type": ["string", "integer", "boolean"]} }, - "required": ['db_host', 'db_user', 'db_passwd', 'db_name', - 'of_controller_ip', 'of_controller_port', 'of_controller_dpid', 'of_controller'], + "required": ['db_host', 'db_user', 'db_passwd', 'db_name'], "additionalProperties": False }