fix bug 937. NSs are not deleted from database upon NSI deletion 29/8329/2 v7.0.0rc1
authortierno <alfonso.tiernosepulveda@telefonica.com>
Thu, 5 Dec 2019 12:26:01 +0000 (12:26 +0000)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Thu, 5 Dec 2019 16:30:38 +0000 (16:30 +0000)
Change-Id: Ib69d8921da777e41749aa8511fd64d1d7cc6204a
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
osm_nbi/admin_topics.py
osm_nbi/base_topic.py
osm_nbi/descriptor_topics.py
osm_nbi/engine.py
osm_nbi/instance_topics.py
osm_nbi/subscriptions.py

index 5b37bf0..670629e 100644 (file)
@@ -302,12 +302,13 @@ class CommonVimWimSdn(BaseTopic):
 
         return "{}:0".format(content["_id"])
 
-    def delete(self, session, _id, dry_run=False):
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
         """
         Delete item by its internal _id
         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
         :param _id: server internal id
         :param dry_run: make checking but do not delete
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: operation id if it is ordered to delete. None otherwise
         """
 
@@ -344,7 +345,7 @@ class CommonVimWimSdn(BaseTopic):
         if session["force"]:
             self.db.del_one(self.topic, {"_id": _id})
             op_id = None
-            self._send_msg("deleted", {"_id": _id, "op_id": op_id})
+            self._send_msg("deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg)
         else:
             update_dict["_admin.to_delete"] = True
             self.db.set_one(self.topic, {"_id": _id},
@@ -354,7 +355,7 @@ class CommonVimWimSdn(BaseTopic):
             # the number of operations is the operation_id. db_content does not contains the new operation inserted,
             # so the -1 is not needed
             op_id = "{}:{}".format(db_content["_id"], len(db_content["_admin"]["operations"]))
-            self._send_msg("delete", {"_id": _id, "op_id": op_id})
+            self._send_msg("delete", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg)
         return op_id
 
 
@@ -581,7 +582,7 @@ class UserTopicAuth(UserTopic):
 
             rollback.append({"topic": self.topic, "_id": _id})
             # del content["password"]
-            # self._send_msg("created", content)
+            # self._send_msg("created", content, not_send_msg=not_send_msg)
             return _id, None
         except ValidationError as e:
             raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
@@ -733,7 +734,7 @@ class UserTopicAuth(UserTopic):
             user_list = [usr for usr in user_list if usr["username"] == session["username"]]
         return user_list
 
-    def delete(self, session, _id, dry_run=False):
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
         """
         Delete item by its internal _id
 
@@ -741,6 +742,7 @@ class UserTopicAuth(UserTopic):
         :param _id: server internal id
         :param force: indicates if deletion must be forced in case of conflict
         :param dry_run: make checking but do not delete
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
         """
         # Allow _id to be a name or uuid
@@ -865,7 +867,7 @@ class ProjectTopicAuth(ProjectTopic):
             self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
             _id = self.auth.create_project(content)
             rollback.append({"topic": self.topic, "_id": _id})
-            # self._send_msg("created", content)
+            # self._send_msg("created", content, not_send_msg=not_send_msg)
             return _id, None
         except ValidationError as e:
             raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
@@ -905,13 +907,14 @@ class ProjectTopicAuth(ProjectTopic):
             project_list = [proj for proj in project_list if proj["_id"] in projects]
         return project_list
 
-    def delete(self, session, _id, dry_run=False):
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
         """
         Delete item by its internal _id
 
         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
         :param _id: server internal id
         :param dry_run: make checking but do not delete
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
         """
         # Allow _id to be a name or uuid
@@ -1194,18 +1197,19 @@ class RoleTopicAuth(BaseTopic):
             content["_id"] = rid
             # _id = self.db.create(self.topic, content)
             rollback.append({"topic": self.topic, "_id": rid})
-            # self._send_msg("created", content)
+            # self._send_msg("created", content, not_send_msg=not_send_msg)
             return rid, None
         except ValidationError as e:
             raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
 
-    def delete(self, session, _id, dry_run=False):
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
         """
         Delete item by its internal _id
 
         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
         :param _id: server internal id
         :param dry_run: make checking but do not delete
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
         """
         filter_q = {BaseTopic.id_field(self.topic, _id): _id}
index 1bc906c..52e02af 100644 (file)
@@ -282,10 +282,13 @@ class BaseTopic:
             final_content["_admin"]["modified"] = now
         return None
 
-    def _send_msg(self, action, content):
-        if self.topic_msg:
+    def _send_msg(self, action, content, not_send_msg=None):
+        if self.topic_msg and not_send_msg is not False:
             content.pop("_admin", None)
-            self.msg.write(self.topic_msg, action, content)
+            if isinstance(not_send_msg, list):
+                not_send_msg.append((self.topic_msg, action, content))
+            else:
+                self.msg.write(self.topic_msg, action, content)
 
     def check_conflict_on_del(self, session, _id, db_content):
         """
@@ -437,7 +440,7 @@ class BaseTopic:
             filter_q.update(self._get_project_filter(session))
         return self.db.del_list(self.topic, filter_q)
 
-    def delete_extra(self, session, _id, db_content):
+    def delete_extra(self, session, _id, db_content, not_send_msg=None):
         """
         Delete other things apart from database entry of a item _id.
         e.g.: other associated elements at database and other file system storage
@@ -445,16 +448,18 @@ class BaseTopic:
         :param _id: server internal id
         :param db_content: The database content of the _id. It is already deleted when reached this method, but the
             content is needed in same cases
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: None if ok or raises EngineException with the problem
         """
         pass
 
-    def delete(self, session, _id, dry_run=False):
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
         """
         Delete item by its internal _id
         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
         :param _id: server internal id
         :param dry_run: make checking but do not delete
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
         """
 
@@ -483,8 +488,8 @@ class BaseTopic:
                 return None
         else:
             self.db.del_one(self.topic, filter_q)
-        self.delete_extra(session, _id, item_content)
-        self._send_msg("deleted", {"_id": _id})
+        self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg)
+        self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg)
         return None
 
     def edit(self, session, _id, indata=None, kwargs=None, content=None):
index f388ad1..2b42d06 100644 (file)
@@ -94,12 +94,13 @@ class DescriptorTopic(BaseTopic):
         content["_admin"]["operationalState"] = "DISABLED"
         content["_admin"]["usageState"] = "NOT_IN_USE"
 
-    def delete_extra(self, session, _id, db_content):
+    def delete_extra(self, session, _id, db_content, not_send_msg=None):
         """
         Deletes file system storage associated with the descriptor
         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
         :param _id: server internal id
         :param db_content: The database content of the descriptor
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: None if ok or raises EngineException with the problem
         """
         self.fs.file_delete(_id, ignore_non_exist=True)
index 5bdbb7e..3f83557 100644 (file)
@@ -264,18 +264,21 @@ class Engine(object):
         with self.write_lock:
             return self.map_topic[topic].delete_list(session, _filter)
 
-    def del_item(self, session, topic, _id):
+    def del_item(self, session, topic, _id, not_send_msg=None):
         """
         Delete item by its internal id
         :param session: contains the used login username and working project
         :param topic: it can be: users, projects, vnfds, nsds, ...
         :param _id: server id of the item
+        :param not_send_msg: If False, message will not be sent to kafka.
+            If a list, message is not sent, but content is stored in this variable so that the caller can send this
+            message using its own loop. If None, message is sent
         :return: dictionary with deleted item _id. It raises exception if not found.
         """
         if topic not in self.map_topic:
             raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
         with self.write_lock:
-            return self.map_topic[topic].delete(session, _id)
+            return self.map_topic[topic].delete(session, _id, not_send_msg=not_send_msg)
 
     def edit_item(self, session, topic, _id, indata=None, kwargs=None):
         """
index decb9c7..139ced7 100644 (file)
@@ -74,13 +74,14 @@ class NsrTopic(BaseTopic):
                                   "Launch 'terminate' operation first; or force deletion".format(_id),
                                   http_code=HTTPStatus.CONFLICT)
 
-    def delete_extra(self, session, _id, db_content):
+    def delete_extra(self, session, _id, db_content, not_send_msg=None):
         """
         Deletes associated nslcmops and vnfrs from database. Deletes associated filesystem.
          Set usageState of pdu, vnfd, nsd
         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
         :param _id: server internal id
         :param db_content: The database content of the descriptor
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: None if ok or raises EngineException with the problem
         """
         self.fs.file_delete(_id, ignore_non_exist=True)
@@ -458,7 +459,7 @@ class VnfrTopic(BaseTopic):
     def __init__(self, db, fs, msg, auth):
         BaseTopic.__init__(self, db, fs, msg, auth)
 
-    def delete(self, session, _id, dry_run=False):
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
         raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR)
 
     def edit(self, session, _id, indata=None, kwargs=None, content=None):
