+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
+ with self.write_lock:
+ return self.map_topic[topic].upload_content(
+ session, _id, indata, kwargs, headers
+ )
+
+ def clone(
+ self, rollback, session, topic, _id, indata=None, kwargs=None, headers=None
+ ):
+ if topic not in self.map_topic:
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
+ with self.write_lock:
+ return self.map_topic[topic].clone(
+ rollback, session, _id, indata, kwargs, headers
+ )
+
+ def move_ksu(self, session, topic, _id, indata=None, kwargs=None):
+ if topic not in self.map_topic:
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
+
+ with self.write_lock:
+ return self.map_topic[topic].move_ksu(session, _id, indata, kwargs)
+
+ def get_cluster_info(self, session, topic, _id, item):
+ if topic not in self.map_topic:
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
+ return self.map_topic[topic].get_cluster_info(session, _id, item)
+
+ def update_cluster(self, session, topic, _id, item, indata):
+ if topic not in self.map_topic:
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
+ return self.map_topic[topic].update_cluster(session, _id, item, indata)
+
+ def delete_ksu(self, session, topic, _id, indata):
+ if topic not in self.map_topic:
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )