import asyncio
import json
import logging
+import operator
+import functools
import requests
from requests.exceptions import ConnectionError, RequestException
finally:
database.db.close()
- async def delete_vnf_alarms(self, nsr_id):
+ async def delete_vnf_alarms(self, nsr_id, vnf_member_index=None):
log.info("Deleting vnf alarms for network service %s", nsr_id)
database.db.connect()
try:
with database.db.atomic():
- for alarm in VnfAlarmRepository.list(VnfAlarm.nsr_id == nsr_id):
+ if vnf_member_index is None:
+ alarm_conditions = VnfAlarm.nsr_id == nsr_id
+ else:
+ query_list = [VnfAlarm.nsr_id == nsr_id,
+ VnfAlarm.vnf_member_index == vnf_member_index]
+ alarm_conditions = functools.reduce(operator.and_, query_list)
+ for alarm in VnfAlarmRepository.list(alarm_conditions):
log.debug("Deleting vnf alarm %s", alarm.alarm_uuid)
try:
await self.mon_client.delete_alarm(
import datetime
import json
import logging
+import operator
+import functools
from osm_policy_module.common.common_db_client import CommonDbClient
from osm_policy_module.common.lcm_client import LcmClient
finally:
database.db.close()
- async def delete_scaling_groups(self, nsr_id: str):
+ async def delete_scaling_groups(self, nsr_id: str, vnf_member_index=None):
log.debug("Deleting scaling groups for network service %s", nsr_id)
database.db.connect()
try:
with database.db.atomic() as tx:
try:
- for scaling_group in ScalingGroupRepository.list(
- ScalingGroup.nsr_id == nsr_id
- ):
+ if vnf_member_index is None:
+ scale_conditions = ScalingGroup.nsr_id == nsr_id
+ else:
+ query_list = [ScalingGroup.nsr_id == nsr_id,
+ ScalingGroup.vnf_member_index == vnf_member_index]
+ scale_conditions = functools.reduce(operator.and_, query_list)
+ for scaling_group in ScalingGroupRepository.list(scale_conditions):
for scaling_policy in scaling_group.scaling_policies:
for scaling_criteria in scaling_policy.scaling_criterias:
for alarm in scaling_criteria.scaling_alarms:
log = logging.getLogger(__name__)
-ALLOWED_KAFKA_KEYS = ["instantiated", "scaled", "terminated", "notify_alarm"]
+ALLOWED_KAFKA_KEYS = ["instantiated", "scaled", "terminated", "notify_alarm", "vnf_terminated"]
class PolicyModuleAgent:
if key == "notify_alarm":
await self._handle_alarm_notification(msg)
+
+ if key == "vnf_terminated":
+ await self._handle_vnf_terminated(msg)
else:
log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
except peewee.PeeweeException:
"Current state is %s. Skipping...",
content["operationState"],
)
+
+ async def _handle_vnf_terminated(self, content):
+ nsr_id = content['nsr_id']
+ vnf_member_index = content['vnf_member_index']
+ if (
+ content["operationState"] == "COMPLETED"
+ or content["operationState"] == "PARTIALLY_COMPLETED"
+ ):
+ log.info(
+ "Deleting policies of VNF with nsr_id: %s and vnf-member-index: %s"
+ % (nsr_id, vnf_member_index))
+ await self.autoscaling_service.delete_scaling_groups(nsr_id, vnf_member_index)
+ await self.alarming_service.delete_vnf_alarms(nsr_id, vnf_member_index)
+ else:
+ log.info(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ content['operationState'])
autoscaling_handle_alarm.assert_called_with("test_alarm_uuid", "alarm")
alarming_handle_alarm.assert_called_with("test_alarm_uuid", "alarm", content)
+ @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
+ @mock.patch("osm_policy_module.alarming.service.MonClient")
+ @mock.patch("osm_policy_module.alarming.service.LcmClient")
+ @mock.patch("osm_policy_module.autoscaling.service.MonClient")
+ @mock.patch("osm_policy_module.autoscaling.service.LcmClient")
+ @mock.patch.object(AutoscalingService, "delete_scaling_groups")
+ @mock.patch.object(AlarmingService, "delete_vnf_alarms")
+ def test_handle_vnf_terminated(
+ self,
+ delete_vnf_alarms,
+ delete_scaling_groups,
+ autoscaling_lcm_client,
+ autoscaling_mon_client,
+ alarming_lcm_client,
+ alarming_mon_client,
+ ):
+ async def mock_delete_scaling_groups(nsr_id, vnf_member_index):
+ pass
+
+ async def mock_delete_vnf_alarms(nsr_id, vnf_member_index):
+ pass
+
+ config = Config()
+ agent = PolicyModuleAgent(config, self.loop)
+ assert autoscaling_lcm_client.called
+ assert autoscaling_mon_client.called
+ assert alarming_lcm_client.called
+ assert alarming_mon_client.called
+ content = {
+ "nsr_id": "test_nsr_id",
+ "vnf_member_index": "1",
+ "operationState": "COMPLETED"
+ }
+ failed_content = {
+ "nsr_id": "test_nsr_id",
+ "vnf_member_index": "1",
+ "operationState": "FAILED"
+ }
+ delete_scaling_groups.side_effect = mock_delete_scaling_groups
+ delete_vnf_alarms.side_effect = mock_delete_vnf_alarms
+
+ self.loop.run_until_complete(agent._handle_vnf_terminated(content))
+ delete_scaling_groups.assert_called_with("test_nsr_id", "1")
+ delete_scaling_groups.reset_mock()
+
+ self.loop.run_until_complete(agent._handle_vnf_terminated(failed_content))
+ delete_scaling_groups.assert_not_called()
+
if __name__ == "__main__":
unittest.main()