Handle ofcs thread creation form db and openvim.cfg 20/1220/6
authormirabal <leonardo.mirabal@altran.com>
Wed, 1 Mar 2017 15:17:10 +0000 (16:17 +0100)
committermirabal <leonardo.mirabal@altran.com>
Wed, 8 Mar 2017 11:07:27 +0000 (12:07 +0100)
- start_service create a ofc thread per sdn in db, during ofc creation and default openvimd.cfg
- stop_service kill ofcs thread
- OFC creation/delete via REST interface, handel creation/delete of thread

Change-Id: I0c1869b870b296bfb459c2f678bc7afe4d1938bb
Signed-off-by: mirabal <leonardo.mirabal@altran.com>
openflow_thread.py
openvimd.cfg
ovim.py
vim_schema.py

index 265b167..638ab19 100644 (file)
@@ -84,16 +84,18 @@ def change_db2of(flow):
     flow['actions'] = actions
 
 
-
 class of_test_connector():
     '''This is a fake openflow connector for testing.
         It does nothing and it is used for running openvim without an openflow controller 
     '''
     def __init__(self, params):
-        self.name = "ofc_test"
-        self.rules={}
+        name = params.get("name", "test-ofc")
+        self.name = name
+        self.dpid = params.get("dpid")
+        self.rules= {}
         self.logger = logging.getLogger('vim.OF.TEST')
-        self.logger.setLevel( getattr(logging, params.get("of_debug", "ERROR") ) )
+        self.logger.setLevel(getattr(logging, params.get("of_debug", "ERROR")))
+
     def get_of_switches(self):
         return 0, ()
     def obtain_port_correspondence(self):
@@ -121,18 +123,18 @@ class of_test_connector():
 
 
 class openflow_thread(threading.Thread):
-    def __init__(self, OF_connector, db, db_lock, of_test, pmp_with_same_vlan, debug='ERROR'):
+    def __init__(self, of_uuid, OF_connector, db, db_lock, of_test, pmp_with_same_vlan=False, debug='ERROR'):
         threading.Thread.__init__(self)
-
+        self.of_uuid = of_uuid
         self.db = db
         self.pmp_with_same_vlan = pmp_with_same_vlan
         self.name = "openflow"
         self.test = of_test
         self.db_lock = db_lock
         self.OF_connector = OF_connector
-        self.logger = logging.getLogger('vim.OF')
+        self.logger = logging.getLogger('vim.OF-' + of_uuid)
         self.logger.setLevel( getattr(logging, debug) )
-
+        self.logger.name = OF_connector.name + " " + self.OF_connector.dpid
         self.queueLock = threading.Lock()
         self.taskQueue = Queue.Queue(2000)
         
@@ -146,6 +148,7 @@ class openflow_thread(threading.Thread):
             return -1, "timeout inserting a task over openflow thread " + self.name
 
     def run(self):
+        self.logger.debug("Start openflow thread")
         while True:
             self.queueLock.acquire()
             if not self.taskQueue.empty():
index 148e992..9641938 100644 (file)
@@ -35,7 +35,7 @@
 mode: test
 
 #Openflow controller information
-of_controller:      floodlight                   # Type of controller to be used. 
+of_controller:      floodlight                   # Type of controller to be used.
                                                  # Valid controllers are 'opendaylight', 'floodlight' or <custom>
 #of_controller_module: module                    # Only needed for <custom>.  Python module that implement
                                                  # this controller. By default a file with the name  <custom>.py is used 
diff --git a/ovim.py b/ovim.py
index fc8b126..02114a1 100644 (file)
--- a/ovim.py
+++ b/ovim.py
@@ -86,11 +86,16 @@ class ovimException(Exception):
 
 class ovim():
     running_info = {} #TODO OVIM move the info of running threads from config_dic to this static variable
+    of_module = {}
+
     def __init__(self, configuration):
         self.config = configuration
         self.logger = logging.getLogger(configuration["logger_name"])
         self.db = None
         self.db =   self._create_database_connection()
