from admin_topics import UserTopicAuth, ProjectTopicAuth, RoleTopicAuth
from descriptor_topics import VnfdTopic, NsdTopic, PduTopic, NstTopic
from instance_topics import NsrTopic, VnfrTopic, NsLcmOpTopic, NsiTopic, NsiLcmOpTopic
+from pmjobs_topics import PmJobsTopic
from base64 import b64encode
from os import urandom, path
from threading import Lock
"nsis": NsiTopic,
"nsilcmops": NsiLcmOpTopic
# [NEW_TOPIC]: add an entry here
+ # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
}
def __init__(self):
self.operations)
else:
self.map_topic[topic] = topic_class(self.db, self.fs, self.msg)
+ self.map_topic[topic] = topic_class(self.db, self.fs, self.msg)
+ self.map_topic["pm_jobs"] = PmJobsTopic(config["prometheus"].get("host"), config["prometheus"].get("port"))
except (DbException, FsException, MsgException) as e:
raise EngineException(str(e), http_code=e.http_code)