X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=c4e5eed8bb3948af92b4431502be67f7e8240e30;hb=9597c29234cffe76f9ae2bd7bcaa0955e010b4b4;hp=ca89b8ba78d30a457c6d7de120d5c5d986429c5f;hpb=a009e55ba79a19ce8c9b04fac4e5554550d94975;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index ca89b8b..c4e5eed 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -27,6 +27,7 @@ import ROclient import ns import vim_sdn import netslice +from time import time, sleep from lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit # from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient @@ -36,16 +37,18 @@ from osm_common.dbbase import DbException from osm_common.fsbase import FsException from osm_common.msgbase import MsgException from os import environ, path +from random import choice as random_choice from n2vc import version as n2vc_version __author__ = "Alfonso Tierno" min_RO_version = [0, 6, 3] min_n2vc_version = "0.0.2" -min_common_version = "0.1.11" +min_common_version = "0.1.19" # uncomment if LCM is installed as library and installed, and get them from __init__.py -lcm_version = '0.1.33' -lcm_version_date = '2019-01-31' +lcm_version = '0.1.37' +lcm_version_date = '2019-04-30' +health_check_file = path.expanduser("~") + "/time_last_ping" # TODO find better location for this file class Lcm: @@ -62,6 +65,7 @@ class Lcm: self.db = None self.msg = None + self.msg_admin = None self.fs = None self.pings_not_received = 1 self.consecutive_errors = 0 @@ -71,6 +75,8 @@ class Lcm: self.lcm_tasks = TaskRegistry() # logging self.logger = logging.getLogger('lcm') + # get id + self.worker_id = self.get_process_id() # load configuration config = self.read_config_file(config_file) self.config = config @@ -152,9 +158,15 @@ class Lcm: if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() self.msg.connect(config_message) + self.msg_admin = msglocal.MsgLocal() + config_message.pop("group_id", None) + self.msg_admin.connect(config_message) elif config_message["driver"] == "kafka": self.msg = msgkafka.MsgKafka() self.msg.connect(config_message) + self.msg_admin = msgkafka.MsgKafka() + config_message.pop("group_id", None) + self.msg_admin.connect(config_message) else: raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format( config["message"]["driver"])) @@ -193,7 +205,10 @@ class Lcm: self.pings_not_received = 1 while True: try: - await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop) + await self.msg_admin.aiowrite( + "admin", "ping", + {"from": "lcm", "to": "lcm", "worker_id": self.worker_id, "version": lcm_version}, + self.loop) # time between pings are low when it is not received and at starting wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace if not self.pings_not_received: @@ -214,7 +229,7 @@ class Lcm: raise consecutive_errors += 1 self.logger.error("Task kafka_read retrying after Exception {}".format(e)) - wait_time = 1 if not first_start else 5 + wait_time = 2 if not first_start else 5 await asyncio.sleep(wait_time, loop=self.loop) def kafka_read_callback(self, topic, command, params): @@ -240,7 +255,14 @@ class Lcm: if topic == "admin": if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm": + if params.get("worker_id") != self.worker_id: + return self.pings_not_received = 0 + try: + with open(health_check_file, "w") as f: + f.write(str(time())) + except Exception as e: + self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e)) return elif topic == "ns": if command == "instantiate": @@ -384,14 +406,18 @@ class Lcm: self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) async def kafka_read(self): - self.logger.debug("Task kafka_read Enter") + self.logger.debug("Task kafka_read Enter with worker_id={}".format(self.worker_id)) # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True while self.consecutive_errors < 10: try: - topics = ("admin", "ns", "vim_account", "wim_account", "sdn", "nsi") - await self.msg.aioread(topics, self.loop, self.kafka_read_callback) + topics = ("ns", "vim_account", "wim_account", "sdn", "nsi") + topics_admin = ("admin", ) + await asyncio.gather( + self.msg.aioread(topics, self.loop, self.kafka_read_callback), + self.msg_admin.aioread(topics_admin, self.loop, self.kafka_read_callback, group_id=False) + ) except LcmExceptionExit: self.logger.debug("Bye!") @@ -410,27 +436,6 @@ class Lcm: # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") - def health_check(self): - - global exit_code - task = None - exit_code = 1 - - def health_check_callback(topic, command, params): - global exit_code - print("receiving callback {} {} {}".format(topic, command, params)) - if topic == "admin" and command == "ping" and params["to"] == "lcm" and params["from"] == "lcm": - # print("received LCM ping") - exit_code = 0 - task.cancel() - - try: - task = asyncio.ensure_future(self.msg.aioread(("admin",), self.loop, health_check_callback)) - self.loop.run_until_complete(task) - except Exception: - pass - exit(exit_code) - def start(self): # check RO version @@ -456,6 +461,8 @@ class Lcm: self.db.db_disconnect() if self.msg: self.msg.disconnect() + if self.msg_admin: + self.msg_admin.disconnect() if self.fs: self.fs.fs_disconnect() @@ -491,6 +498,26 @@ class Lcm: self.logger.critical("At config file '{}': {}".format(config_file, e)) exit(1) + @staticmethod + def get_process_id(): + """ + Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it + will provide a random one + :return: Obtained ID + """ + # Try getting docker id. If fails, get pid + try: + with open("/proc/self/cgroup", "r") as f: + text_id_ = f.readline() + _, _, text_id = text_id_.rpartition("/") + text_id = text_id.replace('\n', '')[:12] + if text_id: + return text_id + except Exception: + pass + # Return a random id + return ''.join(random_choice("0123456789abcdef") for _ in range(12)) + def usage(): print("""Usage: {} [options] @@ -502,13 +529,29 @@ def usage(): # --log-socket-port PORT: send logs using this port (default: 9022)") +def health_check(): + retry = 2 + while retry: + retry -= 1 + try: + with open(health_check_file, "r") as f: + last_received_ping = f.read() + + if time() - float(last_received_ping) < Lcm.ping_interval_pace + 10: + exit(0) + except Exception: + pass + if retry: + sleep(6) + exit(1) + + if __name__ == '__main__': try: # load parameters and configuration opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"]) # TODO add "log-socket-host=", "log-socket-port=", "log-file=" config_file = None - health_check = None for o, a in opts: if o in ("-h", "--help"): usage() @@ -516,7 +559,7 @@ if __name__ == '__main__': elif o in ("-c", "--config"): config_file = a elif o == "--health-check": - health_check = True + health_check() # elif o == "--log-socket-port": # log_socket_port = a # elif o == "--log-socket-host": @@ -537,10 +580,7 @@ if __name__ == '__main__': print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr) exit(1) lcm = Lcm(config_file) - if health_check: - lcm.health_check() - else: - lcm.start() + lcm.start() except (LcmException, getopt.GetoptError) as e: print(str(e), file=sys.stderr) # usage()