Fix bug 1442: set SDN target in vim_info for NS VLD
[osm/LCM.git] / osm_lcm / lcm.py
index b306720..6c8be98 100644 (file)
@@ -29,23 +29,25 @@ import logging.handlers
 import getopt
 import sys
 
-from osm_lcm import ns
-from osm_lcm import vim_sdn
-from osm_lcm import netslice
-from osm_lcm import ROclient
+from osm_lcm import ns, prometheus, vim_sdn, netslice
+from osm_lcm.ng_ro import NgRoException, NgRoClient
+from osm_lcm.ROclient import ROClient, ROClientException
 
 from time import time
 from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
 from osm_lcm import version as lcm_version, version_date as lcm_version_date
 
-from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
+from osm_common import msglocal, msgkafka
 from osm_common import version as common_version
 from osm_common.dbbase import DbException
 from osm_common.fsbase import FsException
 from osm_common.msgbase import MsgException
+from osm_lcm.data_utils.database.database import Database
+from osm_lcm.data_utils.filesystem.filesystem import Filesystem
 from os import environ, path
 from random import choice as random_choice
 from n2vc import version as n2vc_version
+import traceback
 
 if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
     pdb.set_trace()
@@ -63,6 +65,8 @@ class Lcm:
 
     ping_interval_pace = 120  # how many time ping is send once is confirmed all is running
     ping_interval_boot = 5    # how many time ping is sent when booting
