Migrates the POL (policy module) code out of the MON repository into its own repo.
[osm/POL.git] / osm_policy_module / core / agent.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import json
25 import logging
26 import threading
27 from json import JSONDecodeError
28
29 import yaml
30 from kafka import KafkaConsumer
31 from osm_common import dbmongo
32
33 from osm_policy_module.common.lcm_client import LcmClient
34 from osm_policy_module.common.mon_client import MonClient
35 from osm_policy_module.core import database
36 from osm_policy_module.core.config import Config
37 from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
38
# Module-level logger named after this module, so log output can be filtered
# hierarchically via the standard logging configuration.
log = logging.getLogger(__name__)
40
41
class PolicyModuleAgent:
    """Agent of the OSM policy (POL) module.

    Listens on the OSM message bus for network-service lifecycle events and
    alarm notifications, creates MON alarms for the auto-scaling criteria
    declared in the VNF descriptors, and asks LCM to execute the matching
    scaling action when one of those alarms fires.
    """

    def __init__(self):
        cfg = Config.instance()
        # Connection to the OSM common database (nslcmops, nsrs, vnfrs, vnfds).
        self.common_db = dbmongo.DbMongo()
        self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
                                   'port': int(cfg.OSMPOL_DATABASE_PORT),
                                   'name': 'osm'})
        self.mon_client = MonClient()
        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                           cfg.OSMPOL_MESSAGE_PORT)

    def run(self):
        """Consume bus messages and dispatch each one to a worker thread.

        NOTE(review): consumer_timeout_ms=10000 makes the consumer iterator
        stop after 10 s without messages, which ends this loop and returns
        from run(); confirm the caller restarts the agent, or remove the
        timeout if the agent is meant to run forever.
        """
        cfg = Config.instance()
        cfg.read_environ()

        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
                                 key_deserializer=bytes.decode,
                                 value_deserializer=bytes.decode,
                                 consumer_timeout_ms=10000)
        consumer.subscribe(["ns", "alarm_response"])

        for message in consumer:
            # One worker thread per message so a slow handler does not block
            # consumption of subsequent messages.
            t = threading.Thread(target=self._process_msg,
                                 args=(message.topic, message.key, message.value,))
            t.start()

    def _process_msg(self, topic, key, msg):
        """Route a single bus message to the handler matching its key.

        Top-level error boundary: any exception is logged and swallowed so a
        malformed message cannot kill the worker thread.
        """
        try:
            if key == 'instantiated':
                self._handle_instantiated(topic, key, msg)
            if key == 'notify_alarm':
                self._handle_alarm(topic, key, msg)
        except Exception:
            log.exception("Error consuming message: ")

    @staticmethod
    def _decode_msg(msg):
        """Deserialize a bus payload: messages may arrive as JSON or YAML."""
        try:
            return json.loads(msg)
        except JSONDecodeError:
            return yaml.safe_load(msg)

    def _handle_instantiated(self, topic, key, msg):
        """Configure scaling groups for a newly instantiated network service,
        provided the instantiation operation completed (at least partially)."""
        content = self._decode_msg(msg)
        log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
        nslcmop_id = content['nslcmop_id']
        nslcmop = self.common_db.get_one(table="nslcmops",
                                         filter={"_id": nslcmop_id})
        if nslcmop['operationState'] in ('COMPLETED', 'PARTIALLY_COMPLETED'):
            nsr_id = nslcmop['nsInstanceId']
            log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
            self._configure_scaling_groups(nsr_id)
        else:
            log.info(
                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                nslcmop['operationState'])

    def _handle_alarm(self, topic, key, msg):
        """Trigger the scaling action configured for a fired alarm, if any."""
        content = self._decode_msg(msg)
        log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
        details = content['notify_details']
        alarm_id = details['alarm_uuid']
        # Rebuilt as implicit string concatenation: the original backslash
        # continuations embedded long runs of literal spaces in the message.
        log.info(
            "Received alarm notification for alarm %s, metric %s, operation %s, "
            "threshold %s, vdu_name %s, vnf_member_index %s, ns_id %s",
            alarm_id, details['metric_name'], details['operation'],
            details['threshold_value'], details['vdu_name'],
            details['vnf_member_index'], details['ns_id'])
        try:
            alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
        except ScalingAlarm.DoesNotExist:
            log.info("There is no action configured for alarm %s.", alarm_id)
            return
        lcm_client = LcmClient()
        # Log label fixed: the id being logged is the alarm's, not the ns'.
        log.info("Sending scaling action message for alarm: %s", alarm_id)
        lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name,
                         alarm.vnf_member_index, alarm.action)

    def _get_vnfr(self, nsr_id: str, member_index: int):
        """Fetch the VNF record of member *member_index* of ns *nsr_id*."""
        return self.common_db.get_one(table="vnfrs",
                                      filter={"nsr-id-ref": nsr_id,
                                              "member-vnf-index-ref": str(member_index)})

    def _get_vnfrs(self, nsr_id: str):
        """Fetch all VNF records belonging to network service *nsr_id*."""
        return [self._get_vnfr(nsr_id, member['member-vnf-index']) for member in
                self._get_nsr(nsr_id)['nsd']['constituent-vnfd']]

    def _get_vnfd(self, vnfd_id: str):
        """Fetch a VNF descriptor by its _id."""
        return self.common_db.get_one(table="vnfds",
                                      filter={"_id": vnfd_id})

    def _get_nsr(self, nsr_id: str):
        """Fetch a network-service record by its id."""
        return self.common_db.get_one(table="nsrs",
                                      filter={"id": nsr_id})

    def _configure_scaling_groups(self, nsr_id: str):
        """Create scaling records and MON alarms for every scaling criteria
        declared by the VNFDs of network service *nsr_id*.

        Runs inside a single DB transaction so a failure leaves no partial
        scaling records behind.
        """
        # TODO(diazb): Check for alarm creation on exception and clean resources if needed.
        with database.db.atomic():
            vnfrs = self._get_vnfrs(nsr_id)
            log.info("Checking %s vnfrs...", len(vnfrs))
            for vnfr in vnfrs:
                vnfd = self._get_vnfd(vnfr['vnfd-id'])
                log.info("Looking for vnfd %s", vnfr['vnfd-id'])
                # NOTE(review): raises KeyError (caught and logged upstream) if
                # the VNFD declares no scaling-group-descriptor — confirm every
                # deployed VNFD has one, or guard with .get().
                scaling_groups = vnfd['scaling-group-descriptor']
                vnf_monitoring_params = vnfd['monitoring-param']
                for scaling_group in scaling_groups:
                    log.info("Creating scaling record in DB...")
                    scaling_record = ScalingRecord.create(
                        nsr_id=nsr_id,
                        name=scaling_group['name'],
                        content=json.dumps(scaling_group)
                    )
                    log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
                             scaling_record.nsr_id,
                             scaling_record.name,
                             scaling_record.content)
                    for scaling_policy in scaling_group['scaling-policy']:
                        for vdur in vnfd['vdu']:
                            vdu_monitoring_params = vdur['monitoring-param']
                            for scaling_criteria in scaling_policy['scaling-criteria']:
                                # Resolve the VNF-level monitoring param referenced
                                # by the criteria, then the VDU-level param it maps to.
                                vnf_monitoring_param = next(
                                    filter(
                                        lambda param: param['id'] == scaling_criteria[
                                            'vnf-monitoring-param-ref'],
                                        vnf_monitoring_params))
                                # TODO: Add support for non-nfvi metrics
                                vdu_monitoring_param = next(
                                    filter(
                                        lambda param: param['id'] == vnf_monitoring_param[
                                            'vdu-monitoring-param-ref'],
                                        vdu_monitoring_params))
                                # One alarm + one DB record per scaling direction;
                                # the two directions differ only in descriptor keys.
                                for action, prefix in (('scale_in', 'scale-in'),
                                                       ('scale_out', 'scale-out')):
                                    alarm_uuid = self.mon_client.create_alarm(
                                        metric_name=vdu_monitoring_param['nfvi-metric'],
                                        ns_id=nsr_id,
                                        vdu_name=vdur['name'],
                                        vnf_member_index=vnfr['member-vnf-index-ref'],
                                        threshold=scaling_criteria[prefix + '-threshold'],
                                        operation=scaling_criteria[prefix + '-relational-operation'],
                                        statistic=vnf_monitoring_param['aggregation-type']
                                    )
                                    ScalingAlarm.create(
                                        alarm_id=alarm_uuid,
                                        action=action,
                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
                                        vdu_name=vdur['name'],
                                        scaling_record=scaling_record
                                    )