NsiTopic,
NsiLcmOpTopic,
)
+from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic
from osm_nbi.pmjobs_topics import PmJobsTopic
from osm_nbi.subscription_topics import NslcmSubscriptionsTopic
+from osm_nbi.osm_vnfm.vnf_subscription import VnflcmSubscriptionsTopic
from base64 import b64encode
from os import urandom # , path
from threading import Lock
"nsilcmops": NsiLcmOpTopic,
"vnfpkgops": VnfPkgOpTopic,
"nslcm_subscriptions": NslcmSubscriptionsTopic,
+ "vnf_instances": VnfInstances,
+ "vnflcmops": VnfLcmOpTopic,
+ "vnflcm_subscriptions": VnflcmSubscriptionsTopic,
# [NEW_TOPIC]: add an entry here
# "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
}
# "resources_to_operations file missing")
#
# with open(resources_to_operations_file, 'r') as f:
- # resources_to_operations = yaml.load(f, Loader=yaml.Loader)
+ # resources_to_operations = yaml.safeload(f)
#
# self.operations = []
#
:return: The list, it can be empty if no one match the filter_q.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].list(session, filter_q, api_req)
def get_item(self, session, topic, _id, filter_q=None, api_req=False):
:return: dictionary, raise exception if not found.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].show(session, _id, filter_q, api_req)
def get_file(self, session, topic, _id, path=None, accept_header=None):