+        self.db_lock = None
+        self.db_of = None
+        self.of_test_mode = False
 
     def _create_database_connection(self):
         db = vim_db.vim_db((self.config["network_vlan_range_start"], self.config["network_vlan_range_end"]),
@@ -140,27 +145,34 @@ class ovim():
             return False
 
     def start_service(self):
-        #if self.running_info:
+        """
+        Start ovim services
+        :return:
+        """
+        # if self.running_info:
         #    return  #TODO service can be checked and rebuild broken threads
         r = self.db.get_db_version()
-        if r[0]<0:
+        if r[0] < 0:
             raise ovimException("DATABASE is not a VIM one or it is a '0.0' version. Try to upgrade to version '{}' with "\
                                 "'./database_utils/migrate_vim_db.sh'".format(self.config["database_version"]) )
-        elif r[1]!=self.config["database_version"]:
+        elif r[1] != self.config["database_version"]:
             raise ovimException("DATABASE wrong version '{}'. Try to upgrade/downgrade to version '{}' with "\
                                 "'./database_utils/migrate_vim_db.sh'".format(r[1], self.config["database_version"]) )
 
         # create database connection for openflow threads
-        db_of = self._create_database_connection()
-        self.config["db"] = db_of
-        db_lock = threading.Lock()
-        self.config["db_lock"] = db_lock
+        self.db_of = self._create_database_connection()
+        self.config["db"] = self.db_of
+        self.db_lock = threading.Lock()
+        self.config["db_lock"] = self.db_lock
+
+        self.of_test_mode = False if self.config['mode'] == 'normal' or self.config['mode'] == "OF only" else True
+        # precreate interfaces; [bridge:<host_bridge_name>, VLAN used at Host, uuid of network camping in this bridge,
+        # speed in Gbit/s
 
-        # precreate interfaces; [bridge:<host_bridge_name>, VLAN used at Host, uuid of network camping in this bridge, speed in Gbit/s
         self.config['dhcp_nets'] = []
         self.config['bridge_nets'] = []
         for bridge, vlan_speed in self.config["bridge_ifaces"].items():
-        # skip 'development_bridge'
+            # skip 'development_bridge'
             if self.config['mode'] == 'development' and self.config['development_bridge'] == bridge:
                 continue
             self.config['bridge_nets'].append([bridge, vlan_speed[0], vlan_speed[1], None])
@@ -168,7 +180,7 @@ class ovim():
         # check if this bridge is already used (present at database) for a network)
         used_bridge_nets = []
         for brnet in self.config['bridge_nets']:
-            r, nets = db_of.get_table(SELECT=('uuid',), FROM='nets', WHERE={'provider': "bridge:" + brnet[0]})
+            r, nets = self.db_of.get_table(SELECT=('uuid',), FROM='nets', WHERE={'provider': "bridge:" + brnet[0]})
             if r > 0:
                 brnet[3] = nets[0]['uuid']
                 used_bridge_nets.append(brnet[0])
@@ -180,73 +192,22 @@ class ovim():
         # get nets used by dhcp
         if self.config.get("dhcp_server"):
             for net in self.config["dhcp_server"].get("nets", ()):
-                r, nets = db_of.get_table(SELECT=('uuid',), FROM='nets', WHERE={'name': net})
+                r, nets = self.db_of.get_table(SELECT=('uuid',), FROM='nets', WHERE={'name': net})
                 if r > 0:
                     self.config['dhcp_nets'].append(nets[0]['uuid'])
 
-        # get host list from data base before starting threads
-        r, hosts = db_of.get_table(SELECT=('name', 'ip_name', 'user', 'uuid'), FROM='hosts', WHERE={'status': 'ok'})
-        if r < 0:
-            raise ovimException("Cannot get hosts from database {}".format(hosts))
-        # create connector to the openflow controller
-        of_test_mode = False if self.config['mode'] == 'normal' or self.config['mode'] == "OF only" else True
-
-        if of_test_mode:
-            OF_conn = oft.of_test_connector({"of_debug": self.config['log_level_of']})
-        else:
-            # load other parameters starting by of_ from config dict in a temporal dict
-            temp_dict = {"of_ip": self.config['of_controller_ip'],
-                         "of_port": self.config['of_controller_port'],
-                         "of_dpid": self.config['of_controller_dpid'],
-                         "of_debug": self.config['log_level_of']
-                         }
-            for k, v in self.config.iteritems():
-                if type(k) is str and k[0:3] == "of_" and k[0:13] != "of_controller":
-                    temp_dict[k] = v
-            if self.config['of_controller'] == 'opendaylight':
-                module = "ODL"
-            elif "of_controller_module" in self.config:
-                module = self.config["of_controller_module"]
-            else:
-                module = self.config['of_controller']
-            module_info = None
-            try:
-                module_info = imp.find_module(module)
+        # OFC default
+        self._start_ofc_default_task()
 
-                OF_conn = imp.load_module("OF_conn", *module_info)
-                try:
-                    OF_conn = OF_conn.OF_conn(temp_dict)
-                except Exception as e:
-                    self.logger.error("Cannot open the Openflow controller '%s': %s", type(e).__name__, str(e))
-                    if module_info and module_info[0]:
-                        file.close(module_info[0])
-                    exit(-1)
-            except (IOError, ImportError) as e:
-                if module_info and module_info[0]:
-                    file.close(module_info[0])
-                self.logger.error(
-                    "Cannot open openflow controller module '%s'; %s: %s; revise 'of_controller' field of configuration file.",
-                    module, type(e).__name__, str(e))
-                raise ovimException("Cannot open openflow controller module '{}'; {}: {}; revise 'of_controller' field of configuration file.".fromat(
-                        module, type(e).__name__, str(e)))
-
-
-                # create openflow thread
-        thread = oft.openflow_thread(OF_conn, of_test=of_test_mode, db=db_of, db_lock=db_lock,
-                                     pmp_with_same_vlan=self.config['of_controller_nets_with_same_vlan'],
-                                     debug=self.config['log_level_of'])
-        r, c = thread.OF_connector.obtain_port_correspondence()
-        if r < 0:
-            raise ovimException("Cannot get openflow information %s", c)
-        thread.start()
-        self.config['of_thread'] = thread
+        # OFC per tenant in DB
+        self._start_of_db_tasks()
 
         # create dhcp_server thread
         host_test_mode = True if self.config['mode'] == 'test' or self.config['mode'] == "OF only" else False
         dhcp_params = self.config.get("dhcp_server")
         if dhcp_params:
             thread = dt.dhcp_thread(dhcp_params=dhcp_params, test=host_test_mode, dhcp_nets=self.config["dhcp_nets"],
-                                    db=db_of, db_lock=db_lock, debug=self.config['log_level_of'])
+                                    db=self.db_of, db_lock=self.db_lock, debug=self.config['log_level_of'])
             thread.start()
             self.config['dhcp_thread'] = thread
 
