self.mon_client = MonClient(config, loop=self.loop)
self.lcm_client = LcmClient(config, loop=self.loop)
- async def configure_vnf_alarms(self, nsr_id: str):
+ async def configure_vnf_alarms(self, nsr_id: str, vnf_member_index=None):
log.info("Configuring vnf alarms for network service %s", nsr_id)
alarms_created = []
database.db.connect()
try:
with database.db.atomic():
- vnfrs = self.db_client.get_vnfrs(nsr_id)
+ if vnf_member_index is None:
+ vnfrs = self.db_client.get_vnfrs(nsr_id)
+ else:
+ vnfrs = []
+ vnfr = self.db_client.get_vnfr(nsr_id, vnf_member_index)
+ vnfrs.append(vnfr)
+ # vnfrs = self.db_client.get_vnfrs(nsr_id)
for vnfr in vnfrs:
log.debug("Processing vnfr: %s", vnfr)
vnfd = self.db_client.get_vnfd(vnfr["vnfd-id"])
self.mon_client = MonClient(config, loop=self.loop)
self.lcm_client = LcmClient(config, loop=self.loop)
- async def configure_scaling_groups(self, nsr_id: str):
+ async def configure_scaling_groups(self, nsr_id: str, vnf_member_index=None):
"""
Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON.
:param nsr_id: Network service record id
try:
with database.db.atomic() as tx:
try:
- vnfrs = self.db_client.get_vnfrs(nsr_id)
+ if vnf_member_index is None:
+ vnfrs = self.db_client.get_vnfrs(nsr_id)
+ else:
+ vnfrs = []
+ vnfr = self.db_client.get_vnfr(nsr_id, vnf_member_index)
+ vnfrs.append(vnfr)
+ # vnfrs = self.db_client.get_vnfrs(nsr_id)
for vnfr in vnfrs:
log.debug("Processing vnfr: %s", vnfr)
vnfd = self.db_client.get_vnfd(vnfr["vnfd-id"])
log = logging.getLogger(__name__)
-ALLOWED_KAFKA_KEYS = ["instantiated", "scaled", "terminated", "notify_alarm", "vnf_terminated"]
+ALLOWED_KAFKA_KEYS = ["instantiated", "scaled", "terminated", "notify_alarm", "policy_updated", "vnf_terminated"]
class PolicyModuleAgent:
if key == "notify_alarm":
await self._handle_alarm_notification(msg)
+ if key == "policy_updated":
+ await self._handle_policy_update(msg)
+
if key == "vnf_terminated":
await self._handle_vnf_terminated(msg)
else:
content["operationState"],
)
+ async def _handle_policy_update(self, content):
+ log.info("_handle_policy_update: %s", content)
+ nsr_id = content['nsr_id']
+ vnf_member_index = content['vnf_member_index']
+ if (
+ content["operationState"] == "COMPLETED"
+ or content["operationState"] == "PARTIALLY_COMPLETED"
+ ):
+ log.info(
+ "Updating policies of VNF with nsr_id: %s and vnf-member-index: %s"
+ % (nsr_id, vnf_member_index))
+ await self.autoscaling_service.delete_scaling_groups(nsr_id, vnf_member_index)
+ await self.alarming_service.delete_vnf_alarms(nsr_id, vnf_member_index)
+ await self.autoscaling_service.configure_scaling_groups(nsr_id, vnf_member_index)
+ await self.alarming_service.configure_vnf_alarms(nsr_id, vnf_member_index)
+ else:
+ log.info(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ content['operationState'])
+
async def _handle_vnf_terminated(self, content):
nsr_id = content['nsr_id']
vnf_member_index = content['vnf_member_index']
self.loop.run_until_complete(agent._handle_instantiated(content))
configure_scaling_groups.assert_not_called()
+ @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
+ @mock.patch("osm_policy_module.alarming.service.MonClient")
+ @mock.patch("osm_policy_module.alarming.service.LcmClient")
+ @mock.patch("osm_policy_module.autoscaling.service.MonClient")
+ @mock.patch("osm_policy_module.autoscaling.service.LcmClient")
+ @mock.patch.object(AutoscalingService, "configure_scaling_groups")
+ @mock.patch.object(AlarmingService, "configure_vnf_alarms")
+ @mock.patch.object(AutoscalingService, "delete_scaling_groups")
+ @mock.patch.object(AlarmingService, "delete_vnf_alarms")
+ def test_handle_policy_update(
+ self,
+ delete_vnf_alarms,
+ delete_scaling_groups,
+ configure_vnf_alarms,
+ configure_scaling_groups,
+ autoscaling_lcm_client,
+ autoscaling_mon_client,
+ alarming_lcm_client,
+ alarming_mon_client,
+ ):
+ async def mock_delete_scaling_groups(nsr_id, vnf_member_index):
+ pass
+
+ async def mock_delete_vnf_alarms(nsr_id, vnf_member_index):
+ pass
+
+ async def mock_configure_scaling_groups(nsr_id, vnf_member_index):
+ pass
+
+ async def mock_configure_vnf_alarms(nsr_id, vnf_member_index):
+ pass
+
+ config = Config()
+ agent = PolicyModuleAgent(config, self.loop)
+ assert autoscaling_lcm_client.called
+ assert autoscaling_mon_client.called
+ assert alarming_lcm_client.called
+ assert alarming_mon_client.called
+ content = {
+ "nsr_id": "test_nsr_id",
+ "vnf_member_index": "1",
+ "operationState": "COMPLETED"
+ }
+ failed_content = {
+ "nsr_id": "test_nsr_id",
+ "vnf_member_index": "1",
+ "operationState": "FAILED"
+ }
+ configure_scaling_groups.side_effect = mock_configure_scaling_groups
+ configure_vnf_alarms.side_effect = mock_configure_vnf_alarms
+ delete_scaling_groups.side_effect = mock_delete_scaling_groups
+ delete_vnf_alarms.side_effect = mock_delete_vnf_alarms
+
+ self.loop.run_until_complete(agent._handle_policy_update(content))
+ configure_scaling_groups.assert_called_with("test_nsr_id", "1")
+ configure_scaling_groups.reset_mock()
+
+ self.loop.run_until_complete(agent._handle_policy_update(failed_content))
+ configure_scaling_groups.assert_not_called()
+
@mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
@mock.patch("osm_policy_module.autoscaling.service.MonClient")
@mock.patch("osm_policy_module.autoscaling.service.LcmClient")