def generate_response(self, key, **kwargs) -> dict:
"""Make call to appropriate response function."""
- if key == "list_alarm_response":
- message = self.alarm_list_response(**kwargs)
- elif key == "create_alarm_response":
+ if key == "create_alarm_response":
message = self.create_alarm_response(**kwargs)
elif key == "delete_alarm_response":
message = self.delete_alarm_response(**kwargs)
- elif key == "update_alarm_response":
- message = self.update_alarm_response(**kwargs)
- elif key == "create_metric_response":
- message = self.metric_create_response(**kwargs)
- elif key == "read_metric_data_response":
- message = self.read_metric_data_response(**kwargs)
- elif key == "delete_metric_response":
- message = self.delete_metric_response(**kwargs)
- elif key == "update_metric_response":
- message = self.update_metric_response(**kwargs)
- elif key == "list_metric_response":
- message = self.list_metric_response(**kwargs)
elif key == "notify_alarm":
message = self.notify_alarm(**kwargs)
else:
return message
- def alarm_list_response(self, **kwargs) -> dict:
- """Generate the response for an alarm list request."""
- alarm_list_resp = {"schema_version": schema_version,
- "schema_type": "list_alarm_response",
- "correlation_id": kwargs['cor_id'],
- "list_alarm_response": kwargs['alarm_list']}
- return alarm_list_resp
-
def create_alarm_response(self, **kwargs) -> dict:
"""Generate a response for a create alarm request."""
create_alarm_resp = {"schema_version": schema_version,
"status": kwargs['status']}}
return delete_alarm_resp
- def update_alarm_response(self, **kwargs) -> dict:
- """Generate a response for an update alarm request."""
- update_alarm_resp = {"schema_version": schema_version,
- "schema_type": "update_alarm_response",
- "alarm_update_response": {
- "correlation_id": kwargs['cor_id'],
- "alarm_uuid": kwargs['alarm_id'],
- "status": kwargs['status']}}
- return update_alarm_resp
-
- def metric_create_response(self, **kwargs) -> dict:
- """Generate a response for a create metric request."""
- create_metric_resp = {"schema_version": schema_version,
- "schema_type": "create_metric_response",
- "correlation_id": kwargs['cor_id'],
- "metric_create_response": {
- "metric_uuid": kwargs['metric_id'],
- "resource_uuid": kwargs['resource_id'],
- "status": kwargs['status']}}
- return create_metric_resp
-
- def read_metric_data_response(self, **kwargs) -> dict:
- """Generate a response for a read metric data request."""
- read_metric_data_resp = {"schema_version": schema_version,
- "schema_type": "read_metric_data_response",
- "metric_name": kwargs['metric_name'],
- "metric_uuid": kwargs['metric_id'],
- "resource_uuid": kwargs['resource_id'],
- "correlation_id": kwargs['cor_id'],
- "status": kwargs['status'],
- "metrics_data": {
- "time_series": kwargs['times'],
- "metrics_series": kwargs['metrics']}}
- return read_metric_data_resp
-
- def delete_metric_response(self, **kwargs) -> dict:
- """Generate a response for a delete metric request."""
- delete_metric_resp = {"schema_version": schema_version,
- "schema_type": "delete_metric_response",
- "metric_name": kwargs['metric_name'],
- "metric_uuid": kwargs['metric_id'],
- "resource_uuid": kwargs['resource_id'],
- "correlation_id": kwargs['cor_id'],
- "status": kwargs['status']}
- return delete_metric_resp
-
- def update_metric_response(self, **kwargs) -> dict:
- """Generate a repsonse for an update metric request."""
- update_metric_resp = {"schema_version": schema_version,
- "schema_type": "update_metric_response",
- "correlation_id": kwargs['cor_id'],
- "metric_update_response": {
- "metric_uuid": kwargs['metric_id'],
- "status": kwargs['status'],
- "resource_uuid": kwargs['resource_id']}}
- return update_metric_resp
-
- def list_metric_response(self, **kwargs) -> dict:
- """Generate a response for a list metric request."""
- list_metric_resp = {"schema_version": schema_version,
- "schema_type": "list_metric_response",
- "correlation_id": kwargs['cor_id'],
- "status": kwargs['status'],
- "metrics_list": kwargs['metric_list']}
- return list_metric_resp
-
def notify_alarm(self, **kwargs) -> dict:
"""Generate a response to send alarm notifications."""
notify_alarm_resp = {"schema_version": schema_version,
"schema_type": "notify_alarm",
"notify_details": {
"alarm_uuid": kwargs['alarm_id'],
- "vdu_name": kwargs['vdu_name'],
- "vnf_member_index": kwargs['vnf_member_index'],
- "ns_id": kwargs['ns_id'],
"metric_name": kwargs['metric_name'],
"threshold_value": kwargs['threshold_value'],
"operation": kwargs['operation'],
"severity": kwargs['sev'],
"status": kwargs['status'],
- "start_date": kwargs['date']}}
+ "start_date": kwargs['date'],
+ "tags": kwargs['tags']}}
return notify_alarm_resp