From ed7f6d409b6dfa4833898a774aaadcda92be94e6 Mon Sep 17 00:00:00 2001 From: gcalvino Date: Fri, 14 Dec 2018 14:44:56 +0100 Subject: [PATCH] Bug 615 - LCM loses kafka messages Change-Id: I951fc8877a1eef245059cea9422bd928b1b3c6b4 Signed-off-by: gcalvino --- osm_lcm/lcm.py | 344 ++++++++++++++++++++++--------------------- osm_lcm/lcm_utils.py | 4 + 2 files changed, 179 insertions(+), 169 deletions(-) diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 56816830..a6760d0a 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -27,7 +27,7 @@ import ROclient import ns import vim_sdn import netslice -from lcm_utils import versiontuple, LcmException, TaskRegistry +from lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit # from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient from osm_common import dbmemory, dbmongo, fslocal, msglocal, msgkafka @@ -213,186 +213,192 @@ class Lcm: wait_time = 1 if not first_start else 5 await asyncio.sleep(wait_time, loop=self.loop) + def kafka_read_callback(self, topic, command, params): + order_id = 1 + + if topic != "admin" and command != "ping": + self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params)) + self.consecutive_errors = 0 + self.first_start = False + order_id += 1 + if command == "exit": + raise LcmExceptionExit + elif command.startswith("#"): + return + elif command == "echo": + # just for test + print(params) + sys.stdout.flush() + return + elif command == "test": + asyncio.Task(self.test(params), loop=self.loop) + return + + if topic == "admin": + if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm": + self.pings_not_received = 0 + return + elif topic == "ns": + if command == "instantiate": + # self.logger.debug("Deploying NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) + return + elif command == "terminate": + # self.logger.debug("Deleting NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + self.lcm_tasks.cancel(topic, nsr_id) + task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task) + return + elif command == "action": + # self.logger.debug("Update NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task) + return + elif command == "scale": + # self.logger.debug("Update NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task) + return + elif command == "show": + try: + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + print("nsr:\n _id={}\n operational-status: {}\n config-status: {}" + "\n detailed-status: {}\n deploy: {}\n tasks: {}" + "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"], + db_nsr["detailed-status"], + db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id))) + except Exception as e: + print("nsr {} not found: {}".format(nsr_id, e)) + sys.stdout.flush() + return + elif command == "deleted": + return # TODO cleaning of task just in case should be done + elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time" + return + elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc) + if command == "instantiate": + # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"])) + nsilcmop = params + nsilcmop_id = nsilcmop["_id"] # slice operation id + nsir_id = nsilcmop["netsliceInstanceId"] # slice record id + task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id)) + self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task) + return + elif command == "terminate": + # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"])) + nsilcmop = params + nsilcmop_id = nsilcmop["_id"] # slice operation id + nsir_id = nsilcmop["netsliceInstanceId"] # slice record id + self.lcm_tasks.cancel(topic, nsir_id) + task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id)) + self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task) + return + elif command == "show": + try: + db_nsir = self.db.get_one("nsirs", {"_id": nsir_id}) + print("nsir:\n _id={}\n operational-status: {}\n config-status: {}" + "\n detailed-status: {}\n deploy: {}\n tasks: {}" + "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"], + db_nsir["detailed-status"], + db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id))) + except Exception as e: + print("nsir {} not found: {}".format(nsir_id, e)) + sys.stdout.flush() + return + elif command == "deleted": + return # TODO cleaning of task just in case should be done + elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time" + return + elif topic == "vim_account": + vim_id = params["_id"] + if command == "create": + task = asyncio.ensure_future(self.vim.create(params, order_id)) + self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task) + return + elif command == "delete": + self.lcm_tasks.cancel(topic, vim_id) + task = asyncio.ensure_future(self.vim.delete(vim_id, order_id)) + self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task) + return + elif command == "show": + print("not implemented show with vim_account") + sys.stdout.flush() + return + elif command == "edit": + task = asyncio.ensure_future(self.vim.edit(params, order_id)) + self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task) + return + elif topic == "wim_account": + wim_id = params["_id"] + if command == "create": + task = asyncio.ensure_future(self.wim.create(params, order_id)) + self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task) + return + elif command == "delete": + self.lcm_tasks.cancel(topic, wim_id) + task = asyncio.ensure_future(self.wim.delete(wim_id, order_id)) + self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task) + return + elif command == "show": + print("not implemented show with wim_account") + sys.stdout.flush() + return + elif command == "edit": + task = asyncio.ensure_future(self.wim.edit(params, order_id)) + self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task) + return + elif topic == "sdn": + _sdn_id = params["_id"] + if command == "create": + task = asyncio.ensure_future(self.sdn.create(params, order_id)) + self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task) + return + elif command == "delete": + self.lcm_tasks.cancel(topic, _sdn_id) + task = asyncio.ensure_future(self.sdn.delete(_sdn_id, order_id)) + self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task) + return + elif command == "edit": + task = asyncio.ensure_future(self.sdn.edit(params, order_id)) + self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task) + return + self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) + async def kafka_read(self): self.logger.debug("Task kafka_read Enter") - order_id = 1 # future = asyncio.Future() - consecutive_errors = 0 - first_start = True - while consecutive_errors < 10: + self.consecutive_errors = 0 + self.first_start = True + while self.consecutive_errors < 10: try: topics = ("admin", "ns", "vim_account", "wim_account", "sdn", "nsi") - topic, command, params = await self.msg.aioread(topics, self.loop) - if topic != "admin" and command != "ping": - self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params)) - consecutive_errors = 0 - first_start = False - order_id += 1 - if command == "exit": - print("Bye!") - break - elif command.startswith("#"): - continue - elif command == "echo": - # just for test - print(params) - sys.stdout.flush() - continue - elif command == "test": - asyncio.Task(self.test(params), loop=self.loop) - continue + await self.msg.aioread(topics, self.loop, self.kafka_read_callback) - if topic == "admin": - if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm": - self.pings_not_received = 0 - continue - elif topic == "ns": - if command == "instantiate": - # self.logger.debug("Deploying NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] - task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) - continue - elif command == "terminate": - # self.logger.debug("Deleting NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] - self.lcm_tasks.cancel(topic, nsr_id) - task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task) - continue - elif command == "action": - # self.logger.debug("Update NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] - task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task) - continue - elif command == "scale": - # self.logger.debug("Update NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] - task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task) - continue - elif command == "show": - try: - db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - print("nsr:\n _id={}\n operational-status: {}\n config-status: {}" - "\n detailed-status: {}\n deploy: {}\n tasks: {}" - "".format(nsr_id, db_nsr["operational-status"], db_nsr["config-status"], - db_nsr["detailed-status"], - db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id))) - except Exception as e: - print("nsr {} not found: {}".format(nsr_id, e)) - sys.stdout.flush() - continue - elif command == "deleted": - continue # TODO cleaning of task just in case should be done - elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time" - continue - elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc) - if command == "instantiate": - # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"])) - nsilcmop = params - nsilcmop_id = nsilcmop["_id"] # slice operation id - nsir_id = nsilcmop["netsliceInstanceId"] # slice record id - task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id)) - self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task) - continue - elif command == "terminate": - # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"])) - nsilcmop = params - nsilcmop_id = nsilcmop["_id"] # slice operation id - nsir_id = nsilcmop["netsliceInstanceId"] # slice record id - self.lcm_tasks.cancel(topic, nsir_id) - task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id)) - self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task) - continue - elif command == "show": - try: - db_nsir = self.db.get_one("nsirs", {"_id": nsir_id}) - print("nsir:\n _id={}\n operational-status: {}\n config-status: {}" - "\n detailed-status: {}\n deploy: {}\n tasks: {}" - "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"], - db_nsir["detailed-status"], - db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id))) - except Exception as e: - print("nsir {} not found: {}".format(nsir_id, e)) - sys.stdout.flush() - continue - elif command == "deleted": - continue # TODO cleaning of task just in case should be done - elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time" - continue - elif topic == "vim_account": - vim_id = params["_id"] - if command == "create": - task = asyncio.ensure_future(self.vim.create(params, order_id)) - self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_create", task) - continue - elif command == "delete": - self.lcm_tasks.cancel(topic, vim_id) - task = asyncio.ensure_future(self.vim.delete(vim_id, order_id)) - self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_delete", task) - continue - elif command == "show": - print("not implemented show with vim_account") - sys.stdout.flush() - continue - elif command == "edit": - task = asyncio.ensure_future(self.vim.edit(params, order_id)) - self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task) - continue - elif topic == "wim_account": - wim_id = params["_id"] - if command == "create": - task = asyncio.ensure_future(self.wim.create(params, order_id)) - self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task) - continue - elif command == "delete": - self.lcm_tasks.cancel(topic, wim_id) - task = asyncio.ensure_future(self.wim.delete(wim_id, order_id)) - self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task) - continue - elif command == "show": - print("not implemented show with wim_account") - sys.stdout.flush() - continue - elif command == "edit": - task = asyncio.ensure_future(self.wim.edit(params, order_id)) - self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task) - continue - elif topic == "sdn": - _sdn_id = params["_id"] - if command == "create": - task = asyncio.ensure_future(self.sdn.create(params, order_id)) - self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_create", task) - continue - elif command == "delete": - self.lcm_tasks.cancel(topic, _sdn_id) - task = asyncio.ensure_future(self.sdn.delete(_sdn_id, order_id)) - self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_delete", task) - continue - elif command == "edit": - task = asyncio.ensure_future(self.sdn.edit(params, order_id)) - self.lcm_tasks.register("sdn", _sdn_id, order_id, "sdn_edit", task) - continue - self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) + except LcmExceptionExit: + self.logger.debug("Bye!") + break except Exception as e: # if not first_start is the first time after starting. So leave more time and wait # to allow kafka starts - if consecutive_errors == 8 if not first_start else 30: + if self.consecutive_errors == 8 if not self.first_start else 30: self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e)) raise - consecutive_errors += 1 + self.consecutive_errors += 1 self.logger.error("Task kafka_read retrying after Exception {}".format(e)) - wait_time = 2 if not first_start else 5 + wait_time = 2 if not self.first_start else 5 await asyncio.sleep(wait_time, loop=self.loop) # self.logger.debug("Task kafka_read terminating") diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index 2989bf41..687da413 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -30,6 +30,10 @@ class LcmExceptionNoMgmtIP(LcmException): pass +class LcmExceptionExit(LcmException): + pass + + def versiontuple(v): """utility for compare dot separate versions. Fills with zeros to proper number comparison package version will be something like 4.0.1.post11+gb3f024d.dirty-1. Where 4.0.1 is the git tag, postXX is the -- 2.25.1