@@ -254,12 +215,18 @@ class ovim():
         host_test_mode = True if self.config['mode'] == 'test' or self.config['mode'] == "OF only" else False
         host_develop_mode = True if self.config['mode'] == 'development' else False
         host_develop_bridge_iface = self.config.get('development_bridge', None)
+
+        # get host list from data base before starting threads
+        r, hosts = self.db_of.get_table(SELECT=('name', 'ip_name', 'user', 'uuid'), FROM='hosts', WHERE={'status': 'ok'})
+        if r < 0:
+            raise ovimException("Cannot get hosts from database {}".format(hosts))
+
         self.config['host_threads'] = {}
         for host in hosts:
             host['image_path'] = '/opt/VNF/images/openvim'
-            thread = ht.host_thread(name=host['name'], user=host['user'], host=host['ip_name'], db=db_of, db_lock=db_lock,
-                                    test=host_test_mode, image_path=self.config['image_path'], version=self.config['version'],
-                                    host_id=host['uuid'], develop_mode=host_develop_mode,
+            thread = ht.host_thread(name=host['name'], user=host['user'], host=host['ip_name'], db=self.db_of,
+                                    db_lock=self.db_lock, test=host_test_mode, image_path=self.config['image_path'],
+                                    version=self.config['version'], host_id=host['uuid'], develop_mode=host_develop_mode,
                                     develop_bridge_iface=host_develop_bridge_iface)
             thread.start()
             self.config['host_threads'][host['uuid']] = thread
@@ -280,10 +247,155 @@ class ovim():
                                             net['cidr'],
                                             net['gateway_ip'])
 
