from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic
from osm_nbi.pmjobs_topics import PmJobsTopic
from osm_nbi.subscription_topics import NslcmSubscriptionsTopic
+from osm_nbi.osm_vnfm.vnf_subscription import VnflcmSubscriptionsTopic
from base64 import b64encode
from os import urandom # , path
from threading import Lock
"nslcm_subscriptions": NslcmSubscriptionsTopic,
"vnf_instances": VnfInstances,
"vnflcmops": VnfLcmOpTopic,
+ "vnflcm_subscriptions": VnflcmSubscriptionsTopic,
# [NEW_TOPIC]: add an entry here
# "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
}
:return: The list, it can be empty if no one match the filter_q.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].list(session, filter_q, api_req)
def get_item(self, session, topic, _id, filter_q=None, api_req=False):
:return: dictionary, raise exception if not found.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].show(session, _id, filter_q, api_req)
def get_file(self, session, topic, _id, path=None, accept_header=None):