@@ -999,7 +1000,7 @@ class NsLcmOpTopic(BaseTopic):
         # except DbException as e:
         #     raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND)
 
-    def delete(self, session, _id, dry_run=False):
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
         raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR)
 
     def edit(self, session, _id, indata=None, kwargs=None, content=None):
@@ -1070,13 +1071,14 @@ class NsiTopic(BaseTopic):
                                   "Launch 'terminate' operation first; or force deletion".format(_id),
                                   http_code=HTTPStatus.CONFLICT)
 
-    def delete_extra(self, session, _id, db_content):
+    def delete_extra(self, session, _id, db_content, not_send_msg=None):
         """
         Deletes associated nsilcmops from database. Deletes associated filesystem.
          Set usageState of nst
         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
         :param _id: server internal id
         :param db_content: The database content of the descriptor
+        :param not_send_msg: To not send message (False) or store content (list) instead
         :return: None if ok or raises EngineException with the problem
         """
 
@@ -1092,7 +1094,7 @@ class NsiTopic(BaseTopic):
                 if nsi:  # last one using nsr
                     continue
             try:
-                self.nsrTopic.delete(session, nsr_id, dry_run=False)
+                self.nsrTopic.delete(session, nsr_id, dry_run=False, not_send_msg=not_send_msg)
             except (DbException, EngineException) as e:
                 if e.http_code == HTTPStatus.NOT_FOUND:
                     pass