+    def _start_of_db_tasks(self):
+        """
+        Start ofc task for existing ofcs in database
+        :param db_of:
+        :param db_lock:
+        :return:
+        """
+        ofcs = self.get_of_controllers()
+
+        for ofc in ofcs:
+            of_conn = self._load_of_module(ofc)
+            # create ofc thread per of controller
+            self._create_ofc_task(ofc['uuid'], ofc['dpid'], of_conn)
+
+    def _create_ofc_task(self, ofc_uuid, dpid, of_conn):
+        """
+        Create an ofc thread for handle each sdn controllers
+        :param ofc_uuid: sdn controller uuid
+        :param dpid:  sdn controller dpid
+        :param of_conn: OF_conn module
+        :return:
+        """
+        if 'ofcs_thread' not in self.config and 'ofcs_thread_dpid' not in self.config:
+            ofcs_threads = {}
+            ofcs_thread_dpid = []
+        else:
+            ofcs_threads = self.config['ofcs_thread']
+            ofcs_thread_dpid = self.config['ofcs_thread_dpid']
+
+        if ofc_uuid not in ofcs_threads:
+            ofc_thread = self._create_ofc_thread(of_conn, ofc_uuid)
+            if ofc_uuid == "Default":
+                self.config['of_thread'] = ofc_thread
+
+            ofcs_threads[ofc_uuid] = ofc_thread
+            self.config['ofcs_thread'] = ofcs_threads
+
+            ofcs_thread_dpid.append({dpid: ofc_thread})
+            self.config['ofcs_thread_dpid'] = ofcs_thread_dpid
+
+    def _start_ofc_default_task(self):
+        """
+        Create default ofc thread
+        """
+        if 'of_controller' not in self.config \
+                and 'of_controller_ip' not in self.config \
+                and 'of_controller_port' not in self.config \
+                and 'of_controller_dpid' not in self.config:
+            return
+
+        # OF THREAD
+        db_config = {}
+        db_config['ip'] = self.config.get('of_controller_ip')
+        db_config['port'] = self.config.get('of_controller_port')
+        db_config['dpid'] = self.config.get('of_controller_dpid')
+        db_config['type'] = self.config.get('of_controller')
+        db_config['user'] = self.config.get('of_user')
+        db_config['password'] = self.config.get('of_password')
+
+        # create connector to the openflow controller
+        # load other parameters starting by of_ from config dict in a temporal dict
+
+        of_conn = self._load_of_module(db_config)
+        # create openflow thread
+        self._create_ofc_task("Default", db_config['dpid'], of_conn)
+
+    def _load_of_module(self, db_config):
+        """
+        import python module for each SDN controller supported
+        :param default: SDN dn information
+        :return: Module
+        """
+        if not db_config:
+            raise ovimException("No module found it", HTTP_Internal_Server_Error)
+
+        module_info = None
+
+        try:
+            if self.of_test_mode:
+                return  oft.of_test_connector({"name": db_config['type'], "dpid": db_config['dpid'],
+                                               "of_debug": self.config['log_level_of']})
+            temp_dict = {}
+
+            if db_config:
+                temp_dict['of_ip'] = db_config['ip']
+                temp_dict['of_port'] = db_config['port']
+                temp_dict['of_dpid'] = db_config['dpid']
+                temp_dict['of_controller'] = db_config['type']
+
+            temp_dict['of_debug'] = self.config['log_level_of']
+
+            if temp_dict['of_controller'] == 'opendaylight':
+                module = "ODL"
+            else:
+                module = temp_dict['of_controller']
+
+            if module not in ovim.of_module:
+                module_info = imp.find_module(module)
+                of_conn_module = imp.load_module("OF_conn", *module_info)
+                ovim.of_module[module] = of_conn_module
+            else:
+                of_conn_module = ovim.of_module[module]
+
+            try:
+                return of_conn_module.OF_conn(temp_dict)
+            except Exception as e:
+                self.logger.error("Cannot open the Openflow controller '%s': %s", type(e).__name__, str(e))
+                if module_info and module_info[0]:
+                    file.close(module_info[0])
+                raise ovimException("Cannot open the Openflow controller '{}': '{}'".format(type(e).__name__, str(e)),
+                                    HTTP_Internal_Server_Error)
+        except (IOError, ImportError) as e:
+            if module_info and module_info[0]:
+                file.close(module_info[0])
+            self.logger.error("Cannot open openflow controller module '%s'; %s: %s; revise 'of_controller' "
+                              "field of configuration file.", module, type(e).__name__, str(e))
+            raise ovimException("Cannot open openflow controller module '{}'; {}: {}; revise 'of_controller' "
+                                "field of configuration file.".format(module, type(e).__name__, str(e)),
+                                HTTP_Internal_Server_Error)
+
+    def _create_ofc_thread(self, of_conn, ofc_uuid="Default"):
+        """
+        Create and launch a of thread
+        :return: thread obj
+        """
+        # create openflow thread
+
+        if 'of_controller_nets_with_same_vlan' in self.config:
+            ofc_net_same_vlan = self.config['of_controller_nets_with_same_vlan']
+        else:
+            ofc_net_same_vlan = False
+
+        thread = oft.openflow_thread(ofc_uuid, of_conn, of_test=self.of_test_mode, db=self.db_of, db_lock=self.db_lock,
+                                     pmp_with_same_vlan=ofc_net_same_vlan, debug=self.config['log_level_of'])
+        #r, c = thread.OF_connector.obtain_port_correspondence()
+        #if r < 0:
+        #    raise ovimException("Cannot get openflow information %s", c)
+        thread.start()
+        return thread
+
     def stop_service(self):
         threads = self.config.get('host_threads', {})
         if 'of_thread' in self.config:
             threads['of'] = (self.config['of_thread'])
