X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=48c07eb3961f1761668975789b9f9ad149168a3c;hb=e64f7fb3e0efd6fcc78ea322c90106c7403a8a62;hp=a42f1cb400823109b6575d4a74aae5a33d1e16db;hpb=3e359b1f0c36fb97145b0bfcbd4d8cc89117924a;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index a42f1cb..48c07eb 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -23,30 +23,29 @@ import logging import logging.handlers import getopt import sys -import ROclient -import ns -import vim_sdn -import netslice + +from osm_lcm import ROclient, ns, vim_sdn, netslice from time import time, sleep -from lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit +from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit +from osm_lcm import version as lcm_version, version_date as lcm_version_date -# from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient from osm_common import dbmemory, dbmongo, fslocal, msglocal, msgkafka from osm_common import version as common_version from osm_common.dbbase import DbException from osm_common.fsbase import FsException from osm_common.msgbase import MsgException from os import environ, path +from random import choice as random_choice from n2vc import version as n2vc_version __author__ = "Alfonso Tierno" -min_RO_version = [0, 6, 3] +min_RO_version = "6.0.2" min_n2vc_version = "0.0.2" -min_common_version = "0.1.11" +min_common_version = "0.1.19" # uncomment if LCM is installed as library and installed, and get them from __init__.py -lcm_version = '0.1.35' -lcm_version_date = '2019-01-31' +# lcm_version = '0.1.41' +# lcm_version_date = '2019-06-19' health_check_file = path.expanduser("~") + "/time_last_ping" # TODO find better location for this file @@ -64,15 +63,16 @@ class Lcm: self.db = None self.msg = None + self.msg_admin = None self.fs = None self.pings_not_received = 1 self.consecutive_errors = 0 self.first_start = False - # contains created tasks/futures to be able to cancel - self.lcm_tasks = TaskRegistry() # logging self.logger = logging.getLogger('lcm') + # get id + self.worker_id = self.get_process_id() # load configuration config = self.read_config_file(config_file) self.config = config @@ -154,9 +154,15 @@ class Lcm: if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_message) + self.msg_admin = msglocal.MsgLocal() + config_message.pop("group_id", None) + self.msg_admin.connect(config_message) elif config_message["driver"] == "kafka": self.msg = msgkafka.MsgKafka() self.msg.connect(config_message) + self.msg_admin = msgkafka.MsgKafka() + config_message.pop("group_id", None) + self.msg_admin.connect(config_message) else: raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format( config["message"]["driver"])) @@ -164,6 +170,9 @@ class Lcm: self.logger.critical(str(e), exc_info=True) raise LcmException(str(e)) + # contains created tasks/futures to be able to cancel + self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger) + self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop) self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop) @@ -172,17 +181,27 @@ class Lcm: self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop) async def check_RO_version(self): - try: - RO = ROclient.ROClient(self.loop, **self.ro_config) - RO_version = await RO.get_version() - if RO_version < min_RO_version: - raise LcmException("Not compatible osm/RO version '{}.{}.{}'. Needed '{}.{}.{}' or higher".format( - *RO_version, *min_RO_version - )) - except ROclient.ROClientException as e: - error_text = "Error while conneting to osm/RO " + str(e) - self.logger.critical(error_text, exc_info=True) - raise LcmException(error_text) + tries = 14 + last_error = None + while True: + try: + ro_server = ROclient.ROClient(self.loop, **self.ro_config) + ro_version = await ro_server.get_version() + if versiontuple(ro_version) < versiontuple(min_RO_version): + raise LcmException("Not compatible osm/RO version '{}'. Needed '{}' or higher".format( + ro_version, min_RO_version)) + self.logger.info("Connected to RO version {}".format(ro_version)) + return + except ROclient.ROClientException as e: + tries -= 1 + error_text = "Error while connecting to RO on {}: {}".format(self.ro_config["endpoint_url"], e) + if tries <= 0: + self.logger.critical(error_text) + raise LcmException(error_text) + if last_error != error_text: + last_error = error_text + self.logger.error(error_text + ". Waiting until {} seconds".format(5*tries)) + await asyncio.sleep(5) async def test(self, param=None): self.logger.debug("Starting/Ending test task: {}".format(param)) @@ -195,7 +214,10 @@ class Lcm: self.pings_not_received = 1 while True: try: - await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop) + await self.msg_admin.aiowrite( + "admin", "ping", + {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version}, + self.loop) # time between pings are low when it is not received and at starting wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace if not self.pings_not_received: @@ -216,7 +238,7 @@ class Lcm: raise consecutive_errors += 1 self.logger.error("Task kafka_read retrying after Exception {}".format(e)) - wait_time = 1 if not first_start else 5 + wait_time = 2 if not first_start else 5 await asyncio.sleep(wait_time, loop=self.loop) def kafka_read_callback(self, topic, command, params): @@ -242,6 +264,8 @@ class Lcm: if topic == "admin": if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm": + if params.get("worker_id") != self.worker_id: + return self.pings_not_received = 0 try: with open(health_check_file, "w") as f: @@ -343,7 +367,7 @@ class Lcm: return elif command == "delete": self.lcm_tasks.cancel(topic, vim_id) - task = asyncio.ensure_future(self.vim.delete(vim_id, order_id)) + task = asyncio.ensure_future(self.vim.delete(params, order_id)) self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task) return elif command == "show": @@ -362,7 +386,7 @@ class Lcm: return elif command == "delete": self.lcm_tasks.cancel(topic, wim_id) - task = asyncio.ensure_future(self.wim.delete(wim_id, order_id)) + task = asyncio.ensure_future(self.wim.delete(params, order_id)) self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task) return elif command == "show": @@ -381,7 +405,7 @@ class Lcm: return elif command == "delete": self.lcm_tasks.cancel(topic, _sdn_id) - task = asyncio.ensure_future(self.sdn.delete(_sdn_id, order_id)) + task = asyncio.ensure_future(self.sdn.delete(params, order_id)) self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task) return elif command == "edit": @@ -391,14 +415,18 @@ class Lcm: self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) async def kafka_read(self): - self.logger.debug("Task kafka_read Enter") + self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id)) # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True while self.consecutive_errors < 10: try: - topics = ("admin", "ns", "vim_account", "wim_account", "sdn", "nsi") - await self.msg.aioread(topics, self.loop, self.kafka_read_callback) + topics = ("ns", "vim_account", "wim_account", "sdn", "nsi") + topics_admin = ("admin", ) + await asyncio.gather( + self.msg.aioread(topics, self.loop, self.kafka_read_callback), + self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False) + ) except LcmExceptionExit: self.logger.debug("Bye!") @@ -442,6 +470,8 @@ class Lcm: self.db.db_disconnect() if self.msg: self.msg.disconnect() + if self.msg_admin: + self.msg_admin.disconnect() if self.fs: self.fs.fs_disconnect() @@ -477,6 +507,26 @@ class Lcm: self.logger.critical("At config file '{}': {}".format(config_file, e)) exit(1) + @staticmethod + def get_process_id(): + """ + Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it + will provide a random one + :return: Obtained ID + """ + # Try getting docker id. If fails, get pid + try: + with open("/proc/self/cgroup", "r") as f: + text_id_ = f.readline() + _, _, text_id = text_id_.rpartition("/") + text_id = text_id.replace('\n', '')[:12] + if text_id: + return text_id + except Exception: + pass + # Return a random id + return ''.join(random_choice("0123456789abcdef") for _ in range(12)) + def usage(): print("""Usage: {} [options]