# -*- coding: utf-8 -*-

# Copyright 2018 Whitestack, LLC
# *************************************************************

# This file is part of OSM Monitoring module
# All Rights Reserved to Whitestack, LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at

#         http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
28 from json
import JSONDecodeError
31 from aiokafka
import AIOKafkaProducer
, AIOKafkaConsumer
33 from osm_policy_module
.core
.config
import Config
# Module-level logger, named after this module, used by every method below.
log = logging.getLogger(__name__)
def __init__(self, loop=None):
    """Record the Kafka endpoint and the event loop used by the alarm calls.

    Args:
        loop: asyncio event loop to hand to the Kafka producer. When it is
            omitted (or falsy) the loop returned by
            ``asyncio.get_event_loop()`` is used instead.
    """
    cfg = Config.instance()
    # "host:port" string later passed as ``bootstrap_servers``.
    self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                       cfg.OSMPOL_MESSAGE_PORT)
    # Only fall back to the default loop when the caller did not supply one;
    # overwriting it unconditionally would silently ignore the argument.
    if not loop:
        loop = asyncio.get_event_loop()
    # create_alarm() and delete_alarm() read ``self.loop``, so it must be set.
    self.loop = loop
async def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int,
                       statistic: str, operation: str):
    """Send a create-alarm request over Kafka and wait for the matching reply.

    The request is published on topic ``alarm_request`` with key
    ``create_alarm_request``; the reply is read from the per-request topic
    ``alarm_response_<correlation id>``.

    Args:
        metric_name: Metric the alarm is attached to.
        ns_id: Network service identifier.
        vdu_name: Name of the VDU.
        vnf_member_index: Member index of the VNF.
        threshold: Threshold value placed in the request.
        statistic: Statistic placed in the request.
        operation: Comparison operation placed in the request.

    Returns:
        The ``alarm_uuid`` field of the matching ``alarm_create_response``.

    Raises:
        ValueError: If the response reports a falsy ``status``, or if the
            consumer finishes without yielding a matching response.
    """
    # randint() requires integer bounds: the float literal 10e7 raises
    # TypeError on Python 3.12+. 10 ** 8 is the same upper bound as an int.
    cor_id = random.randint(1, 10 ** 8)
    msg = self._build_create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold,
                                           statistic, operation)
    log.info("Sending create_alarm_request %s", msg)
    producer = AIOKafkaProducer(loop=self.loop,
                                bootstrap_servers=self.kafka_server,
                                key_serializer=str.encode,
                                value_serializer=str.encode)
    await producer.start()
    try:
        await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg))
    finally:
        # Always release the producer, even when the send fails.
        await producer.stop()

    log.info("Waiting for create_alarm_response...")
    consumer = AIOKafkaConsumer(
        "alarm_response_" + str(cor_id),
        # NOTE(review): bound to the same loop as the producer for
        # consistency - confirm this matches the intended behaviour.
        loop=self.loop,
        bootstrap_servers=self.kafka_server,
        key_deserializer=bytes.decode,
        value_deserializer=bytes.decode,
        auto_offset_reset='earliest')
    await consumer.start()
    alarm_uuid = None
    try:
        async for message in consumer:
            # Responses are tried as JSON first, with YAML as the fallback.
            try:
                content = json.loads(message.value)
            except JSONDecodeError:
                content = yaml.safe_load(message.value)
            log.info("Received create_alarm_response %s", content)
            response = content['alarm_create_response']
            if response['correlation_id'] == cor_id:
                if not response['status']:
                    raise ValueError("Error creating alarm in MON")
                alarm_uuid = response['alarm_uuid']
                # The matching reply has arrived; stop consuming.
                break
    finally:
        # Always release the consumer, on success and on error alike.
        await consumer.stop()
    if not alarm_uuid:
        raise ValueError('No alarm creation response from MON. Is MON up?')
    return alarm_uuid
async def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str):
    """Send a delete-alarm request over Kafka and wait for the matching reply.

    The request is published on topic ``alarm_request`` with key
    ``delete_alarm_request``; the reply is read from the per-request topic
    ``alarm_response_<correlation id>``.

    Args:
        ns_id: Network service identifier.
        vnf_member_index: Member index of the VNF.
        vdu_name: Name of the VDU.
        alarm_uuid: Identifier of the alarm to delete.

    Returns:
        The ``alarm_uuid`` field of the matching ``alarm_delete_response``.

    Raises:
        ValueError: If the response reports a falsy ``status``, or if the
            consumer finishes without yielding a matching response.
    """
    # randint() requires integer bounds: the float literal 10e7 raises
    # TypeError on Python 3.12+. 10 ** 8 is the same upper bound as an int.
    cor_id = random.randint(1, 10 ** 8)
    msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid)
    log.info("Sending delete_alarm_request %s", msg)
    producer = AIOKafkaProducer(loop=self.loop,
                                bootstrap_servers=self.kafka_server,
                                key_serializer=str.encode,
                                value_serializer=str.encode)
    await producer.start()
    try:
        await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg))
    finally:
        # Always release the producer, even when the send fails.
        await producer.stop()

    log.info("Waiting for delete_alarm_response...")
    consumer = AIOKafkaConsumer(
        "alarm_response_" + str(cor_id),
        # NOTE(review): bound to the same loop as the producer for
        # consistency - confirm this matches the intended behaviour.
        loop=self.loop,
        bootstrap_servers=self.kafka_server,
        key_deserializer=bytes.decode,
        value_deserializer=bytes.decode,
        auto_offset_reset='earliest')
    await consumer.start()
    # Kept separate from the ``alarm_uuid`` argument so that "no reply seen"
    # can be told apart from a reply that echoes the uuid back.
    deleted_uuid = None
    try:
        async for message in consumer:
            # Responses are tried as JSON first, with YAML as the fallback.
            try:
                content = json.loads(message.value)
            except JSONDecodeError:
                content = yaml.safe_load(message.value)
            response = content['alarm_delete_response']
            if response['correlation_id'] == cor_id:
                log.info("Received delete_alarm_response %s", content)
                if not response['status']:
                    raise ValueError("Error deleting alarm in MON. Response status is False.")
                deleted_uuid = response['alarm_uuid']
                # The matching reply has arrived; stop consuming.
                break
    finally:
        # Always release the consumer, on success and on error alike.
        await consumer.stop()
    if not deleted_uuid:
        raise ValueError('No alarm deletion response from MON. Is MON up?')
    return deleted_uuid
132 def _build_create_alarm_payload(self
, cor_id
: int, metric_name
: str, ns_id
: str, vdu_name
: str,
133 vnf_member_index
: int,
134 threshold
: int, statistic
: str, operation
: str):
135 alarm_create_request
= {
136 'correlation_id': cor_id
,
137 'alarm_name': 'osm_alarm_{}_{}_{}_{}'.format(ns_id
, vnf_member_index
, vdu_name
, metric_name
),
138 'metric_name': metric_name
,
140 'vdu_name': vdu_name
,
141 'vnf_member_index': vnf_member_index
,
142 'operation': operation
,
143 'severity': 'critical',
144 'threshold_value': threshold
,
145 'statistic': statistic
148 'alarm_create_request': alarm_create_request
,
152 def _build_delete_alarm_payload(self
, cor_id
: int, ns_id
: str, vdu_name
: str,
153 vnf_member_index
: int, alarm_uuid
: str):
154 alarm_delete_request
= {
155 'correlation_id': cor_id
,
156 'alarm_uuid': alarm_uuid
,
158 'vdu_name': vdu_name
,
159 'vnf_member_index': vnf_member_index
162 'alarm_delete_request': alarm_delete_request
,