- def _get_vnfr(self, nsr_id: str, member_index: int):
- vnfr = self.common_db.get_one(table="vnfrs",
- filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
- return vnfr
-
- def _get_vnfrs(self, nsr_id: str):
- return [self._get_vnfr(nsr_id, member['member-vnf-index']) for member in
- self._get_nsr(nsr_id)['nsd']['constituent-vnfd']]
-
- def _get_vnfd(self, vnfd_id: str):
- vnfr = self.common_db.get_one(table="vnfds",
- filter={"_id": vnfd_id})
- return vnfr
-
- def _get_nsr(self, nsr_id: str):
- nsr = self.common_db.get_one(table="nsrs",
- filter={"id": nsr_id})
- return nsr
-
- def _configure_scaling_groups(self, nsr_id: str):
- # TODO(diazb): Check for alarm creation on exception and clean resources if needed.
- with database.db.atomic():
- vnfrs = self._get_vnfrs(nsr_id)
- log.info("Checking %s vnfrs...", len(vnfrs))
- for vnfr in vnfrs:
- vnfd = self._get_vnfd(vnfr['vnfd-id'])
- log.info("Looking for vnfd %s", vnfr['vnfd-id'])
- scaling_groups = vnfd['scaling-group-descriptor']
- vnf_monitoring_params = vnfd['monitoring-param']
- for scaling_group in scaling_groups:
- log.info("Creating scaling record in DB...")
- scaling_record = ScalingRecord.create(
- nsr_id=nsr_id,
- name=scaling_group['name'],
- content=json.dumps(scaling_group)
- )
- log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
- scaling_record.nsr_id,
- scaling_record.name,
- scaling_record.content)
- for scaling_policy in scaling_group['scaling-policy']:
- for vdur in vnfd['vdu']:
- vdu_monitoring_params = vdur['monitoring-param']
+ async def _handle_alarm_notification(self, content):
+ log.debug("_handle_alarm_notification: %s", content)
+ alarm_uuid = content['notify_details']['alarm_uuid']
+ metric_name = content['notify_details']['metric_name']
+ operation = content['notify_details']['operation']
+ threshold = content['notify_details']['threshold_value']
+ vdu_name = content['notify_details']['vdu_name']
+ vnf_member_index = content['notify_details']['vnf_member_index']
+ nsr_id = content['notify_details']['ns_id']
+ log.info(
+ "Received alarm notification for alarm %s, \
+ metric %s, \
+ operation %s, \
+ threshold %s, \
+ vdu_name %s, \
+ vnf_member_index %s, \
+ ns_id %s ",
+ alarm_uuid, metric_name, operation, threshold, vdu_name, vnf_member_index, nsr_id)
+ try:
+ alarm = self.database_manager.get_alarm(alarm_uuid)
+ delta = datetime.datetime.now() - alarm.scaling_criteria.scaling_policy.last_scale
+ log.debug("last_scale: %s", alarm.scaling_criteria.scaling_policy.last_scale)
+ log.debug("now: %s", datetime.datetime.now())
+ log.debug("delta: %s", delta)
+ if delta.total_seconds() < alarm.scaling_criteria.scaling_policy.cooldown_time:
+ log.info("Time between last scale and now is less than cooldown time. Skipping.")
+ return
+ log.info("Sending scaling action message for ns: %s", nsr_id)
+ await self.lcm_client.scale(nsr_id,
+ alarm.scaling_criteria.scaling_policy.scaling_group.name,
+ alarm.vnf_member_index,
+ alarm.action)
+ alarm.scaling_criteria.scaling_policy.last_scale = datetime.datetime.now()
+ alarm.scaling_criteria.scaling_policy.save()
+ except ScalingAlarm.DoesNotExist:
+ log.info("There is no action configured for alarm %s.", alarm_uuid)
+
+ async def _handle_instantiated(self, content):
+ log.debug("_handle_instantiated: %s", content)
+ nslcmop_id = content['nslcmop_id']
+ nslcmop = self.db_client.get_nslcmop(nslcmop_id)
+ if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+ nsr_id = nslcmop['nsInstanceId']
+ log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
+ await self._configure_scaling_groups(nsr_id)
+ else:
+ log.info(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ nslcmop['operationState'])
+
+ async def _handle_scaled(self, content):
+ log.debug("_handle_scaled: %s", content)
+ nslcmop_id = content['nslcmop_id']
+ nslcmop = self.db_client.get_nslcmop(nslcmop_id)
+ if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+ nsr_id = nslcmop['nsInstanceId']
+ log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
+ await self._configure_scaling_groups(nsr_id)
+ log.info("Checking for orphaned alarms to be deleted for network service with nsr_id: %s", nsr_id)
+ await self._delete_orphaned_alarms(nsr_id)
+ else:
+ log.info(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ nslcmop['operationState'])
+
+ async def _handle_terminated(self, content):
+ log.debug("_handle_deleted: %s", content)
+ nsr_id = content['nsr_id']
+ if content['operationState'] == 'COMPLETED' or content['operationState'] == 'PARTIALLY_COMPLETED':
+ log.info("Deleting scaling groups and alarms for network service with nsr_id: %s", nsr_id)
+ await self._delete_scaling_groups(nsr_id)
+ else:
+ log.info(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ content['operationState'])
+
+ async def _configure_scaling_groups(self, nsr_id: str):
+ log.debug("_configure_scaling_groups: %s", nsr_id)
+ alarms_created = []
+ with database.db.atomic() as tx:
+ try:
+ vnfrs = self.db_client.get_vnfrs(nsr_id)
+ for vnfr in vnfrs:
+ log.info("Processing vnfr: %s", vnfr)
+ vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
+ log.info("Looking for vnfd %s", vnfr['vnfd-id'])
+ if 'scaling-group-descriptor' not in vnfd:
+ continue
+ scaling_groups = vnfd['scaling-group-descriptor']
+ vnf_monitoring_params = vnfd['monitoring-param']
+ for scaling_group in scaling_groups:
+ try:
+ scaling_group_record = ScalingGroup.select().where(
+ ScalingGroup.nsr_id == nsr_id,
+ ScalingGroup.vnf_member_index == int(vnfr['member-vnf-index-ref']),
+ ScalingGroup.name == scaling_group['name']
+ ).get()
+ log.info("Found existing scaling group record in DB...")
+ except ScalingGroup.DoesNotExist:
+ log.info("Creating scaling group record in DB...")
+ scaling_group_record = ScalingGroup.create(
+ nsr_id=nsr_id,
+ vnf_member_index=vnfr['member-vnf-index-ref'],
+ name=scaling_group['name'],
+ content=json.dumps(scaling_group)
+ )
+ log.info(
+ "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
+ scaling_group_record.nsr_id,
+ scaling_group_record.vnf_member_index,
+ scaling_group_record.name)
+ for scaling_policy in scaling_group['scaling-policy']:
+ if scaling_policy['scaling-type'] != 'automatic':
+ continue
+ try:
+ scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where(
+ ScalingPolicy.name == scaling_policy['name'],
+ ScalingGroup.id == scaling_group_record.id
+ ).get()
+ log.info("Found existing scaling policy record in DB...")
+ except ScalingPolicy.DoesNotExist:
+ log.info("Creating scaling policy record in DB...")
+ scaling_policy_record = ScalingPolicy.create(
+ nsr_id=nsr_id,
+ name=scaling_policy['name'],
+ cooldown_time=scaling_policy['cooldown-time'],
+ scaling_group=scaling_group_record
+ )
+ log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+ scaling_policy_record.name,
+ scaling_policy_record.scaling_group.name)
+