@@ -1510,7 +1512,7 @@ class NsiLcmOpTopic(BaseTopic):
         except ValidationError as e:
             raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
 
-    def delete(self, session, _id, dry_run=False):
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
         raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR)
 
     def edit(self, session, _id, indata=None, kwargs=None, content=None):
index 39eed76..03bb92b 100644 (file)
@@ -81,7 +81,7 @@ class SubscriptionThread(threading.Thread):
                     kafka_working = True
                 await asyncio.sleep(10, loop=self.loop)
                 self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
-                                                                           callback=self._msg_callback),
+                                                                           aiocallback=self._msg_callback),
                                                           loop=self.loop)
                 await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
             except Exception as e:
@@ -141,7 +141,7 @@ class SubscriptionThread(threading.Thread):
         self._stop()
         self.loop.close()
 
-    def _msg_callback(self, topic, command, params):
+    async def _msg_callback(self, topic, command, params):
         """
         Callback to process a received message from kafka
         :param topic:  topic received
@@ -149,21 +149,27 @@ class SubscriptionThread(threading.Thread):
         :param params: rest of parameters
         :return: None
         """
+        msg_to_send = []
         try:
             if topic == "ns":
                 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                     self.logger.debug("received ns terminated {}".format(params))
                     if params.get("autoremove"):
-                        self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
+                        self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
+                                             not_send_msg=msg_to_send)
                         self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
-                    return
-            if topic == "nsi":
+            elif topic == "nsi":
                 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                     self.logger.debug("received nsi terminated {}".format(params))
                     if params.get("autoremove"):
-                        self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"])
+                        self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
+                                             not_send_msg=msg_to_send)
                         self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
-                    return
+
+            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, 
+            # but content to be written is stored at msg_to_send
+            for msg in msg_to_send:
+                await self.msg.aiowrite(*msg, loop=self.loop)
         except (EngineException, DbException, MsgException) as e:
             self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
         except Exception as e: