1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
26 from random
import SystemRandom
27 from json
import JSONDecodeError
30 from aiokafka
import AIOKafkaProducer
, AIOKafkaConsumer
32 from osm_policy_module
.core
.config
import Config
# Module-level logger; its name is this module's __name__.
34 log
= logging
.getLogger(__name__
)
def __init__(self, config: Config):
    """Store the Kafka bootstrap address as ``self.kafka_server``.

    The address is the "host:port" string built from the ``host`` and
    ``port`` entries of the ``message`` section of *config*.

    :param config: object whose ``get(section, key)`` returns the setting.
    """
    # Read host before port, in the same order as the original single-call form.
    broker_host = config.get("message", "host")
    broker_port = config.get("message", "port")
    self.kafka_server = "{}:{}".format(broker_host, broker_port)
# Coroutine: sends a "create_alarm_request" message (the JSON-encoded payload
# returned by self._build_create_alarm_payload) to the "alarm_request" Kafka
# topic, then reads the per-request topic "alarm_response_<cor_id>" until a
# message whose alarm_create_response.correlation_id equals cor_id is seen.
# Each response body is parsed with json.loads; yaml.safe_load is the fallback
# when JSON decoding fails.
# Raises ValueError("Error creating alarm in MON") when the matching response
# carries a falsy "status".
# NOTE(review): only part of the parameter list and of the control flow is
# visible in this view (e.g. the try/finally framing and the return statement
# are not shown) -- confirm against the complete file before relying on this.
43 async def create_alarm(
48 vnf_member_index
: str,
51 statistic
: str = "AVERAGE",
# NOTE(review): 10e7 is a float literal; random.randint no longer accepts
# non-integer bounds on Python 3.12+ (TypeError). Consider the int 10**8.
56 cor_id
= SystemRandom().randint(1, 10e7
)
57 msg
= self
._build
_create
_alarm
_payload
(
70 log
.debug("Sending create_alarm_request %s", msg
)
# Keys and values are plain str, encoded to bytes by str.encode.
71 producer
= AIOKafkaProducer(
72 bootstrap_servers
=self
.kafka_server
,
73 key_serializer
=str.encode
,
74 value_serializer
=str.encode
,
76 await producer
.start()
78 await producer
.send_and_wait(
79 "alarm_request", key
="create_alarm_request", value
=json
.dumps(msg
)
83 log
.debug("Waiting for create_alarm_response...")
# The response topic name embeds the correlation id of this request.
84 consumer
= AIOKafkaConsumer(
85 "alarm_response_" + str(cor_id
),
86 bootstrap_servers
=self
.kafka_server
,
87 key_deserializer
=bytes
.decode
,
88 value_deserializer
=bytes
.decode
,
89 auto_offset_reset
="earliest",
91 await consumer
.start()
94 async for message
in consumer
:
96 content
= json
.loads(message
.value
)
97 except JSONDecodeError
:
98 content
= yaml
.safe_load(message
.value
)
99 log
.debug("Received create_alarm_response %s", content
)
100 if content
["alarm_create_response"]["correlation_id"] == cor_id
:
101 if not content
["alarm_create_response"]["status"]:
102 raise ValueError("Error creating alarm in MON")
103 alarm_uuid
= content
["alarm_create_response"]["alarm_uuid"]
106 await consumer
.stop()
# NOTE(review): this is the create path, yet the message below says
# "deletion" -- it matches the text used in delete_alarm; confirm the wording.
108 raise ValueError("No alarm deletion response from MON. Is MON up?")
async def delete_alarm(
    self, ns_id: str, vnf_member_index: str, vdu_name: str, alarm_uuid: str
):
    """Ask MON to delete an alarm and wait for its confirmation.

    A ``delete_alarm_request`` message is published on the ``alarm_request``
    Kafka topic. The reply is then read from the per-request topic
    ``alarm_response_<correlation id>``. Messages whose correlation id does
    not match are ignored.

    The wait itself has no timeout: the loop ends only when a matching
    response arrives or the consumer stops yielding messages.

    :param ns_id: network service id, forwarded to the payload builder.
    :param vnf_member_index: VNF member index, forwarded to the payload builder.
    :param vdu_name: VDU name, forwarded to the payload builder.
    :param alarm_uuid: uuid of the alarm to delete.
    :return: the ``alarm_uuid`` reported in the matching response.
    :raises ValueError: if the response status is falsy, or if the consumer
        finished without delivering a matching response.
    """
    # random.randint rejects non-integer bounds on Python >= 3.12, so the
    # upper bound is written as an int; 10**8 == int(10e7), same range.
    cor_id = SystemRandom().randint(1, 10**8)
    msg = self._build_delete_alarm_payload(
        cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid
    )
    log.debug("Sending delete_alarm_request %s", msg)
    producer = AIOKafkaProducer(
        bootstrap_servers=self.kafka_server,
        key_serializer=str.encode,
        value_serializer=str.encode,
    )
    await producer.start()
    try:
        await producer.send_and_wait(
            "alarm_request", key="delete_alarm_request", value=json.dumps(msg)
        )
    finally:
        # Always release the producer, even when the send fails.
        await producer.stop()
    log.debug("Waiting for delete_alarm_response...")
    consumer = AIOKafkaConsumer(
        "alarm_response_" + str(cor_id),
        bootstrap_servers=self.kafka_server,
        key_deserializer=bytes.decode,
        value_deserializer=bytes.decode,
        auto_offset_reset="earliest",
    )
    await consumer.start()
    # Kept separate from the ``alarm_uuid`` argument so that "no response"
    # cannot be confused with the uuid that was passed in.
    deleted_uuid = None
    try:
        async for message in consumer:
            # Responses are normally JSON; fall back to YAML when they are not.
            try:
                content = json.loads(message.value)
            except JSONDecodeError:
                content = yaml.safe_load(message.value)
            response = content["alarm_delete_response"]
            if response["correlation_id"] != cor_id:
                continue
            log.debug("Received delete_alarm_response %s", content)
            if not response["status"]:
                raise ValueError(
                    "Error deleting alarm in MON. Response status is False."
                )
            deleted_uuid = response["alarm_uuid"]
            break
    finally:
        await consumer.stop()
    if not deleted_uuid:
        raise ValueError("No alarm deletion response from MON. Is MON up?")
    return deleted_uuid
# Builds the message that create_alarm serialises and sends to MON.
# Visible behaviour:
#   - df is the first entry of vnfd's "df" list (an empty dict when absent);
#   - when df has "exporters-endpoints", metric_port is taken from its
#     "metric-port" entry, defaulting to 9100;
#   - a "kpi_" metric name has that text removed, and the vdur entry of vnfr
#     whose "name" equals vdu_name supplies the "<ip-address>:<metric_port>"
#     value stored under the "instance" tag;
#   - alarm_create_request carries correlation_id, alarm_name
#     ("osm_alarm_<ns_id>_<vnf_member_index>_<vdu_name>_<metric_name>"),
#     metric_name, operation, a fixed "critical" severity, threshold_value and
#     statistic, and is wrapped under the "alarm_create_request" key.
# NOTE(review): most of the parameter list, the nesting of the statements and
# the return statement are not visible in this view -- confirm against the
# complete file.
161 def _build_create_alarm_payload(
167 vnf_member_index
: str,
177 "vdu_name": vdu_name
,
178 "vnf_member_index": vnf_member_index
,
181 # TODO: Change for multiple DF support
182 df
= vnfd
.get("df", [{}])[0]
184 if "exporters-endpoints" in df
:
185 metric_port
= df
["exporters-endpoints"].get("metric-port", 9100)
# NOTE(review): str.replace removes every "kpi_" occurrence, not only the
# leading prefix that startswith() tested for.
186 if metric_name
.startswith("kpi_"):
187 metric_name
= metric_name
.replace("kpi_", "")
189 for vdu
in vnfr
["vdur"]:
190 if vdu
["name"] == vdu_name
:
191 vdu_ip
= vdu
["ip-address"]
192 tags
= {"instance": vdu_ip
+ ":" + str(metric_port
)}
193 alarm_create_request
= {
194 "correlation_id": cor_id
,
195 "alarm_name": "osm_alarm_{}_{}_{}_{}".format(
196 ns_id
, vnf_member_index
, vdu_name
, metric_name
198 "metric_name": metric_name
,
199 "operation": operation
,
200 "severity": "critical",
201 "threshold_value": threshold
,
202 "statistic": statistic
,
207 "alarm_create_request": alarm_create_request
,
# Builds the message that delete_alarm serialises and sends to MON: a dict
# holding correlation_id, alarm_uuid, vdu_name and vnf_member_index (as far as
# this view shows), wrapped under the "alarm_delete_request" key.
# delete_alarm calls it positionally as
# (cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid).
# NOTE(review): the full parameter list, any nesting of the vdu_name /
# vnf_member_index entries and the return statement are not visible in this
# view -- confirm against the complete file.
211 def _build_delete_alarm_payload(
216 vnf_member_index
: str,
219 alarm_delete_request
= {
220 "correlation_id": cor_id
,
221 "alarm_uuid": alarm_uuid
,
224 "vdu_name": vdu_name
,
225 "vnf_member_index": vnf_member_index
,
229 "alarm_delete_request": alarm_delete_request
,