X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FPOL.git;a=blobdiff_plain;f=osm_policy_module%2Fcore%2Fagent.py;h=52412a6d8ae54a9956adaaeed6cfdbf9fc142157;hp=410413f68a47e066d9bce0eb1b49740849e3aca9;hb=refs%2Fchanges%2F19%2F6619%2F2;hpb=d0fea7ea94e262be18a0f473eebd7e7239d892b2 diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py index 410413f..52412a6 100644 --- a/osm_policy_module/core/agent.py +++ b/osm_policy_module/core/agent.py @@ -28,8 +28,8 @@ from json import JSONDecodeError import yaml from kafka import KafkaConsumer -from osm_common import dbmongo +from osm_policy_module.common.db_client import DbClient from osm_policy_module.common.lcm_client import LcmClient from osm_policy_module.common.mon_client import MonClient from osm_policy_module.core import database @@ -42,10 +42,7 @@ log = logging.getLogger(__name__) class PolicyModuleAgent: def __init__(self): cfg = Config.instance() - self.common_db = dbmongo.DbMongo() - self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST, - 'port': int(cfg.OSMPOL_DATABASE_PORT), - 'name': 'osm'}) + self.db_client = DbClient() self.mon_client = MonClient() self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, cfg.OSMPOL_MESSAGE_PORT) @@ -71,8 +68,7 @@ class PolicyModuleAgent: content = yaml.safe_load(msg) log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content) nslcmop_id = content['nslcmop_id'] - nslcmop = self.common_db.get_one(table="nslcmops", - filter={"_id": nslcmop_id}) + nslcmop = self.db_client.get_nslcmop(nslcmop_id) if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': nsr_id = nslcmop['nsInstanceId'] log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id) @@ -116,32 +112,13 @@ class PolicyModuleAgent: except Exception: log.exception("Error consuming message: ") - def _get_vnfr(self, nsr_id: str, member_index: int): - vnfr = self.common_db.get_one(table="vnfrs", - filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) - return vnfr - - def _get_vnfrs(self, nsr_id: str): - return [self._get_vnfr(nsr_id, member['member-vnf-index']) for member in - self._get_nsr(nsr_id)['nsd']['constituent-vnfd']] - - def _get_vnfd(self, vnfd_id: str): - vnfr = self.common_db.get_one(table="vnfds", - filter={"_id": vnfd_id}) - return vnfr - - def _get_nsr(self, nsr_id: str): - nsr = self.common_db.get_one(table="nsrs", - filter={"id": nsr_id}) - return nsr - def _configure_scaling_groups(self, nsr_id: str): # TODO(diazb): Check for alarm creation on exception and clean resources if needed. with database.db.atomic(): - vnfrs = self._get_vnfrs(nsr_id) + vnfrs = self.db_client.get_vnfrs(nsr_id) log.info("Checking %s vnfrs...", len(vnfrs)) for vnfr in vnfrs: - vnfd = self._get_vnfd(vnfr['vnfd-id']) + vnfd = self.db_client.get_vnfd(vnfr['vnfd-id']) log.info("Looking for vnfd %s", vnfr['vnfd-id']) scaling_groups = vnfd['scaling-group-descriptor'] vnf_monitoring_params = vnfd['monitoring-param']