from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
from osm_lcm import version as lcm_version, version_date as lcm_version_date
-from osm_common import dbmemory, dbmongo, fslocal, fsmongo, msglocal, msgkafka
+from osm_common import msglocal, msgkafka
from osm_common import version as common_version
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
+from osm_lcm.data_utils.database.database import Database
+from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
+import traceback
if os.getenv('OSMLCM_PDB_DEBUG', None) is not None:
pdb.set_trace()
"loglevel": config["RO"].get("loglevel", "ERROR"),
}
if not self.config["ro_config"]["uri"]:
- if not self.config["ro_config"]["ng"]:
- self.config["ro_config"]["uri"] = "http://{}:{}/openmano".format(config["RO"]["host"],
- config["RO"]["port"])
- else:
- self.config["ro_config"]["uri"] = "http://{}:{}/ro".format(config["RO"]["host"], config["RO"]["port"])
+ self.config["ro_config"]["uri"] = "http://{}:{}/".format(config["RO"]["host"], config["RO"]["port"])
+ elif "/ro" in self.config["ro_config"]["uri"][-4:] or "/openmano" in self.config["ro_config"]["uri"][-10:]:
+ # uri ends with '/ro', '/ro/', '/openmano', '/openmano/'
+ index = self.config["ro_config"]["uri"][-1].rfind("/")
+ self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index+1]
self.loop = loop or asyncio.get_event_loop()
+ self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.k8srepo = None
# logging
log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
common_version, min_common_version))
try:
- # TODO check database version
- if config["database"]["driver"] == "mongo":
- self.db = dbmongo.DbMongo()
- self.db.db_connect(config["database"])
- elif config["database"]["driver"] == "memory":
- self.db = dbmemory.DbMemory()
- self.db.db_connect(config["database"])
- else:
- raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
- config["database"]["driver"]))
-
- if config["storage"]["driver"] == "local":
- self.fs = fslocal.FsLocal()
- self.fs.fs_connect(config["storage"])
- elif config["storage"]["driver"] == "mongo":
- self.fs = fsmongo.FsMongo()
- self.fs.fs_connect(config["storage"])
- else:
- raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
- config["storage"]["driver"]))
+ self.db = Database(config).instance.db
+
+ self.fs = Filesystem(config).instance.fs
+ self.fs.sync()
# copy message configuration in order to remove 'group_id' for msg_admin
config_message = config["message"].copy()
raise LcmException(str(e))
# contains created tasks/futures to be able to cancel
- self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
+ self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
if self.config["tsdb"]["driver"] == "prometheus":
- self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.db, self.loop)
+ self.prometheus = prometheus.Prometheus(self.config["tsdb"], self.worker_id, self.loop)
else:
raise LcmException("Invalid configuration param '{}' at '[tsdb]':'driver'".format(
config["tsdb"]["driver"]))
else:
self.prometheus = None
- self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop, self.prometheus)
- self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop,
- self.ns)
- self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
- self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
- self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
- self.k8scluster = vim_sdn.K8sClusterLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
- self.k8srepo = vim_sdn.K8sRepoLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
async def check_RO_version(self):
tries = 14
last_error = None
while True:
+ ro_uri = self.config["ro_config"]["uri"]
try:
- if self.config["ro_config"].get("ng"):
+ # try new RO, if fail old RO
+ try:
+ self.config["ro_config"]["uri"] = ro_uri + "ro"
ro_server = NgRoClient(self.loop, **self.config["ro_config"])
- else:
+ ro_version = await ro_server.get_version()
+ self.config["ro_config"]["ng"] = True
+ except Exception:
+ self.config["ro_config"]["uri"] = ro_uri + "openmano"
ro_server = ROClient(self.loop, **self.config["ro_config"])
- ro_version = await ro_server.get_version()
+ ro_version = await ro_server.get_version()
+ self.config["ro_config"]["ng"] = False
if versiontuple(ro_version) < versiontuple(min_RO_version):
raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format(
ro_version, min_RO_version))
- self.logger.info("Connected to RO version {}".format(ro_version))
+ self.logger.info("Connected to RO version {} new-generation version {}".
+ format(ro_version, self.config["ro_config"]["ng"]))
return
except (ROClientException, NgRoException) as e:
+ self.config["ro_config"]["uri"] = ro_uri
tries -= 1
+ traceback.print_tb(e.__traceback__)
error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e)
if tries <= 0:
self.logger.critical(error_text)
elif topic == "vim_account":
vim_id = params["_id"]
if command in ("create", "created"):
- task = asyncio.ensure_future(self.vim.create(params, order_id))
- self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
+ if not self.config["ro_config"].get("ng"):
+ task = asyncio.ensure_future(self.vim.create(params, order_id))
+ self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task)
return
elif command == "delete" or command == "deleted":
self.lcm_tasks.cancel(topic, vim_id)
sys.stdout.flush()
return
elif command in ("edit", "edited"):
- task = asyncio.ensure_future(self.vim.edit(params, order_id))
- self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
+ if not self.config["ro_config"].get("ng"):
+ task = asyncio.ensure_future(self.vim.edit(params, order_id))
+ self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
return
elif command == "deleted":
return # TODO cleaning of task just in case should be done
elif topic == "wim_account":
wim_id = params["_id"]
if command in ("create", "created"):
- task = asyncio.ensure_future(self.wim.create(params, order_id))
- self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
+ if not self.config["ro_config"].get("ng"):
+ task = asyncio.ensure_future(self.wim.create(params, order_id))
+ self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
return
elif command == "delete" or command == "deleted":
self.lcm_tasks.cancel(topic, wim_id)
elif topic == "sdn":
_sdn_id = params["_id"]
if command in ("create", "created"):
- task = asyncio.ensure_future(self.sdn.create(params, order_id))
- self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
+ if not self.config["ro_config"].get("ng"):
+ task = asyncio.ensure_future(self.sdn.create(params, order_id))
+ self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task)
return
elif command == "delete" or command == "deleted":
self.lcm_tasks.cancel(topic, _sdn_id)
# check RO version
self.loop.run_until_complete(self.check_RO_version())
+ self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus)
+ self.netslice = netslice.NetsliceLcm(self.msg, self.lcm_tasks, self.config, self.loop,
+ self.ns)
+ self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+ self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+ self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+ self.k8scluster = vim_sdn.K8sClusterLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+ self.k8srepo = vim_sdn.K8sRepoLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+
# configure tsdb prometheus
if self.prometheus:
self.loop.run_until_complete(self.prometheus.start())