From bee3bad8d15fe0893855d0dff92cef4351629edb Mon Sep 17 00:00:00 2001 From: tierno Date: Thu, 5 Dec 2019 12:26:01 +0000 Subject: [PATCH] fix bug 937. NSs are not deleted from database upon NSI deletion Change-Id: Ib69d8921da777e41749aa8511fd64d1d7cc6204a Signed-off-by: tierno --- osm_nbi/admin_topics.py | 22 +++++++++++++--------- osm_nbi/base_topic.py | 19 ++++++++++++------- osm_nbi/descriptor_topics.py | 3 ++- osm_nbi/engine.py | 7 +++++-- osm_nbi/instance_topics.py | 14 ++++++++------ osm_nbi/subscriptions.py | 20 +++++++++++++------- 6 files changed, 53 insertions(+), 32 deletions(-) diff --git a/osm_nbi/admin_topics.py b/osm_nbi/admin_topics.py index 5b37bf0..670629e 100644 --- a/osm_nbi/admin_topics.py +++ b/osm_nbi/admin_topics.py @@ -302,12 +302,13 @@ class CommonVimWimSdn(BaseTopic): return "{}:0".format(content["_id"]) - def delete(self, session, _id, dry_run=False): + def delete(self, session, _id, dry_run=False, not_send_msg=None): """ Delete item by its internal _id :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id :param dry_run: make checking but do not delete + :param not_send_msg: To not send message (False) or store content (list) instead :return: operation id if it is ordered to delete. None otherwise """ @@ -344,7 +345,7 @@ class CommonVimWimSdn(BaseTopic): if session["force"]: self.db.del_one(self.topic, {"_id": _id}) op_id = None - self._send_msg("deleted", {"_id": _id, "op_id": op_id}) + self._send_msg("deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg) else: update_dict["_admin.to_delete"] = True self.db.set_one(self.topic, {"_id": _id}, @@ -354,7 +355,7 @@ class CommonVimWimSdn(BaseTopic): # the number of operations is the operation_id. db_content does not contains the new operation inserted, # so the -1 is not needed op_id = "{}:{}".format(db_content["_id"], len(db_content["_admin"]["operations"])) - self._send_msg("delete", {"_id": _id, "op_id": op_id}) + self._send_msg("delete", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg) return op_id @@ -581,7 +582,7 @@ class UserTopicAuth(UserTopic): rollback.append({"topic": self.topic, "_id": _id}) # del content["password"] - # self._send_msg("created", content) + # self._send_msg("created", content, not_send_msg=not_send_msg) return _id, None except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) @@ -733,7 +734,7 @@ class UserTopicAuth(UserTopic): user_list = [usr for usr in user_list if usr["username"] == session["username"]] return user_list - def delete(self, session, _id, dry_run=False): + def delete(self, session, _id, dry_run=False, not_send_msg=None): """ Delete item by its internal _id @@ -741,6 +742,7 @@ class UserTopicAuth(UserTopic): :param _id: server internal id :param force: indicates if deletion must be forced in case of conflict :param dry_run: make checking but do not delete + :param not_send_msg: To not send message (False) or store content (list) instead :return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ... """ # Allow _id to be a name or uuid @@ -865,7 +867,7 @@ class ProjectTopicAuth(ProjectTopic): self.format_on_new(content, project_id=session["project_id"], make_public=session["public"]) _id = self.auth.create_project(content) rollback.append({"topic": self.topic, "_id": _id}) - # self._send_msg("created", content) + # self._send_msg("created", content, not_send_msg=not_send_msg) return _id, None except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) @@ -905,13 +907,14 @@ class ProjectTopicAuth(ProjectTopic): project_list = [proj for proj in project_list if proj["_id"] in projects] return project_list - def delete(self, session, _id, dry_run=False): + def delete(self, session, _id, dry_run=False, not_send_msg=None): """ Delete item by its internal _id :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id :param dry_run: make checking but do not delete + :param not_send_msg: To not send message (False) or store content (list) instead :return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ... """ # Allow _id to be a name or uuid @@ -1194,18 +1197,19 @@ class RoleTopicAuth(BaseTopic): content["_id"] = rid # _id = self.db.create(self.topic, content) rollback.append({"topic": self.topic, "_id": rid}) - # self._send_msg("created", content) + # self._send_msg("created", content, not_send_msg=not_send_msg) return rid, None except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - def delete(self, session, _id, dry_run=False): + def delete(self, session, _id, dry_run=False, not_send_msg=None): """ Delete item by its internal _id :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id :param dry_run: make checking but do not delete + :param not_send_msg: To not send message (False) or store content (list) instead :return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ... """ filter_q = {BaseTopic.id_field(self.topic, _id): _id} diff --git a/osm_nbi/base_topic.py b/osm_nbi/base_topic.py index 1bc906c..52e02af 100644 --- a/osm_nbi/base_topic.py +++ b/osm_nbi/base_topic.py @@ -282,10 +282,13 @@ class BaseTopic: final_content["_admin"]["modified"] = now return None - def _send_msg(self, action, content): - if self.topic_msg: + def _send_msg(self, action, content, not_send_msg=None): + if self.topic_msg and not_send_msg is not False: content.pop("_admin", None) - self.msg.write(self.topic_msg, action, content) + if isinstance(not_send_msg, list): + not_send_msg.append((self.topic_msg, action, content)) + else: + self.msg.write(self.topic_msg, action, content) def check_conflict_on_del(self, session, _id, db_content): """ @@ -437,7 +440,7 @@ class BaseTopic: filter_q.update(self._get_project_filter(session)) return self.db.del_list(self.topic, filter_q) - def delete_extra(self, session, _id, db_content): + def delete_extra(self, session, _id, db_content, not_send_msg=None): """ Delete other things apart from database entry of a item _id. e.g.: other associated elements at database and other file system storage @@ -445,16 +448,18 @@ class BaseTopic: :param _id: server internal id :param db_content: The database content of the _id. It is already deleted when reached this method, but the content is needed in same cases + :param not_send_msg: To not send message (False) or store content (list) instead :return: None if ok or raises EngineException with the problem """ pass - def delete(self, session, _id, dry_run=False): + def delete(self, session, _id, dry_run=False, not_send_msg=None): """ Delete item by its internal _id :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id :param dry_run: make checking but do not delete + :param not_send_msg: To not send message (False) or store content (list) instead :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ... """ @@ -483,8 +488,8 @@ class BaseTopic: return None else: self.db.del_one(self.topic, filter_q) - self.delete_extra(session, _id, item_content) - self._send_msg("deleted", {"_id": _id}) + self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg) + self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg) return None def edit(self, session, _id, indata=None, kwargs=None, content=None): diff --git a/osm_nbi/descriptor_topics.py b/osm_nbi/descriptor_topics.py index f388ad1..2b42d06 100644 --- a/osm_nbi/descriptor_topics.py +++ b/osm_nbi/descriptor_topics.py @@ -94,12 +94,13 @@ class DescriptorTopic(BaseTopic): content["_admin"]["operationalState"] = "DISABLED" content["_admin"]["usageState"] = "NOT_IN_USE" - def delete_extra(self, session, _id, db_content): + def delete_extra(self, session, _id, db_content, not_send_msg=None): """ Deletes file system storage associated with the descriptor :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id :param db_content: The database content of the descriptor + :param not_send_msg: To not send message (False) or store content (list) instead :return: None if ok or raises EngineException with the problem """ self.fs.file_delete(_id, ignore_non_exist=True) diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index 5bdbb7e..3f83557 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -264,18 +264,21 @@ class Engine(object): with self.write_lock: return self.map_topic[topic].delete_list(session, _filter) - def del_item(self, session, topic, _id): + def del_item(self, session, topic, _id, not_send_msg=None): """ Delete item by its internal id :param session: contains the used login username and working project :param topic: it can be: users, projects, vnfds, nsds, ... :param _id: server id of the item + :param not_send_msg: If False, message will not be sent to kafka. + If a list, message is not sent, but content is stored in this variable so that the caller can send this + message using its own loop. If None, message is sent :return: dictionary with deleted item _id. It raises exception if not found. """ if topic not in self.map_topic: raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR) with self.write_lock: - return self.map_topic[topic].delete(session, _id) + return self.map_topic[topic].delete(session, _id, not_send_msg=not_send_msg) def edit_item(self, session, topic, _id, indata=None, kwargs=None): """ diff --git a/osm_nbi/instance_topics.py b/osm_nbi/instance_topics.py index decb9c7..139ced7 100644 --- a/osm_nbi/instance_topics.py +++ b/osm_nbi/instance_topics.py @@ -74,13 +74,14 @@ class NsrTopic(BaseTopic): "Launch 'terminate' operation first; or force deletion".format(_id), http_code=HTTPStatus.CONFLICT) - def delete_extra(self, session, _id, db_content): + def delete_extra(self, session, _id, db_content, not_send_msg=None): """ Deletes associated nslcmops and vnfrs from database. Deletes associated filesystem. Set usageState of pdu, vnfd, nsd :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id :param db_content: The database content of the descriptor + :param not_send_msg: To not send message (False) or store content (list) instead :return: None if ok or raises EngineException with the problem """ self.fs.file_delete(_id, ignore_non_exist=True) @@ -458,7 +459,7 @@ class VnfrTopic(BaseTopic): def __init__(self, db, fs, msg, auth): BaseTopic.__init__(self, db, fs, msg, auth) - def delete(self, session, _id, dry_run=False): + def delete(self, session, _id, dry_run=False, not_send_msg=None): raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR) def edit(self, session, _id, indata=None, kwargs=None, content=None): @@ -999,7 +1000,7 @@ class NsLcmOpTopic(BaseTopic): # except DbException as e: # raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND) - def delete(self, session, _id, dry_run=False): + def delete(self, session, _id, dry_run=False, not_send_msg=None): raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR) def edit(self, session, _id, indata=None, kwargs=None, content=None): @@ -1070,13 +1071,14 @@ class NsiTopic(BaseTopic): "Launch 'terminate' operation first; or force deletion".format(_id), http_code=HTTPStatus.CONFLICT) - def delete_extra(self, session, _id, db_content): + def delete_extra(self, session, _id, db_content, not_send_msg=None): """ Deletes associated nsilcmops from database. Deletes associated filesystem. Set usageState of nst :param session: contains "username", "admin", "force", "public", "project_id", "set_project" :param _id: server internal id :param db_content: The database content of the descriptor + :param not_send_msg: To not send message (False) or store content (list) instead :return: None if ok or raises EngineException with the problem """ @@ -1092,7 +1094,7 @@ class NsiTopic(BaseTopic): if nsi: # last one using nsr continue try: - self.nsrTopic.delete(session, nsr_id, dry_run=False) + self.nsrTopic.delete(session, nsr_id, dry_run=False, not_send_msg=not_send_msg) except (DbException, EngineException) as e: if e.http_code == HTTPStatus.NOT_FOUND: pass @@ -1510,7 +1512,7 @@ class NsiLcmOpTopic(BaseTopic): except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - def delete(self, session, _id, dry_run=False): + def delete(self, session, _id, dry_run=False, not_send_msg=None): raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR) def edit(self, session, _id, indata=None, kwargs=None, content=None): diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py index 39eed76..03bb92b 100644 --- a/osm_nbi/subscriptions.py +++ b/osm_nbi/subscriptions.py @@ -81,7 +81,7 @@ class SubscriptionThread(threading.Thread): kafka_working = True await asyncio.sleep(10, loop=self.loop) self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop, - callback=self._msg_callback), + aiocallback=self._msg_callback), loop=self.loop) await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop) except Exception as e: @@ -141,7 +141,7 @@ class SubscriptionThread(threading.Thread): self._stop() self.loop.close() - def _msg_callback(self, topic, command, params): + async def _msg_callback(self, topic, command, params): """ Callback to process a received message from kafka :param topic: topic received @@ -149,21 +149,27 @@ class SubscriptionThread(threading.Thread): :param params: rest of parameters :return: None """ + msg_to_send = [] try: if topic == "ns": if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"): self.logger.debug("received ns terminated {}".format(params)) if params.get("autoremove"): - self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"]) + self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"], + not_send_msg=msg_to_send) self.logger.debug("ns={} deleted from database".format(params["nsr_id"])) - return - if topic == "nsi": + elif topic == "nsi": if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"): self.logger.debug("received nsi terminated {}".format(params)) if params.get("autoremove"): - self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"]) + self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"], + not_send_msg=msg_to_send) self.logger.debug("nsis={} deleted from database".format(params["nsir_id"])) - return + + # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, + # but content to be written is stored at msg_to_send + for msg in msg_to_send: + await self.msg.aiowrite(*msg, loop=self.loop) except (EngineException, DbException, MsgException) as e: self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e)) except Exception as e: -- 2.17.1