NsiTopic,
NsiLcmOpTopic,
)
+from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic
from osm_nbi.pmjobs_topics import PmJobsTopic
from osm_nbi.subscription_topics import NslcmSubscriptionsTopic
+from osm_nbi.osm_vnfm.vnf_subscription import VnflcmSubscriptionsTopic
from base64 import b64encode
from os import urandom # , path
from threading import Lock
"nsilcmops": NsiLcmOpTopic,
"vnfpkgops": VnfPkgOpTopic,
"nslcm_subscriptions": NslcmSubscriptionsTopic,
+ "vnf_instances": VnfInstances,
+ "vnflcmops": VnfLcmOpTopic,
+ "vnflcm_subscriptions": VnflcmSubscriptionsTopic,
# [NEW_TOPIC]: add an entry here
# "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
}
:return: The list, it can be empty if no one match the filter_q.
"""
if topic not in self.map_topic:
- raise EngineException(
- "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
- )
+ raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
return self.map_topic[topic].list(session, filter_q, api_req)
- def get_item(self, session, topic, _id, api_req=False):
+ def get_item(self, session, topic, _id, filter_q=None, api_req=False):
"""
Get complete information on an item
:param session: contains the used login username and working project
:param topic: it can be: users, projects, vnfds, nsds,
:param _id: server id of the item
+ :param filter_q: other arguments
:param api_req: True if this call is serving an external API request. False if serving internal request.
:return: dictionary, raise exception if not found.
"""
if topic not in self.map_topic:
- raise EngineException(
- "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
- )
- return self.map_topic[topic].show(session, _id, api_req)
+ raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ return self.map_topic[topic].show(session, _id, filter_q, api_req)
def get_file(self, session, topic, _id, path=None, accept_header=None):
"""