From fda3b8c9084db17a03feba4e989ce43124e261e1 Mon Sep 17 00:00:00 2001 From: tierno Date: Wed, 18 Apr 2018 18:53:02 +0200 Subject: [PATCH] adding ns_lcm_op_occs instantiate terminate action (primitive) Change-Id: I3c16d12d558c4b2a988e0ce1cdc77225298e69d3 Signed-off-by: tierno --- lcm/osm_common/dbmongo.py | 6 +- lcm/osm_common/msgkafka.py | 6 +- lcm/osm_lcm/lcm.cfg | 3 +- lcm/osm_lcm/lcm.py | 579 +++++++++++++++++++++++++++---------- 4 files changed, 440 insertions(+), 154 deletions(-) diff --git a/lcm/osm_common/dbmongo.py b/lcm/osm_common/dbmongo.py index 6bc35a58..46e4dc84 100644 --- a/lcm/osm_common/dbmongo.py +++ b/lcm/osm_common/dbmongo.py @@ -167,12 +167,12 @@ class DbMongo(DbBase): try: collection = self.db[table] rows = collection.update_one(self._format_filter(filter), {"$set": update_dict}) - if rows.updated_count == 0: + if rows.matched_count == 0: if fail_on_empty: raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter), HTTPStatus.NOT_FOUND) return None - return {"deleted": rows.deleted_count} + return {"modified": rows.modified_count} except Exception as e: # TODO refine raise DbException(str(e)) @@ -186,6 +186,6 @@ class DbMongo(DbBase): raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter), HTTPStatus.NOT_FOUND) return None - return {"replace": rows.modified_count} + return {"replaced": rows.modified_count} except Exception as e: # TODO refine raise DbException(str(e)) diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py index 459513d8..de1e764a 100644 --- a/lcm/osm_common/msgkafka.py +++ b/lcm/osm_common/msgkafka.py @@ -56,7 +56,7 @@ class MsgKafka(MsgBase): except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) - async def aiowrite(self, topic, key, msg, loop): + async def aiowrite(self, topic, key, msg, loop=None): if not loop: loop = self.loop @@ -64,9 +64,9 @@ class MsgKafka(MsgBase): self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) await self.producer.start() - await self.producer.send(topic=topic, key=key, value=msg) + await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True)) except Exception as e: - raise MsgException("Error publishing to {} topic: {}".format(topic, str(e))) + raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) finally: await self.producer.stop() diff --git a/lcm/osm_lcm/lcm.cfg b/lcm/osm_lcm/lcm.cfg index 5f87a123..c62ee250 100644 --- a/lcm/osm_lcm/lcm.cfg +++ b/lcm/osm_lcm/lcm.cfg @@ -8,8 +8,7 @@ global: #[RO] RO: - #host: ro # hostname or IP - host: localhost + host: ro # hostname or IP port: 9090 tenant: osm loglevel: DEBUG diff --git a/lcm/osm_lcm/lcm.py b/lcm/osm_lcm/lcm.py index aa90f3d8..6e0f8ab6 100644 --- a/lcm/osm_lcm/lcm.py +++ b/lcm/osm_lcm/lcm.py @@ -23,6 +23,7 @@ from n2vc.vnf import N2VC from copy import deepcopy from http import HTTPStatus +from time import time class LcmException(Exception): @@ -41,8 +42,9 @@ class Lcm: self.db = None self.msg = None self.fs = None - # contains created tasks/futures to be able to cancel + self.pings_not_received = 1 + # contains created tasks/futures to be able to cancel self.lcm_ns_tasks = {} self.lcm_vim_tasks = {} self.lcm_sdn_tasks = {} @@ -142,9 +144,15 @@ class Lcm: except DbException as e: self.logger.error("Updating {} _id={}: {}".format(item, _id, e)) - async def create_vim(self, vim_content, order_id): + def update_db_2(self, item, _id, _desc): + try: + self.db.set_one(item, {"_id": _id}, _desc) + except DbException as e: + self.logger.error("Updating {} _id={}: {}".format(item, _id, e)) + + async def vim_create(self, vim_content, order_id): vim_id = vim_content["_id"] - logging_text = "Task create_vim={} ".format(vim_id) + logging_text = "Task vim_create={} ".format(vim_id) self.logger.debug(logging_text + "Enter") db_vim = None exc = None @@ -153,9 +161,9 @@ class Lcm: db_vim = self.db.get_one("vims", {"_id": vim_id}) if "_admin" not in db_vim: db_vim["_admin"] = {} - if "deploy" not in db_vim["_admin"]: - db_vim["_admin"]["deploy"] = {} - db_vim["_admin"]["deploy"]["RO"] = None + if "deployed" not in db_vim["_admin"]: + db_vim["_admin"]["deployed"] = {} + db_vim["_admin"]["deployed"]["RO"] = None step = "Creating vim at RO" RO = ROclient.ROClient(self.loop, **self.ro_config) @@ -170,7 +178,7 @@ class Lcm: vim_RO.pop("vim_password", None) desc = await RO.create("vim", descriptor=vim_RO) RO_vim_id = desc["uuid"] - db_vim["_admin"]["deploy"]["RO"] = RO_vim_id + db_vim["_admin"]["deployed"]["RO"] = RO_vim_id self.update_db("vims", vim_id, db_vim) step = "Attach vim to RO tenant" @@ -198,17 +206,17 @@ class Lcm: db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc) self.update_db("vims", vim_id, db_vim) - async def edit_vim(self, vim_content, order_id): + async def vim_edit(self, vim_content, order_id): vim_id = vim_content["_id"] - logging_text = "Task edit_vim={} ".format(vim_id) + logging_text = "Task vim_edit={} ".format(vim_id) self.logger.debug(logging_text + "Enter") db_vim = None exc = None step = "Getting vim from db" try: db_vim = self.db.get_one("vims", {"_id": vim_id}) - if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"): - RO_vim_id = db_vim["_admin"]["deploy"]["RO"] + if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"): + RO_vim_id = db_vim["_admin"]["deployed"]["RO"] step = "Editing vim at RO" RO = ROclient.ROClient(self.loop, **self.ro_config) vim_RO = deepcopy(vim_content) @@ -250,16 +258,16 @@ class Lcm: db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc) self.update_db("vims", vim_id, db_vim) - async def delete_vim(self, vim_id, order_id): - logging_text = "Task delete_vim={} ".format(vim_id) + async def vim_delete(self, vim_id, order_id): + logging_text = "Task vim_delete={} ".format(vim_id) self.logger.debug(logging_text + "Enter") db_vim = None exc = None step = "Getting vim from db" try: db_vim = self.db.get_one("vims", {"_id": vim_id}) - if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"): - RO_vim_id = db_vim["_admin"]["deploy"]["RO"] + if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"): + RO_vim_id = db_vim["_admin"]["deployed"]["RO"] RO = ROclient.ROClient(self.loop, **self.ro_config) step = "Detaching vim from RO tenant" try: @@ -282,7 +290,7 @@ class Lcm: # nothing to delete self.logger.error(logging_text + "Skipping. There is not RO information at database") self.db.del_one("vims", {"_id": vim_id}) - self.logger.debug("delete_vim task vim_id={} Exit Ok".format(vim_id)) + self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id)) return None except (ROclient.ROClientException, DbException) as e: @@ -297,9 +305,9 @@ class Lcm: db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc) self.update_db("vims", vim_id, db_vim) - async def create_sdn(self, sdn_content, order_id): + async def sdn_create(self, sdn_content, order_id): sdn_id = sdn_content["_id"] - logging_text = "Task create_sdn={} ".format(sdn_id) + logging_text = "Task sdn_create={} ".format(sdn_id) self.logger.debug(logging_text + "Enter") db_sdn = None exc = None @@ -308,9 +316,9 @@ class Lcm: db_sdn = self.db.get_one("sdns", {"_id": sdn_id}) if "_admin" not in db_sdn: db_sdn["_admin"] = {} - if "deploy" not in db_sdn["_admin"]: - db_sdn["_admin"]["deploy"] = {} - db_sdn["_admin"]["deploy"]["RO"] = None + if "deployed" not in db_sdn["_admin"]: + db_sdn["_admin"]["deployed"] = {} + db_sdn["_admin"]["deployed"]["RO"] = None step = "Creating sdn at RO" RO = ROclient.ROClient(self.loop, **self.ro_config) @@ -321,7 +329,7 @@ class Lcm: sdn_RO.pop("schema_type", None) desc = await RO.create("sdn", descriptor=sdn_RO) RO_sdn_id = desc["uuid"] - db_sdn["_admin"]["deploy"]["RO"] = RO_sdn_id + db_sdn["_admin"]["deployed"]["RO"] = RO_sdn_id db_sdn["_admin"]["operationalState"] = "ENABLED" self.update_db("sdns", sdn_id, db_sdn) self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id)) @@ -339,17 +347,17 @@ class Lcm: db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc) self.update_db("sdns", sdn_id, db_sdn) - async def edit_sdn(self, sdn_content, order_id): + async def sdn_edit(self, sdn_content, order_id): sdn_id = sdn_content["_id"] - logging_text = "Task edit_sdn={} ".format(sdn_id) + logging_text = "Task sdn_edit={} ".format(sdn_id) self.logger.debug(logging_text + "Enter") db_sdn = None exc = None step = "Getting sdn from db" try: db_sdn = self.db.get_one("sdns", {"_id": sdn_id}) - if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"): - RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"] + if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"): + RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"] RO = ROclient.ROClient(self.loop, **self.ro_config) step = "Editing sdn at RO" sdn_RO = deepcopy(sdn_content) @@ -377,16 +385,16 @@ class Lcm: db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc) self.update_db("sdns", sdn_id, db_sdn) - async def delete_sdn(self, sdn_id, order_id): - logging_text = "Task delete_sdn={} ".format(sdn_id) + async def sdn_delete(self, sdn_id, order_id): + logging_text = "Task sdn_delete={} ".format(sdn_id) self.logger.debug(logging_text + "Enter") db_sdn = None exc = None step = "Getting sdn from db" try: db_sdn = self.db.get_one("sdns", {"_id": sdn_id}) - if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"): - RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"] + if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"): + RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"] RO = ROclient.ROClient(self.loop, **self.ro_config) step = "Deleting sdn from RO" try: @@ -400,7 +408,7 @@ class Lcm: # nothing to delete self.logger.error(logging_text + "Skipping. There is not RO information at database") self.db.del_one("sdns", {"_id": sdn_id}) - self.logger.debug("delete_sdn task sdn_id={} Exit Ok".format(sdn_id)) + self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id)) return None except (ROclient.ROClientException, DbException) as e: @@ -451,7 +459,7 @@ class Lcm: if ci_file: ci_file.close() - def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None): + def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, db_nslcmop, vnf_member_index, task=None): """Update the lcm database with the status of the charm. Updates the VNF's operational status with the state of the charm: @@ -468,35 +476,55 @@ class Lcm: charms. """ nsr_id = None + nslcmop_id = None + update_nsr = update_nslcmop = False try: nsr_id = db_nsr["_id"] - nsr_lcm = db_nsr["_admin"]["deploy"] + nslcmop_id = db_nslcmop["_id"] + nsr_lcm = db_nsr["_admin"]["deployed"] + ns_action = db_nslcmop["lcmOperationType"] + logging_text = "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id, ns_action, nslcmop_id, + vnf_member_index) + if task: if task.cancelled(): - self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Cancelled".format(nsr_id, vnf_member_index)) + self.logger.debug(logging_text + " task Cancelled") + # TODO update db_nslcmop return if task.done(): exc = task.exception() if exc: - self.logger.error( - "[n2vc_callback] create_ns={} vnf_index={} task Exception={}".format(nsr_id, vnf_member_index, exc)) - nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error" - nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc) + self.logger.error(logging_text + " task Exception={}".format(exc)) + if ns_action in ("instantiate", "terminate"): + nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error" + nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc) + elif ns_action == "action": + db_nslcmop["operationState"] = "FAILED" + db_nslcmop["detailedStatus"] = str(exc) + db_nslcmop["statusEnteredTime"] = time() + update_nslcmop = True + return + else: - self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Done".format(nsr_id, vnf_member_index)) - # TODO it seams that task Done, but callback is still ongoing. For the moment comment this two lines - # nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "active" - # nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = "" + self.logger.debug(logging_text + " task Done") + # TODO revise with Adam if action is finished and ok when task is done + if ns_action == "action": + db_nslcmop["operationState"] = "COMPLETED" + db_nslcmop["detailedStatus"] = "Done" + db_nslcmop["statusEnteredTime"] = time() + update_nslcmop = True + # task is Done, but callback is still ongoing. So ignore + return elif workload_status: - self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} Enter workload_status={}".format(nsr_id, vnf_member_index, workload_status)) + self.logger.debug(logging_text + " Enter workload_status={}".format(workload_status)) if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status: return # same status, ignore nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status # TODO N2VC some error message in case of error should be obtained from N2VC nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = "" else: - self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Enter with bad parameters".format(nsr_id, vnf_member_index), exc_info=True) + self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True) return some_failed = False @@ -514,14 +542,21 @@ class Lcm: if vca_status == "error": some_failed = True db_nsr["config-status"] = "failed" - db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index, - vca_info["detailed-status"]) + error_text = "fail configuring vnf_index={} {}".format(vnf_member_index, + vca_info["detailed-status"]) + db_nsr["detailed-status"] = error_text + db_nslcmop["operationState"] = "FAILED_TEMP" + db_nslcmop["detailedStatus"] = error_text + db_nslcmop["statusEnteredTime"] = time() break if all_active: - self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} All active".format(nsr_id, vnf_member_index)) + self.logger.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id, vnf_member_index)) db_nsr["config-status"] = "configured" db_nsr["detailed-status"] = "done" + db_nslcmop["operationState"] = "COMPLETED" + db_nslcmop["detailedStatus"] = "Done" + db_nslcmop["statusEnteredTime"] = time() elif some_failed: pass else: @@ -531,22 +566,43 @@ class Lcm: cs += separator + "{}: {}".format(status, num) separator = ", " db_nsr["config-status"] = cs - self.update_db("nsrs", nsr_id, db_nsr) + db_nslcmop["detailedStatus"] = cs + update_nsr = update_nslcmop = True except Exception as e: - self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Exception {}".format(nsr_id, vnf_member_index, e), exc_info=True) + self.logger.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index, e), exc_info=True) + finally: + try: + if update_nslcmop: + self.update_db("nslcmops", nslcmop_id, db_nslcmop) + if update_nsr: + self.update_db("nsrs", nsr_id, db_nsr) + except Exception as e: + self.logger.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format( + vnf_member_index, e), exc_info=True) - async def create_ns(self, nsr_id, order_id): - logging_text = "Task create_ns={} ".format(nsr_id) + async def ns_instantiate(self, nsr_id, nslcmop_id): + logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database db_nsr = None + db_nslcmop = None exc = None - step = "Getting nsr from db" + step = "Getting nsr, nslcmop, RO_vims from db" try: + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) nsd = db_nsr["nsd"] nsr_name = db_nsr["name"] # TODO short-name?? + + db_vim = self.db.get_one("vims", {"_id": db_nsr["datacenter"]}) + # if db_vim["_admin"]["operationalState"] == "PROCESSING": + # #TODO check if VIM is creating and wait + if db_vim["_admin"]["operationalState"] != "ENABLED": + raise LcmException("VIM={} is not available. operationalSstatus={}".format( + db_nsr["datacenter"], db_vim["_admin"]["operationalState"])) + RO_vim_id = db_vim["_admin"]["deployed"]["RO"] + needed_vnfd = {} for c_vnf in nsd["constituent-vnfd"]: vnfd_id = c_vnf["vnfd-id-ref"] @@ -554,19 +610,20 @@ class Lcm: step = "Getting vnfd={} from db".format(vnfd_id) needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id}) - nsr_lcm = { - "id": nsr_id, - "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, - "nsr_ip": {}, - "VCA": {}, - } - db_nsr["_admin"]["deploy"] = nsr_lcm + nsr_lcm = db_nsr["_admin"].get("deployed") + if not nsr_lcm: + nsr_lcm = db_nsr["_admin"]["deployed"] = { + "id": nsr_id, + "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, + "nsr_ip": {}, + "VCA": {}, + } db_nsr["detailed-status"] = "creating" db_nsr["operational-status"] = "init" deloyment_timeout = 120 - RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config) + RO = ROclient.ROClient(self.loop, datacenter=RO_vim_id, **self.ro_config) # get vnfds, instantiate at RO for vnfd_id, vnfd in needed_vnfd.items(): @@ -584,6 +641,7 @@ class Lcm: vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO) desc = await RO.create("vnfd", descriptor=vnfd_RO) nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] + db_nsr["_admin"]["nsState"] = "INSTANTIATED" self.update_db("nsrs", nsr_id, db_nsr) # create nsd at RO @@ -606,6 +664,7 @@ class Lcm: vnfd_id = c_vnf["vnfd-id-ref"] c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200] desc = await RO.create("nsd", descriptor=nsd_RO) + db_nsr["_admin"]["nsState"] = "INSTANTIATED" nsr_lcm["RO"]["nsd_id"] = desc["uuid"] self.update_db("nsrs", nsr_id, db_nsr) @@ -634,9 +693,10 @@ class Lcm: step = db_nsr["detailed-status"] = "Creating ns at RO" self.logger.debug(logging_text + step) - desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"], + desc = await RO.create("ns", name=db_nsr["name"], datacenter=RO_vim_id, scenario=nsr_lcm["RO"]["nsd_id"]) RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"] + db_nsr["_admin"]["nsState"] = "INSTANTIATED" nsr_lcm["RO"]["nsr_status"] = "BUILD" self.update_db("nsrs", nsr_id, db_nsr) @@ -695,7 +755,7 @@ class Lcm: ) # Setup the runtime parameters for this VNF - params['rw_mgmt_ip'] = nsr_lcm['nsr_ip'][vnf_index] + params['rw_mgmt_ip'] = nsr_lcm['nsr_ip']["vnf"][vnf_index] # ns_name will be ignored in the current version of N2VC # but will be implemented for the next point release. @@ -725,12 +785,14 @@ class Lcm: {}, # for native charms only self.n2vc_callback, # Callback for status changes db_nsr, # Callback parameter + db_nslcmop, vnf_index, # Callback parameter None, # Callback parameter (task) ) ) - task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, db_nsr, vnf_index)) - self.lcm_ns_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task + task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, + db_nsr, db_nslcmop, vnf_index)) + self.lcm_ns_tasks[nsr_id][nslcmop_id]["create_charm:" + vnf_index] = task # TODO: Make this call inside deploy() # Login to the VCA. If there are multiple calls to login(), @@ -774,68 +836,95 @@ class Lcm: deploy() number_to_configure += 1 - db_nsr["config-status"] = "configuring" if number_to_configure else "configured" - db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure) if number_to_configure else "done" + if number_to_configure: + db_nsr["config-status"] = "configuring" + db_nsr["detailed-status"] = "configuring: init: {}".format(number_to_configure) + db_nslcmop["detailed-status"] = "configuring: init: {}".format(number_to_configure) + else: + db_nslcmop["operationState"] = "COMPLETED" + db_nslcmop["detailed-status"] = "done" + db_nsr["config-status"] = "configured" + db_nsr["detailed-status"] = "done" db_nsr["operational-status"] = "running" self.update_db("nsrs", nsr_id, db_nsr) - - self.logger.debug("Task create_ns={} Exit Ok".format(nsr_id)) + self.update_db("nslcmops", nslcmop_id, db_nslcmop) + self.logger.debug("Task ns_instantiate={} Exit Ok".format(nsr_id)) return nsr_lcm except (ROclient.ROClientException, DbException, LcmException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) exc = e except Exception as e: - self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) + self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) exc = e finally: - if exc and db_nsr: - db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc) - db_nsr["operational-status"] = "failed" - self.update_db("nsrs", nsr_id, db_nsr) - - async def delete_ns(self, nsr_id, order_id): - logging_text = "Task delete_ns={} ".format(nsr_id) + if exc: + if db_nsr: + db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc) + db_nsr["operational-status"] = "failed" + self.update_db("nsrs", nsr_id, db_nsr) + if db_nslcmop: + db_nslcmop["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nslcmop["operationState"] = "FAILED" + db_nslcmop["statusEnteredTime"] = time() + self.update_db("nslcmops", nslcmop_id, db_nslcmop) + + async def ns_terminate(self, nsr_id, nslcmop_id): + logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") db_nsr = None + db_nslcmop = None exc = None - step = "Getting nsr from db" + step = "Getting nsr, nslcmop from db" + failed_detail = [] # annotates all failed error messages + vca_task_list = [] + vca_task_dict = {} try: + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - nsd = db_nsr["nsd"] - nsr_lcm = db_nsr["_admin"]["deploy"] - - db_nsr["operational-status"] = "terminating" - db_nsr["config-status"] = "terminating" - db_nsr["detailed-status"] = "Deleting charms" - self.update_db("nsrs", nsr_id, db_nsr) + # nsd = db_nsr["nsd"] + nsr_lcm = deepcopy(db_nsr["_admin"]["deployed"]) + if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED": + return + # TODO ALF remove + # db_vim = self.db.get_one("vims", {"_id": db_nsr["datacenter"]}) + # #TODO check if VIM is creating and wait + # RO_vim_id = db_vim["_admin"]["deployed"]["RO"] + + db_nsr_update = { + "operational-status": "terminating", + "config-status": "terminating", + "detailed-status": "Deleting charms", + } + self.update_db_2("nsrs", nsr_id, db_nsr_update) try: self.logger.debug(logging_text + step) for vnf_index, deploy_info in nsr_lcm["VCA"].items(): if deploy_info and deploy_info.get("application"): - # n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None): - - # self.n2vc.RemoveCharms(model_name, application_name, self.n2vc_callback, model_name, application_name) task = asyncio.ensure_future( self.n2vc.RemoveCharms( deploy_info['model'], deploy_info['application'], - self.n2vc_callback, - db_nsr, - vnf_index, + # self.n2vc_callback, + # db_nsr, + # db_nslcmop, + # vnf_index, ) ) + vca_task_list.append(task) + vca_task_dict[vnf_index] = task # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'], - # deploy_info['application'],None, db_nsr, vnf_index)) - self.lcm_ns_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task + # deploy_info['application'], None, db_nsr, + # db_nslcmop, vnf_index)) + self.lcm_ns_tasks[nsr_id][nslcmop_id]["delete_charm:" + vnf_index] = task except Exception as e: self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e)) # remove from RO - RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config) + RO = ROclient.ROClient(self.loop, **self.ro_config) # Delete ns - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] + RO_nsr_id = nsr_lcm["RO"].get("nsr_id") if RO_nsr_id: try: step = db_nsr["detailed-status"] = "Deleting ns at RO" @@ -849,10 +938,11 @@ class Lcm: nsr_lcm["RO"]["nsr_status"] = "DELETED" self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id)) elif e.http_code == 409: #conflict - self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e)) + failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e)) + self.logger.debug(logging_text + failed_detail[-1]) else: - self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e)) - self.update_db("nsrs", nsr_id, db_nsr) + failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e)) + self.logger.error(logging_text + failed_detail[-1]) # Delete nsd RO_nsd_id = nsr_lcm["RO"]["nsd_id"] @@ -867,10 +957,11 @@ class Lcm: nsr_lcm["RO"]["nsd_id"] = None self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id)) elif e.http_code == 409: #conflict - self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e)) + failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e)) + self.logger.debug(logging_text + failed_detail[-1]) else: - self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e)) - self.update_db("nsrs", nsr_id, db_nsr) + failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e)) + self.logger.error(logging_text + failed_detail[-1]) for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): if not RO_vnfd_id: @@ -885,14 +976,54 @@ class Lcm: nsr_lcm["RO"]["vnfd_id"][vnf_id] = None self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id)) elif e.http_code == 409: #conflict - self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e)) + failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e)) + self.logger.debug(logging_text + failed_detail[-1]) else: - self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e)) - self.update_db("nsrs", nsr_id, db_nsr) + failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e)) + self.logger.error(logging_text + failed_detail[-1]) - # TODO delete from database or mark as deleted??? - db_nsr["operational-status"] = "terminated" - self.db.del_one("nsrs", {"_id": nsr_id}) + if vca_task_list: + await asyncio.wait(vca_task_list, timeout=300) + for vnf_index, task in vca_task_dict.items(): + if task.cancelled(): + failed_detail.append("VCA[{}] Deletion has been cancelled".format(vnf_index)) + elif task.done(): + exc = task.exception() + if exc: + failed_detail.append("VCA[{}] Deletion exception: {}".format(vnf_index, exc)) + else: + nsr_lcm["VCA"][vnf_index] = None + else: # timeout + # TODO Should it be cancelled?!! + task.cancel() + failed_detail.append("VCA[{}] Deletion timeout".format(vnf_index)) + + if failed_detail: + self.logger.error(logging_text + " ;".join(failed_detail)) + db_nsr_update = { + "operational-status": "failed", + "detailed-status": "Deletion errors " + "; ".join(failed_detail), + "_admin": {"deployed": nsr_lcm, } + } + db_nslcmop_update = { + "detailedStatus": "; ".join(failed_detail), + "operationState": "FAILED", + "statusEnteredTime": time() + } + elif db_nslcmop["operationParams"].get("autoremove"): + self.db.del_one("nsrs", {"_id": nsr_id}) + self.db.del_list("nslcmops", {"nsInstanceId": nsr_id}) + else: + db_nsr_update = { + "operational-status": "terminated", + "detailed-status": "Done", + "_admin": {"deployed": nsr_lcm, "nsState": "NOT_INSTANTIATED"} + } + db_nslcmop_update = { + "detailedStatus": "Done", + "operationState": "COMPLETED", + "statusEnteredTime": time() + } self.logger.debug(logging_text + "Exit") except (ROclient.ROClientException, DbException) as e: @@ -902,10 +1033,104 @@ class Lcm: self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) exc = e finally: - if exc and db_nsr: - db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc) - db_nsr["operational-status"] = "failed" - self.update_db("nsrs", nsr_id, db_nsr) + if exc and db_nslcmop: + db_nslcmop_update = { + "detailed-status": "FAILED {}: {}".format(step, exc), + "operationState": "FAILED", + "statusEnteredTime": time(), + } + if db_nslcmop_update: + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + if db_nsr_update: + self.update_db_2("nsrs", nsr_id, db_nsr_update) + + async def ns_action(self, nsr_id, nslcmop_id): + logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id) + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nsr = None + db_nslcmop = None + db_nslcmop_update = None + exc = None + step = "Getting nsr, nslcmop" + try: + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + nsr_lcm = db_nsr["_admin"].get("deployed") + vnf_index = db_nslcmop["operationParams"]["vnf_member_index"] + + #TODO check if ns is in a proper status + vca_deployed = nsr_lcm["VCA"].get(vnf_index) + if not vca_deployed: + raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index)) + model_name = vca_deployed.get("model") + application_name = vca_deployed.get("application") + if not model_name or not application_name: + raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index)) + if vca_deployed["operational-status"] != "active": + raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format( + vnf_index, vca_deployed["operational-status"])) + primitive = db_nslcmop["operationParams"]["primitive"] + primitive_params = db_nslcmop["operationParams"]["primitive_params"] + callback = None # self.n2vc_callback + callback_args = () # [db_nsr, db_nslcmop, vnf_index, None] + await self.n2vc.login() + task = asyncio.ensure_future( + self.n2vc.ExecutePrimitive( + model_name, + application_name, + primitive, callback, + *callback_args, + **primitive_params + ) + ) + # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, + # db_nsr, db_nslcmop, vnf_index)) + # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task + # wait until completed with timeout + await asyncio.wait((task,), timeout=300) + + result = "FAILED" # by default + result_detail = "" + if task.cancelled(): + db_nslcmop["detailedStatus"] = "Task has been cancelled" + elif task.done(): + exc = task.exception() + if exc: + result_detail = str(exc) + else: + self.logger.debug(logging_text + " task Done") + # TODO revise with Adam if action is finished and ok when task is done or callback is needed + result = "COMPLETED" + result_detail = "Done" + else: # timeout + # TODO Should it be cancelled?!! + task.cancel() + result_detail = "timeout" + + db_nslcmop_update = { + "detailedStatus": result_detail, + "operationState": result, + "statusEnteredTime": time() + } + self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail)) + return # database update is called inside finally + + except (DbException, LcmException) as e: + self.logger.error(logging_text + "Exit Exception {}".format(e)) + exc = e + except Exception as e: + self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) + exc = e + finally: + if exc and db_nslcmop: + db_nslcmop_update = { + "detailed-status": "FAILED {}: {}".format(step, exc), + "operationState": "FAILED", + "statusEnteredTime": time(), + } + if db_nslcmop_update: + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) async def test(self, param=None): self.logger.debug("Starting/Ending test task: {}".format(param)) @@ -933,16 +1158,51 @@ class Lcm: self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name)) lcm_tasks[_id] = {} - async def read_kafka(self): - self.logger.debug("Task Kafka Enter") + async def kafka_ping(self): + self.logger.debug("Task kafka_ping Enter") + consecutive_errors = 0 + first_start = True + kafka_has_received = False + self.pings_not_received = 1 + while True: + try: + await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop) + # time between pings are low when it is not received and at starting + wait_time = 5 if not kafka_has_received else 120 + if not self.pings_not_received: + kafka_has_received = True + self.pings_not_received += 1 + await asyncio.sleep(wait_time, loop=self.loop) + if self.pings_not_received > 10: + raise LcmException("It is not receiving pings from Kafka bus") + consecutive_errors = 0 + first_start = False + except LcmException: + raise + except Exception as e: + # if not first_start is the first time after starting. So leave more time and wait + # to allow kafka starts + if consecutive_errors == 8 if not first_start else 30: + self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e)) + raise + consecutive_errors += 1 + self.logger.error("Task kafka_read retrying after Exception {}".format(e)) + wait_time = 1 if not first_start else 5 + await asyncio.sleep(wait_time, loop=self.loop) + + async def kafka_read(self): + self.logger.debug("Task kafka_read Enter") order_id = 1 # future = asyncio.Future() consecutive_errors = 0 + first_start = True while consecutive_errors < 10: try: - topic, command, params = await self.msg.aioread(("ns", "vim_account", "sdn"), self.loop) - self.logger.debug("Task Kafka receives {} {}: {}".format(topic, command, params)) + topics = ("admin", "ns", "vim_account", "sdn") + topic, command, params = await self.msg.aioread(topics, self.loop) + self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params)) consecutive_errors = 0 + first_start = False order_id += 1 if command == "exit": print("Bye!") @@ -958,22 +1218,41 @@ class Lcm: asyncio.Task(self.test(params), loop=self.loop) continue - if topic == "ns": - nsr_id = params.strip() - if command == "create": + if topic == "admin": + if command == "ping" and params["to"] == "lcm" and params["from"] == "lcm": + self.pings_not_received = 0 + continue + elif topic == "ns": + if command == "instantiate": # self.logger.debug("Deploying NS {}".format(nsr_id)) - task = asyncio.ensure_future(self.create_ns(nsr_id, order_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns_instantiate(nsr_id, nslcmop_id)) if nsr_id not in self.lcm_ns_tasks: self.lcm_ns_tasks[nsr_id] = {} - self.lcm_ns_tasks[nsr_id][order_id] = {"create_ns": task} + self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_instantiate": task} continue - elif command == "delete": + elif command == "terminate": # self.logger.debug("Deleting NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] self.cancel_tasks(topic, nsr_id) - task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id)) + task = asyncio.ensure_future(self.ns_terminate(nsr_id, nslcmop_id)) if nsr_id not in self.lcm_ns_tasks: self.lcm_ns_tasks[nsr_id] = {} - self.lcm_ns_tasks[nsr_id][order_id] = {"delete_ns": task} + self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_terminate": task} + continue + elif command == "action": + # self.logger.debug("Update NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns_action(nsr_id, nslcmop_id)) + if nsr_id not in self.lcm_ns_tasks: + self.lcm_ns_tasks[nsr_id] = {} + self.lcm_ns_tasks[nsr_id][nslcmop_id] = {"ns_action": task} continue elif command == "show": try: @@ -983,78 +1262,86 @@ class Lcm: "{}\n deploy: {}\n tasks: {}".format( nsr_id, db_nsr["operational-status"], db_nsr["config-status"], db_nsr["detailed-status"], - db_nsr["_admin"]["deploy"], self.lcm_ns_tasks.get(nsr_id))) + db_nsr["_admin"]["deployed"], self.lcm_ns_tasks.get(nsr_id))) except Exception as e: print("nsr {} not found: {}".format(nsr_id, e)) sys.stdout.flush() continue + elif command == "deleted": + continue # TODO cleaning of task just in case should be done elif topic == "vim_account": vim_id = params["_id"] if command == "create": - task = asyncio.ensure_future(self.create_vim(params, order_id)) + task = asyncio.ensure_future(self.vim_create(params, order_id)) if vim_id not in self.lcm_vim_tasks: self.lcm_vim_tasks[vim_id] = {} - self.lcm_vim_tasks[vim_id][order_id] = {"create_vim": task} + self.lcm_vim_tasks[vim_id][order_id] = {"vim_create": task} continue elif command == "delete": self.cancel_tasks(topic, vim_id) - task = asyncio.ensure_future(self.delete_vim(vim_id, order_id)) + task = asyncio.ensure_future(self.vim_delete(vim_id, order_id)) if vim_id not in self.lcm_vim_tasks: self.lcm_vim_tasks[vim_id] = {} - self.lcm_vim_tasks[vim_id][order_id] = {"delete_vim": task} + self.lcm_vim_tasks[vim_id][order_id] = {"vim_delete": task} continue elif command == "show": print("not implemented show with vim_account") sys.stdout.flush() continue elif command == "edit": - task = asyncio.ensure_future(self.edit_vim(vim_id, order_id)) + task = asyncio.ensure_future(self.vim_edit(vim_id, order_id)) if vim_id not in self.lcm_vim_tasks: self.lcm_vim_tasks[vim_id] = {} - self.lcm_vim_tasks[vim_id][order_id] = {"edit_vim": task} + self.lcm_vim_tasks[vim_id][order_id] = {"vim_edit": task} continue elif topic == "sdn": _sdn_id = params["_id"] if command == "create": - task = asyncio.ensure_future(self.create_sdn(params, order_id)) + task = asyncio.ensure_future(self.sdn_create(params, order_id)) if _sdn_id not in self.lcm_sdn_tasks: self.lcm_sdn_tasks[_sdn_id] = {} - self.lcm_sdn_tasks[_sdn_id][order_id] = {"create_sdn": task} + self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_create": task} continue elif command == "delete": self.cancel_tasks(topic, _sdn_id) - task = asyncio.ensure_future(self.delete_sdn(_sdn_id, order_id)) + task = asyncio.ensure_future(self.sdn_delete(_sdn_id, order_id)) if _sdn_id not in self.lcm_sdn_tasks: self.lcm_sdn_tasks[_sdn_id] = {} - self.lcm_sdn_tasks[_sdn_id][order_id] = {"delete_sdn": task} + self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_delete": task} continue elif command == "edit": - task = asyncio.ensure_future(self.edit_sdn(_sdn_id, order_id)) + task = asyncio.ensure_future(self.sdn_edit(_sdn_id, order_id)) if _sdn_id not in self.lcm_sdn_tasks: self.lcm_sdn_tasks[_sdn_id] = {} - self.lcm_sdn_tasks[_sdn_id][order_id] = {"edit_sdn": task} + self.lcm_sdn_tasks[_sdn_id][order_id] = {"sdn_edit": task} continue self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) except Exception as e: - if consecutive_errors == 5: - self.logger.error("Task Kafka task exit error too many errors. Exception: {}".format(e)) - break - else: - consecutive_errors += 1 - self.logger.error("Task Kafka Exception {}".format(e)) - await asyncio.sleep(1, loop=self.loop) - self.logger.debug("Task Kafka terminating") - self.logger.debug("Task Kafka exit") + # if not first_start is the first time after starting. So leave more time and wait + # to allow kafka starts + if consecutive_errors == 8 if not first_start else 30: + self.logger.error("Task kafka_read task exit error too many errors. Exception: {}".format(e)) + raise + consecutive_errors += 1 + self.logger.error("Task kafka_read retrying after Exception {}".format(e)) + wait_time = 2 if not first_start else 5 + await asyncio.sleep(wait_time, loop=self.loop) + + # self.logger.debug("Task kafka_read terminating") + self.logger.debug("Task kafka_read exit") def start(self): self.loop = asyncio.get_event_loop() - self.loop.run_until_complete(self.read_kafka()) + self.loop.run_until_complete(asyncio.gather( + self.kafka_read(), + self.kafka_ping() + )) # TODO # self.logger.debug("Terminating cancelling creation tasks") # self.cancel_tasks("ALL", "create") # timeout = 200 # while self.is_pending_tasks(): - # self.logger.debug("Task Kafka terminating. Waiting for tasks termination") + # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination") # await asyncio.sleep(2, loop=self.loop) # timeout -= 2 # if not timeout: -- 2.25.1