Updates for Python 3.10 and Ubuntu 22.04
[osm/POL.git] / osm_policy_module / common / mon_client.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import json
25 import logging
26 import random
27 from json import JSONDecodeError
28
29 import yaml
30 from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
31
32 from osm_policy_module.core.config import Config
33
34 log = logging.getLogger(__name__)
35
36
37 class MonClient:
38 def __init__(self, config: Config):
39 self.kafka_server = "{}:{}".format(
40 config.get("message", "host"), config.get("message", "port")
41 )
42
43 async def create_alarm(
44 self,
45 metric_name: str,
46 ns_id: str,
47 vdu_name: str,
48 vnf_member_index: str,
49 threshold: int,
50 operation: str,
51 statistic: str = "AVERAGE",
52 action: str = "",
53 ):
54 cor_id = random.randint(1, 10e7)
55 msg = self._build_create_alarm_payload(
56 cor_id,
57 metric_name,
58 ns_id,
59 vdu_name,
60 vnf_member_index,
61 threshold,
62 statistic,
63 operation,
64 action,
65 )
66 log.debug("Sending create_alarm_request %s", msg)
67 producer = AIOKafkaProducer(
68 bootstrap_servers=self.kafka_server,
69 key_serializer=str.encode,
70 value_serializer=str.encode,
71 )
72 await producer.start()
73 try:
74 await producer.send_and_wait(
75 "alarm_request", key="create_alarm_request", value=json.dumps(msg)
76 )
77 finally:
78 await producer.stop()
79 log.debug("Waiting for create_alarm_response...")
80 consumer = AIOKafkaConsumer(
81 "alarm_response_" + str(cor_id),
82 bootstrap_servers=self.kafka_server,
83 key_deserializer=bytes.decode,
84 value_deserializer=bytes.decode,
85 auto_offset_reset="earliest",
86 )
87 await consumer.start()
88 alarm_uuid = None
89 try:
90 async for message in consumer:
91 try:
92 content = json.loads(message.value)
93 except JSONDecodeError:
94 content = yaml.safe_load(message.value)
95 log.debug("Received create_alarm_response %s", content)
96 if content["alarm_create_response"]["correlation_id"] == cor_id:
97 if not content["alarm_create_response"]["status"]:
98 raise ValueError("Error creating alarm in MON")
99 alarm_uuid = content["alarm_create_response"]["alarm_uuid"]
100 break
101 finally:
102 await consumer.stop()
103 if not alarm_uuid:
104 raise ValueError("No alarm deletion response from MON. Is MON up?")
105 return alarm_uuid
106
107 async def delete_alarm(
108 self, ns_id: str, vnf_member_index: str, vdu_name: str, alarm_uuid: str
109 ):
110 cor_id = random.randint(1, 10e7)
111 msg = self._build_delete_alarm_payload(
112 cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid
113 )
114 log.debug("Sending delete_alarm_request %s", msg)
115 producer = AIOKafkaProducer(
116 bootstrap_servers=self.kafka_server,
117 key_serializer=str.encode,
118 value_serializer=str.encode,
119 )
120 await producer.start()
121 try:
122 await producer.send_and_wait(
123 "alarm_request", key="delete_alarm_request", value=json.dumps(msg)
124 )
125 finally:
126 await producer.stop()
127 log.debug("Waiting for delete_alarm_response...")
128 consumer = AIOKafkaConsumer(
129 "alarm_response_" + str(cor_id),
130 bootstrap_servers=self.kafka_server,
131 key_deserializer=bytes.decode,
132 value_deserializer=bytes.decode,
133 auto_offset_reset="earliest",
134 )
135 await consumer.start()
136 alarm_uuid = None
137 try:
138 async for message in consumer:
139 try:
140 content = json.loads(message.value)
141 except JSONDecodeError:
142 content = yaml.safe_load(message.value)
143 if content["alarm_delete_response"]["correlation_id"] == cor_id:
144 log.debug("Received delete_alarm_response %s", content)
145 if not content["alarm_delete_response"]["status"]:
146 raise ValueError(
147 "Error deleting alarm in MON. Response status is False."
148 )
149 alarm_uuid = content["alarm_delete_response"]["alarm_uuid"]
150 break
151 finally:
152 await consumer.stop()
153 if not alarm_uuid:
154 raise ValueError("No alarm deletion response from MON. Is MON up?")
155 return alarm_uuid
156
157 def _build_create_alarm_payload(
158 self,
159 cor_id: int,
160 metric_name: str,
161 ns_id: str,
162 vdu_name: str,
163 vnf_member_index: str,
164 threshold: int,
165 statistic: str,
166 operation: str,
167 action: str,
168 ):
169 alarm_create_request = {
170 "correlation_id": cor_id,
171 "alarm_name": "osm_alarm_{}_{}_{}_{}".format(
172 ns_id, vnf_member_index, vdu_name, metric_name
173 ),
174 "metric_name": metric_name,
175 "operation": operation,
176 "severity": "critical",
177 "threshold_value": threshold,
178 "statistic": statistic,
179 "action": action,
180 "tags": {
181 "ns_id": ns_id,
182 "vdu_name": vdu_name,
183 "vnf_member_index": vnf_member_index,
184 },
185 }
186 msg = {
187 "alarm_create_request": alarm_create_request,
188 }
189 return msg
190
191 def _build_delete_alarm_payload(
192 self,
193 cor_id: int,
194 ns_id: str,
195 vdu_name: str,
196 vnf_member_index: str,
197 alarm_uuid: str,
198 ):
199 alarm_delete_request = {
200 "correlation_id": cor_id,
201 "alarm_uuid": alarm_uuid,
202 "tags": {
203 "ns_id": ns_id,
204 "vdu_name": vdu_name,
205 "vnf_member_index": vnf_member_index,
206 },
207 }
208 msg = {
209 "alarm_delete_request": alarm_delete_request,
210 }
211 return msg