+        if 'ofcs_thread' in self.config:
+            ofcs_thread = self.config['ofcs_thread']
+            for ofc in ofcs_thread:
+                threads[ofc] = ofcs_thread[ofc]
+
         if 'dhcp_thread' in self.config:
             threads['dhcp'] = (self.config['dhcp_thread'])
 
@@ -774,10 +886,15 @@ class ovim():
         :return: openflow controller dpid
         """
 
-        result, content = self.db.new_row('ofcs', ofc_data, True, True)
+        result, ofc_uuid = self.db.new_row('ofcs', ofc_data, True, True)
         if result < 0:
-            raise ovimException("New ofc Error %s" % content, HTTP_Internal_Server_Error)
-        return content
+            raise ovimException("New ofc Error %s" % ofc_uuid, HTTP_Internal_Server_Error)
+
+        ofc_data['uuid'] = ofc_uuid
+        of_conn = self._load_of_module(ofc_data)
+        self._create_ofc_task(ofc_uuid, ofc_data['dpid'], of_conn)
+
+        return ofc_uuid
 
     def edit_of_controller(self, of_id, ofc_data):
         """
@@ -807,11 +924,23 @@ class ovim():
         :return:
         """
 
+        ofc = self.show_of_controller(of_id)
+
         result, content = self.db.delete_row("ofcs", of_id)
         if result < 0:
             raise ovimException("Cannot delete ofc from database: {}".format(content), http_code=-result)
         elif result == 0:
             raise ovimException("ofc {} not found ".format(content), http_code=HTTP_Not_Found)
+
+        ofc_thread = self.config['ofcs_thread'][of_id]
+        del self.config['ofcs_thread'][of_id]
+        for ofc_th in self.config['ofcs_thread_dpid']:
+            if ofc['dpid'] in ofc_th:
+                self.config['ofcs_thread_dpid'].remove(ofc_th)
+
+        ofc_thread.insert_task("exit")
+        #ofc_thread.join()
+
         return content
 
     def show_of_controller(self, uuid):
index 7da3a48..1dbd53c 100644 (file)
@@ -73,8 +73,8 @@ config_schema = {
         "of_controller_nets_with_same_vlan": {"type" : "boolean"},
         "of_controller": nameshort_schema, #{"type":"string", "enum":["floodlight", "opendaylight"]},
         "of_controller_module": {"type":"string"},
-        #"of_user": nameshort_schema,
-        #"of_password": nameshort_schema,
+        "of_user": nameshort_schema,
+        "of_password": nameshort_schema,
         "test_mode": {"type": "boolean"}, #leave for backward compatibility
         "mode": {"type":"string", "enum":["normal", "host only", "OF only", "development", "test"] },
         "development_bridge": {"type":"string"},
@@ -126,8 +126,7 @@ config_schema = {
     "patternProperties": {
         "of_*" : {"type": ["string", "integer", "boolean"]}
     },
-    "required": ['db_host', 'db_user', 'db_passwd', 'db_name',
-            'of_controller_ip', 'of_controller_port', 'of_controller_dpid', 'of_controller'],
+    "required": ['db_host', 'db_user', 'db_passwd', 'db_name'],
     "additionalProperties": False
 }