return "{}:0".format(content["_id"])
- def delete(self, session, _id, dry_run=False):
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
"""
Delete item by its internal _id
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
:param dry_run: make checking but do not delete
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: operation id if it is ordered to delete. None otherwise
"""
if session["force"]:
self.db.del_one(self.topic, {"_id": _id})
op_id = None
- self._send_msg("deleted", {"_id": _id, "op_id": op_id})
+ self._send_msg("deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg)
else:
update_dict["_admin.to_delete"] = True
self.db.set_one(self.topic, {"_id": _id},
# the number of operations is the operation_id. db_content does not contains the new operation inserted,
# so the -1 is not needed
op_id = "{}:{}".format(db_content["_id"], len(db_content["_admin"]["operations"]))
- self._send_msg("delete", {"_id": _id, "op_id": op_id})
+ self._send_msg("delete", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg)
return op_id
rollback.append({"topic": self.topic, "_id": _id})
# del content["password"]
- # self._send_msg("created", content)
+ # self._send_msg("created", content, not_send_msg=not_send_msg)
return _id, None
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
user_list = [usr for usr in user_list if usr["username"] == session["username"]]
return user_list
- def delete(self, session, _id, dry_run=False):
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
"""
Delete item by its internal _id
:param _id: server internal id
:param force: indicates if deletion must be forced in case of conflict
:param dry_run: make checking but do not delete
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
"""
# Allow _id to be a name or uuid
self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
_id = self.auth.create_project(content)
rollback.append({"topic": self.topic, "_id": _id})
- # self._send_msg("created", content)
+ # self._send_msg("created", content, not_send_msg=not_send_msg)
return _id, None
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
project_list = [proj for proj in project_list if proj["_id"] in projects]
return project_list
- def delete(self, session, _id, dry_run=False):
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
"""
Delete item by its internal _id
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
:param dry_run: make checking but do not delete
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
"""
# Allow _id to be a name or uuid
content["_id"] = rid
# _id = self.db.create(self.topic, content)
rollback.append({"topic": self.topic, "_id": rid})
- # self._send_msg("created", content)
+ # self._send_msg("created", content, not_send_msg=not_send_msg)
return rid, None
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
- def delete(self, session, _id, dry_run=False):
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
"""
Delete item by its internal _id
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
:param dry_run: make checking but do not delete
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: dictionary with deleted item _id. It raises EngineException on error: not found, conflict, ...
"""
filter_q = {BaseTopic.id_field(self.topic, _id): _id}
final_content["_admin"]["modified"] = now
return None
- def _send_msg(self, action, content):
- if self.topic_msg:
+ def _send_msg(self, action, content, not_send_msg=None):
+ if self.topic_msg and not_send_msg is not False:
content.pop("_admin", None)
- self.msg.write(self.topic_msg, action, content)
+ if isinstance(not_send_msg, list):
+ not_send_msg.append((self.topic_msg, action, content))
+ else:
+ self.msg.write(self.topic_msg, action, content)
def check_conflict_on_del(self, session, _id, db_content):
"""
filter_q.update(self._get_project_filter(session))
return self.db.del_list(self.topic, filter_q)
- def delete_extra(self, session, _id, db_content):
+ def delete_extra(self, session, _id, db_content, not_send_msg=None):
"""
Delete other things apart from database entry of a item _id.
e.g.: other associated elements at database and other file system storage
:param _id: server internal id
:param db_content: The database content of the _id. It is already deleted when reached this method, but the
content is needed in same cases
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: None if ok or raises EngineException with the problem
"""
pass
- def delete(self, session, _id, dry_run=False):
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
"""
Delete item by its internal _id
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
:param dry_run: make checking but do not delete
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
"""
return None
else:
self.db.del_one(self.topic, filter_q)
- self.delete_extra(session, _id, item_content)
- self._send_msg("deleted", {"_id": _id})
+ self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg)
+ self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg)
return None
def edit(self, session, _id, indata=None, kwargs=None, content=None):
content["_admin"]["operationalState"] = "DISABLED"
content["_admin"]["usageState"] = "NOT_IN_USE"
- def delete_extra(self, session, _id, db_content):
+ def delete_extra(self, session, _id, db_content, not_send_msg=None):
"""
Deletes file system storage associated with the descriptor
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
:param db_content: The database content of the descriptor
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: None if ok or raises EngineException with the problem
"""
self.fs.file_delete(_id, ignore_non_exist=True)
with self.write_lock:
return self.map_topic[topic].delete_list(session, _filter)
- def del_item(self, session, topic, _id):
+ def del_item(self, session, topic, _id, not_send_msg=None):
"""
Delete item by its internal id
:param session: contains the used login username and working project
:param topic: it can be: users, projects, vnfds, nsds, ...
:param _id: server id of the item
+ :param not_send_msg: If False, message will not be sent to kafka.
+ If a list, message is not sent, but content is stored in this variable so that the caller can send this
+ message using its own loop. If None, message is sent
:return: dictionary with deleted item _id. It raises exception if not found.
"""
if topic not in self.map_topic:
raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
with self.write_lock:
- return self.map_topic[topic].delete(session, _id)
+ return self.map_topic[topic].delete(session, _id, not_send_msg=not_send_msg)
def edit_item(self, session, topic, _id, indata=None, kwargs=None):
"""
"Launch 'terminate' operation first; or force deletion".format(_id),
http_code=HTTPStatus.CONFLICT)
- def delete_extra(self, session, _id, db_content):
+ def delete_extra(self, session, _id, db_content, not_send_msg=None):
"""
Deletes associated nslcmops and vnfrs from database. Deletes associated filesystem.
Set usageState of pdu, vnfd, nsd
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
:param db_content: The database content of the descriptor
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: None if ok or raises EngineException with the problem
"""
self.fs.file_delete(_id, ignore_non_exist=True)
def __init__(self, db, fs, msg, auth):
BaseTopic.__init__(self, db, fs, msg, auth)
- def delete(self, session, _id, dry_run=False):
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR)
def edit(self, session, _id, indata=None, kwargs=None, content=None):
# except DbException as e:
# raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND)
- def delete(self, session, _id, dry_run=False):
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR)
def edit(self, session, _id, indata=None, kwargs=None, content=None):
"Launch 'terminate' operation first; or force deletion".format(_id),
http_code=HTTPStatus.CONFLICT)
- def delete_extra(self, session, _id, db_content):
+ def delete_extra(self, session, _id, db_content, not_send_msg=None):
"""
Deletes associated nsilcmops from database. Deletes associated filesystem.
Set usageState of nst
:param session: contains "username", "admin", "force", "public", "project_id", "set_project"
:param _id: server internal id
:param db_content: The database content of the descriptor
+ :param not_send_msg: To not send message (False) or store content (list) instead
:return: None if ok or raises EngineException with the problem
"""
if nsi: # last one using nsr
continue
try:
- self.nsrTopic.delete(session, nsr_id, dry_run=False)
+ self.nsrTopic.delete(session, nsr_id, dry_run=False, not_send_msg=not_send_msg)
except (DbException, EngineException) as e:
if e.http_code == HTTPStatus.NOT_FOUND:
pass
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
- def delete(self, session, _id, dry_run=False):
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
raise EngineException("Method delete called directly", HTTPStatus.INTERNAL_SERVER_ERROR)
def edit(self, session, _id, indata=None, kwargs=None, content=None):
kafka_working = True
await asyncio.sleep(10, loop=self.loop)
self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
- callback=self._msg_callback),
+ aiocallback=self._msg_callback),
loop=self.loop)
await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
except Exception as e:
self._stop()
self.loop.close()
- def _msg_callback(self, topic, command, params):
+ async def _msg_callback(self, topic, command, params):
"""
Callback to process a received message from kafka
:param topic: topic received
:param params: rest of parameters
:return: None
"""
+ msg_to_send = []
try:
if topic == "ns":
if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
self.logger.debug("received ns terminated {}".format(params))
if params.get("autoremove"):
- self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
+ self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
+ not_send_msg=msg_to_send)
self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
- return
- if topic == "nsi":
+ elif topic == "nsi":
if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
self.logger.debug("received nsi terminated {}".format(params))
if params.get("autoremove"):
- self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"])
+ self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
+ not_send_msg=msg_to_send)
self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
- return
+
+ # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
+ # but content to be written is stored at msg_to_send
+ for msg in msg_to_send:
+ await self.msg.aiowrite(*msg, loop=self.loop)
except (EngineException, DbException, MsgException) as e:
self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
except Exception as e: