410413f68a47e066d9bce0eb1b49740849e3aca9
[osm/POL.git] / osm_policy_module / core / agent.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import json
25 import logging
26 import threading
27 from json import JSONDecodeError
28
29 import yaml
30 from kafka import KafkaConsumer
31 from osm_common import dbmongo
32
33 from osm_policy_module.common.lcm_client import LcmClient
34 from osm_policy_module.common.mon_client import MonClient
35 from osm_policy_module.core import database
36 from osm_policy_module.core.config import Config
37 from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
38
# Module-level logger, named after this module per the logging convention.
log = logging.getLogger(__name__)
40
41
class PolicyModuleAgent:
    """Kafka-driven agent that wires OSM auto-scaling together.

    Subscribes to the ``ns`` and ``alarm_response`` topics. When a network
    service finishes instantiating, it creates MON alarms for every scaling
    criteria declared in the service's VNFDs; when MON notifies an alarm,
    it asks LCM to execute the scaling action recorded for that alarm.
    """

    def __init__(self):
        cfg = Config.instance()
        # Shared OSM mongo DB: nsrs, vnfrs, vnfds and nslcmops collections.
        self.common_db = dbmongo.DbMongo()
        self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
                                   'port': int(cfg.OSMPOL_DATABASE_PORT),
                                   'name': 'osm'})
        self.mon_client = MonClient()
        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                           cfg.OSMPOL_MESSAGE_PORT)

    def run(self):
        """Consume Kafka messages forever, handling each in its own thread."""
        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
                                 key_deserializer=bytes.decode,
                                 value_deserializer=bytes.decode,
                                 group_id='pol-consumer')
        consumer.subscribe(["ns", "alarm_response"])

        for message in consumer:
            # One worker thread per message so a slow handler does not
            # block the consumer loop.
            t = threading.Thread(target=self._process_msg,
                                 args=(message.topic, message.key, message.value,))
            t.start()

    @staticmethod
    def _load_content(msg):
        """Parse a message payload as JSON, falling back to YAML.

        :param msg: raw (already str-decoded) Kafka message value.
        :return: the deserialized payload.
        """
        try:
            return json.loads(msg)
        except JSONDecodeError:
            return yaml.safe_load(msg)

    def _process_msg(self, topic, key, msg):
        """Dispatch one Kafka message by key.

        Top-level thread boundary: any error is logged and swallowed so a
        bad message never kills the worker thread.
        """
        try:
            if key == 'instantiated':
                self._handle_instantiated(topic, key, msg)
            elif key == 'notify_alarm':
                self._handle_alarm_notification(topic, key, msg)
        except Exception:
            log.exception("Error consuming message: ")

    def _handle_instantiated(self, topic, key, msg):
        """Configure scaling groups for a successfully instantiated NS."""
        content = self._load_content(msg)
        log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
        nslcmop_id = content['nslcmop_id']
        nslcmop = self.common_db.get_one(table="nslcmops",
                                         filter={"_id": nslcmop_id})
        if nslcmop['operationState'] in ('COMPLETED', 'PARTIALLY_COMPLETED'):
            nsr_id = nslcmop['nsInstanceId']
            log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
            self._configure_scaling_groups(nsr_id)
        else:
            log.info(
                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                nslcmop['operationState'])

    def _handle_alarm_notification(self, topic, key, msg):
        """Trigger the scaling action configured for a notified alarm, if any."""
        content = self._load_content(msg)
        log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
        details = content['notify_details']
        alarm_id = details['alarm_uuid']
        # Single-line format: the original used backslash continuations,
        # which embedded long runs of source indentation in the log output.
        log.info(
            "Received alarm notification for alarm %s, metric %s, operation %s, "
            "threshold %s, vdu_name %s, vnf_member_index %s, ns_id %s",
            alarm_id, details['metric_name'], details['operation'],
            details['threshold_value'], details['vdu_name'],
            details['vnf_member_index'], details['ns_id'])
        try:
            alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
        except ScalingAlarm.DoesNotExist:
            log.info("There is no action configured for alarm %s.", alarm_id)
            return
        lcm_client = LcmClient()
        # BUGFIX: original message said "for ns: %s" but logged the alarm id.
        log.info("Sending scaling action message for alarm: %s", alarm_id)
        lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name,
                         alarm.vnf_member_index, alarm.action)

    def _get_vnfr(self, nsr_id: str, member_index: int):
        """Fetch the VNF record for one member of a network service."""
        vnfr = self.common_db.get_one(table="vnfrs",
                                      filter={"nsr-id-ref": nsr_id,
                                              "member-vnf-index-ref": str(member_index)})
        return vnfr

    def _get_vnfrs(self, nsr_id: str):
        """Fetch the VNF records of every constituent VNF of the NS."""
        return [self._get_vnfr(nsr_id, member['member-vnf-index']) for member in
                self._get_nsr(nsr_id)['nsd']['constituent-vnfd']]

    def _get_vnfd(self, vnfd_id: str):
        """Fetch a VNF descriptor by its document id."""
        vnfd = self.common_db.get_one(table="vnfds",
                                      filter={"_id": vnfd_id})
        return vnfd

    def _get_nsr(self, nsr_id: str):
        """Fetch a network service record by its NS instance id."""
        nsr = self.common_db.get_one(table="nsrs",
                                     filter={"id": nsr_id})
        return nsr

    def _configure_scaling_groups(self, nsr_id: str):
        """Create scaling records and MON alarms for every scaling criteria.

        Walks each VNF of the NS, each scaling group of its VNFD, each
        scaling policy/criteria and each VDU, creating one scale-in and one
        scale-out alarm per (criteria, VDU) pair inside a single DB
        transaction.
        """
        # TODO(diazb): Check for alarm creation on exception and clean resources if needed.
        with database.db.atomic():
            vnfrs = self._get_vnfrs(nsr_id)
            log.info("Checking %s vnfrs...", len(vnfrs))
            for vnfr in vnfrs:
                vnfd = self._get_vnfd(vnfr['vnfd-id'])
                log.info("Looking for vnfd %s", vnfr['vnfd-id'])
                scaling_groups = vnfd['scaling-group-descriptor']
                vnf_monitoring_params = vnfd['monitoring-param']
                for scaling_group in scaling_groups:
                    log.info("Creating scaling record in DB...")
                    scaling_record = ScalingRecord.create(
                        nsr_id=nsr_id,
                        name=scaling_group['name'],
                        content=json.dumps(scaling_group)
                    )
                    log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
                             scaling_record.nsr_id,
                             scaling_record.name,
                             scaling_record.content)
                    for scaling_policy in scaling_group['scaling-policy']:
                        for vdur in vnfd['vdu']:
                            vdu_monitoring_params = vdur['monitoring-param']
                            for scaling_criteria in scaling_policy['scaling-criteria']:
                                vnf_monitoring_param = next(
                                    p for p in vnf_monitoring_params
                                    if p['id'] == scaling_criteria['vnf-monitoring-param-ref'])
                                # TODO: Add support for non-nfvi metrics
                                vdu_monitoring_param = next(
                                    p for p in vdu_monitoring_params
                                    if p['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'])
                                # One alarm per direction; previously two
                                # near-identical 15-line copies.
                                for action in ('scale_in', 'scale_out'):
                                    self._create_scaling_alarm(
                                        action, nsr_id, vnfr, vdur,
                                        scaling_criteria, vnf_monitoring_param,
                                        vdu_monitoring_param, scaling_record)

    def _create_scaling_alarm(self, action, nsr_id, vnfr, vdur, scaling_criteria,
                              vnf_monitoring_param, vdu_monitoring_param, scaling_record):
        """Create one MON alarm for *action* and persist its local record.

        :param action: 'scale_in' or 'scale_out' — selects which threshold
            and relational operation of the scaling criteria to use.
        """
        prefix = 'scale-in' if action == 'scale_in' else 'scale-out'
        alarm_uuid = self.mon_client.create_alarm(
            metric_name=vdu_monitoring_param['nfvi-metric'],
            ns_id=nsr_id,
            vdu_name=vdur['name'],
            vnf_member_index=vnfr['member-vnf-index-ref'],
            threshold=scaling_criteria[prefix + '-threshold'],
            operation=scaling_criteria[prefix + '-relational-operation'],
            statistic=vnf_monitoring_param['aggregation-type']
        )
        ScalingAlarm.create(
            alarm_id=alarm_uuid,
            action=action,
            vnf_member_index=int(vnfr['member-vnf-index-ref']),
            vdu_name=vdur['name'],
            scaling_record=scaling_record
        )