+    cfg_logger_name = {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs", "tsdb": "lcm.prometheus"}
+    # ^ contains for each section at lcm.cfg the used logger name
 
     def __init__(self, config_file, loop=None):
         """
@@ -86,13 +90,21 @@ class Lcm:
         config = self.read_config_file(config_file)
         self.config = config
         self.config["ro_config"] = {
-            "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
+            "ng": config["RO"].get("ng", False),
+            "uri": config["RO"].get("uri"),
             "tenant": config.get("tenant", "osm"),
-            "logger_name": "lcm.ROclient",
-            "loglevel": "ERROR",
+            "logger_name": "lcm.roclient",
+            "loglevel": config["RO"].get("loglevel", "ERROR"),
         }
+        if not self.config["ro_config"]["uri"]:
+            self.config["ro_config"]["uri"] = "http://{}:{}/".format(config["RO"]["host"], config["RO"]["port"])
+        elif "/ro" in self.config["ro_config"]["uri"][-4:] or "/openmano" in self.config["ro_config"]["uri"][-10:]:
+            # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
+            index = self.config["ro_config"]["uri"][-1].rfind("/")
+            self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index+1]
 
         self.loop = loop or asyncio.get_event_loop()
+        self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.k8srepo = None
 
         # logging
         log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
@@ -114,7 +126,7 @@ class Lcm:
             self.logger.setLevel(config["global"]["loglevel"])
 
         # logging other modules
-        for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
+        for k1, logname in self.cfg_logger_name.items():
             config[k1]["logger_name"] = logname
             logger_module = logging.getLogger(logname)
             if config[k1].get("logfile"):
@@ -138,26 +150,9 @@ class Lcm:
                 common_version, min_common_version))
 
         try:
-            # TODO check database version
-            if config["database"]["driver"] == "mongo":
-                self.db = dbmongo.DbMongo()
-                self.db.db_connect(config["database"])
-            elif config["database"]["driver"] == "memory":
-                self.db = dbmemory.DbMemory()
-                self.db.db_connect(config["database"])
-            else:
-                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
-                    config["database"]["driver"]))
-
-            if config["storage"]["driver"] == "local":
-                self.fs = fslocal.FsLocal()
-                self.fs.fs_connect(config["storage"])
-            elif config["storage"]["driver"] == "mongo":
-                self.fs = fsmongo.FsMongo()
-                self.fs.fs_connect(config["storage"])
-            else:
-                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
-                    config["storage"]["driver"]))
+            self.db = Database(config).instance.db
+
+            self.fs = Filesystem(config).instance.fs
 
             # copy message configuration in order to remove 'group_id' for msg_admin
             config_message = config["message"].copy()
@@ -182,32 +177,45 @@ class Lcm:
             raise LcmException(str(e))
 
         # contains created tasks/futures to be able to cancel
-        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
+        self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
 
-        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
-        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
-        self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
-        self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
-        self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
-        self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
-        self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
+        if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
+            if self.config["tsdb"]["driver"] == "prometheus":
+                self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.loop)
+            else:
+                raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
+                    config["tsdb"]["driver"]))
+        else:
+            self.prometheus = None
 
     async def check_RO_version(self):
         tries = 14
         last_error = None
         while True:
+            ro_uri = self.config["ro_config"]["uri"]
             try:
-                ro_server = ROclient.ROClient(self.loop, **self.config["ro_config"])
-                ro_version = await ro_server.get_version()
+                # try new  RO, if fail old RO
+                try:
+                    self.config["ro_config"]["uri"] = ro_uri + "ro"
+                    ro_server = NgRoClient(self.loop, **self.config["ro_config"])
+                    ro_version = await ro_server.get_version()
+                    self.config["ro_config"]["ng"] = True
+                except Exception:
+                    self.config["ro_config"]["uri"] = ro_uri + "openmano"
+                    ro_server = ROClient(self.loop, **self.config["ro_config"])
+                    ro_version = await ro_server.get_version()
+                    self.config["ro_config"]["ng"] = False
                 if versiontuple(ro_version) < versiontuple(min_RO_version):
                     raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
                         ro_version, min_RO_version))
-                self.logger.info("Connected to RO version {}".format(ro_version))
+                self.logger.info("Connected to RO version {} new-generation version {}".
+                                 format(ro_version, self.config["ro_config"]["ng"]))
                 return
-            except ROclient.ROClientException as e:
+            except (ROClientException, NgRoException) as e:
+                self.config["ro_config"]["uri"] = ro_uri
                 tries -= 1
-                error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["endpoint_url"],
-                                                                             e)
+                traceback.print_tb(e.__traceback__)
+                error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
                 if tries <= 0:
                     self.logger.critical(error_text)
                     raise LcmException(error_text)
@@ -402,8 +410,9 @@ class Lcm:
         elif topic == "vim_account":
             vim_id = params["_id"]
             if command in ("create", "created"):
-                task = asyncio.ensure_future(self.vim.create(params, order_id))
-                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
+                if not self.config["ro_config"].get("ng"):
+                    task = asyncio.ensure_future(self.vim.create(params, order_id))
+                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
                 return
             elif command == "delete" or command == "deleted":
                 self.lcm_tasks.cancel(topic, vim_id)
@@ -415,16 +424,18 @@ class Lcm:
                 sys.stdout.flush()
                 return
             elif command in ("edit", "edited"):
-                task = asyncio.ensure_future(self.vim.edit(params, order_id))
-                self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
+                if not self.config["ro_config"].get("ng"):
+                    task = asyncio.ensure_future(self.vim.edit(params, order_id))
+                    self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
                 return
             elif command == "deleted":
                 return  # TODO cleaning of task just in case should be done
         elif topic == "wim_account":
             wim_id = params["_id"]
             if command in ("create", "created"):
-                task = asyncio.ensure_future(self.wim.create(params, order_id))
-                self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
+                if not self.config["ro_config"].get("ng"):
+                    task = asyncio.ensure_future(self.wim.create(params, order_id))
+                    self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
                 return
             elif command == "delete" or command == "deleted":
                 self.lcm_tasks.cancel(topic, wim_id)
@@ -444,8 +455,9 @@ class Lcm:
         elif topic == "sdn":
             _sdn_id = params["_id"]
             if command in ("create", "created"):
-                task = asyncio.ensure_future(self.sdn.create(params, order_id))
-                self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
+                if not self.config["ro_config"].get("ng"):
+                    task = asyncio.ensure_future(self.sdn.create(params, order_id))
+                    self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
                 return
             elif command == "delete" or command == "deleted":
                 self.lcm_tasks.cancel(topic, _sdn_id)
@@ -470,7 +482,7 @@ class Lcm:
                 topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
                 topics_admin = ("admin", )
                 await asyncio.gather(
-                    self.msg.aioread(topics, self.loop, self.kafka_read_callback),
+                    self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
                     self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False)
                 )
 
@@ -496,6 +508,19 @@ class Lcm:
         # check RO version
         self.loop.run_until_complete(self.check_RO_version())
 
+        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus)
+        self.netslice = netslice.NetsliceLcm(self.msg, self.lcm_tasks, self.config, self.loop,
+                                             self.ns)
+        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+        self.k8scluster = vim_sdn.K8sClusterLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+        self.k8srepo = vim_sdn.K8sRepoLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+
+        # configure tsdb prometheus
+        if self.prometheus:
+            self.loop.run_until_complete(self.prometheus.start())
+
         self.loop.run_until_complete(asyncio.gather(
             self.kafka_read(),
             self.kafka_ping()