X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=a42f1cb400823109b6575d4a74aae5a33d1e16db;hb=3e359b1f0c36fb97145b0bfcbd4d8cc89117924a;hp=a01d1883a65d1bc29c914afdb5cfbfca70f6828c;hpb=8a5188754662c19e2471e5e7a1172a3bccea669e;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index a01d188..a42f1cb 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -27,6 +27,7 @@ import ROclient import ns import vim_sdn import netslice +from time import time, sleep from lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit # from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient @@ -40,12 +41,13 @@ from n2vc import version as n2vc_version __author__ = "Alfonso Tierno" -min_RO_version = [0, 6, 0] +min_RO_version = [0, 6, 3] min_n2vc_version = "0.0.2" min_common_version = "0.1.11" # uncomment if LCM is installed as library and installed, and get them from __init__.py -lcm_version = '0.1.30' -lcm_version_date = '2019-01-10' +lcm_version = '0.1.35' +lcm_version_date = '2019-01-31' +health_check_file = path.expanduser("~") + "/time_last_ping" # TODO find better location for this file class Lcm: @@ -64,6 +66,8 @@ class Lcm: self.msg = None self.fs = None self.pings_not_received = 1 + self.consecutive_errors = 0 + self.first_start = False # contains created tasks/futures to be able to cancel self.lcm_tasks = TaskRegistry() @@ -145,15 +149,17 @@ class Lcm: raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format( config["storage"]["driver"])) - if config["message"]["driver"] == "local": + config_message = config["message"].copy() + config_message["loop"] = self.loop + if config_message["driver"] == "local": self.msg = msglocal.MsgLocal() - self.msg.connect(config["message"]) - elif config["message"]["driver"] == "kafka": + self.msg.connect(config_message) + elif config_message["driver"] == "kafka": self.msg = msgkafka.MsgKafka() - self.msg.connect(config["message"]) + self.msg.connect(config_message) else: raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format( - config["storage"]["driver"])) + config["message"]["driver"])) except (DbException, FsException, MsgException) as e: self.logger.critical(str(e), exc_info=True) raise LcmException(str(e)) @@ -237,6 +243,11 @@ class Lcm: if topic == "admin": if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm": self.pings_not_received = 0 + try: + with open(health_check_file, "w") as f: + f.write(str(time())) + except Exception as e: + self.logger.error("Cannot write into '{}' for healthcheck: {}".format(health_check_file, e)) return elif topic == "ns": if command == "instantiate": @@ -273,6 +284,7 @@ class Lcm: self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task) return elif command == "show": + nsr_id = params try: db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) print("nsr:\n _id={}\n operational-status: {}\n config-status: {}" @@ -307,6 +319,7 @@ class Lcm: self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task) return elif command == "show": + nsir_id = params try: db_nsir = self.db.get_one("nsirs", {"_id": nsir_id}) print("nsir:\n _id={}\n operational-status: {}\n config-status: {}" @@ -404,27 +417,6 @@ class Lcm: # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") - def health_check(self): - - global exit_code - task = None - exit_code = 1 - - def health_check_callback(topic, command, params): - global exit_code - print("receiving callback {} {} {}".format(topic, command, params)) - if topic == "admin" and command == "ping" and params["to"] == "lcm" and params["from"] == "lcm": - # print("received LCM ping") - exit_code = 0 - task.cancel() - - try: - task = asyncio.ensure_future(self.msg.aioread(("admin",), self.loop, health_check_callback)) - self.loop.run_until_complete(task) - except Exception: - pass - exit(exit_code) - def start(self): # check RO version @@ -496,13 +488,29 @@ def usage(): # --log-socket-port PORT: send logs using this port (default: 9022)") +def health_check(): + retry = 2 + while retry: + retry -= 1 + try: + with open(health_check_file, "r") as f: + last_received_ping = f.read() + + if time() - float(last_received_ping) < Lcm.ping_interval_pace + 10: + exit(0) + except Exception: + pass + if retry: + sleep(6) + exit(1) + + if __name__ == '__main__': try: # load parameters and configuration opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"]) # TODO add "log-socket-host=", "log-socket-port=", "log-file=" config_file = None - health_check = None for o, a in opts: if o in ("-h", "--help"): usage() @@ -510,7 +518,7 @@ if __name__ == '__main__': elif o in ("-c", "--config"): config_file = a elif o == "--health-check": - health_check = True + health_check() # elif o == "--log-socket-port": # log_socket_port = a # elif o == "--log-socket-host": @@ -531,10 +539,7 @@ if __name__ == '__main__': print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr) exit(1) lcm = Lcm(config_file) - if health_check: - lcm.health_check() - else: - lcm.start() + lcm.start() except (LcmException, getopt.GetoptError) as e: print(str(e), file=sys.stderr) # usage()