X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=fd97f833d354ea05e843b7873b388d5616e26d5f;hb=27153c4846a5caeee4a430527fd87cd322408f4b;hp=cdc85487cb53b194a98ffdd5b0dfb549d8097293;hpb=53ad3c4c5d87a5b392a74cf386c29c67276ed3cb;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index cdc8548..fd97f83 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -27,8 +27,8 @@ import threading import six import yaml from kafka import KafkaConsumer -from osm_common import dbmongo +from osm_mon.common.common_db_client import CommonDbClient from osm_mon.core.auth import AuthManager from osm_mon.core.database import DatabaseManager from osm_mon.core.settings import Config @@ -79,9 +79,7 @@ class CommonConsumer: self.vrops_rcvr = plugin_receiver.PluginReceiver() log.info("Connecting to MongoDB...") - self.common_db = dbmongo.DbMongo() - common_db_uri = cfg.MONGO_URI.split(':') - self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) + self.common_db = CommonDbClient() log.info("Connection successful.") def get_vim_type(self, vim_uuid): @@ -89,24 +87,13 @@ class CommonConsumer: credentials = self.database_manager.get_credentials(vim_uuid) return credentials.type - def get_vdur(self, nsr_id, member_index, vdu_name): - vnfr = self.get_vnfr(nsr_id, member_index) - for vdur in vnfr['vdur']: - if vdur['name'] == vdu_name: - return vdur - raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, - vdu_name) - - def get_vnfr(self, nsr_id, member_index): - vnfr = self.common_db.get_one("vnfrs", - {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) - return vnfr - def run(self): common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - group_id="mon-consumer") + group_id="mon-consumer", + session_timeout_ms=60000, + heartbeat_interval_ms=20000) topics = ['metric_request', 'alarm_request', 'vim_account'] common_consumer.subscribe(topics) @@ -149,12 +136,12 @@ class CommonConsumer: vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index'] # Check the vim desired by the message - vnfr = self.get_vnfr(ns_id, vnf_index) + vnfr = self.common_db.get_vnfr(ns_id, vnf_index) vim_uuid = vnfr['vim-account-id'] if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values: vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name'] - vdur = self.get_vdur(ns_id, vnf_index, vdu_name) + vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name) if contains_list: values[list_index]['resource_uuid'] = vdur['vim-id'] else: @@ -182,7 +169,7 @@ class CommonConsumer: elif vim_type == "vmware": log.info("This metric_request message is for the vROPs plugin.") - self.vrops_rcvr.consume(message,vim_uuid) + self.vrops_rcvr.consume(message, vim_uuid) else: log.debug("vim_type is misconfigured or unsupported; %s",