1 |
|
####################################################################################### |
2 |
|
# Copyright ETSI Contributors and Others. |
3 |
|
# |
4 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
# you may not use this file except in compliance with the License. |
6 |
|
# You may obtain a copy of the License at |
7 |
|
# |
8 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
# |
10 |
|
# Unless required by applicable law or agreed to in writing, software |
11 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
13 |
|
# implied. |
14 |
|
# See the License for the specific language governing permissions and |
15 |
|
# limitations under the License. |
16 |
|
####################################################################################### |
17 |
1 |
from copy import deepcopy |
18 |
1 |
from dataclasses import dataclass |
19 |
1 |
import logging |
20 |
1 |
from os import makedirs, path |
21 |
1 |
from pprint import pformat |
22 |
1 |
import random |
23 |
1 |
import threading |
24 |
1 |
from typing import Optional |
25 |
|
|
26 |
1 |
from importlib_metadata import entry_points |
27 |
1 |
from osm_common import dbmemory, dbmongo |
28 |
1 |
from osm_common.dbbase import DbException |
29 |
1 |
from osm_ng_ro.ns_thread import ConfigValidate |
30 |
1 |
from osm_ro_plugin import vimconn |
31 |
1 |
import yaml |
32 |
1 |
from yaml.representer import RepresenterError |
33 |
|
|
34 |
|
|
35 |
1 |
# Server statuses that check_vm_status_updates() records without adding a "vim_message".
openStackvmStatusOk = ["ACTIVE", "PAUSED", "SUSPENDED", "SHUTOFF", "BUILD"]

# Port statuses for which check_vm_interface_updates() refreshes the interface data.
openStacknetStatusOk = ["ACTIVE", "PAUSED", "BUILD"]

# Database collection and VIM type used to select the VIM accounts to monitor.
db_vim_collection = "vim_accounts"
vim_type = "openstack"

# Database collection queried for the RO tasks to monitor.
ro_task_collection = "ro_tasks"

# Entry-point name of the VIM connector plugin loaded for each VIM account.
plugin_name = "rovim_openstack"

# Timer created by start_monitoring(); stays None until monitoring is started.
monitoring_task = None
54 |
|
|
55 |
|
|
56 |
1 |
@dataclass
class VmToMonitor:
    """A single VM to be checked against the VIM, together with its DB location."""

    # ID of the VM in the VIM (taken from ro_task["vim_info"]["vim_id"]).
    vm_id: str
    # Colon-separated record string parsed by MonitorVms._get_db_paths.
    target_record: str
60 |
|
|
61 |
|
|
62 |
1 |
@dataclass
class VimToMonitor:
    """A VIM account together with the VMs that have to be monitored on it."""

    # ID of the VIM account.
    vim_id: str
    # VmToMonitor objects collected for this VIM account.
    vms: list
66 |
|
|
67 |
|
|
68 |
1 |
class MonitorVmsException(Exception):
    """Raised when monitoring or updating the VMs fails."""

    def __init__(self, message):
        """Store the error message.

        Args:
            message (str): Description of the failure
        """
        # super() without arguments starts the lookup at this class; the former
        # super(Exception, self) skipped Exception itself in the MRO.
        super().__init__(message)
71 |
|
|
72 |
|
|
73 |
1 |
class MonitorDbException(Exception):
    """Raised when a database operation of the monitor fails."""

    def __init__(self, message):
        """Store the error message.

        Args:
            message (str): Description of the failure
        """
        # super() without arguments starts the lookup at this class; the former
        # super(Exception, self) skipped Exception itself in the MRO.
        super().__init__(message)
76 |
|
|
77 |
|
|
78 |
1 |
class MonitorVimException(Exception):
    """Raised when a VIM account or its connector plugin can not be loaded."""

    def __init__(self, message):
        """Store the error message.

        Args:
            message (str): Description of the failure
        """
        # super() without arguments starts the lookup at this class; the former
        # super(Exception, self) skipped Exception itself in the MRO.
        super().__init__(message)
81 |
|
|
82 |
|
|
83 |
1 |
class SafeDumper(yaml.SafeDumper):
    """YAML safe dumper that also accepts subclasses of dict."""

    def represent_data(self, data):
        """Represent data, converting dict subclasses to plain dicts first.

        Args:
            data: Object to be represented

        Returns:
            The node produced by yaml.SafeDumper.represent_data
        """
        is_dict_subclass = isinstance(data, dict) and data.__class__ != dict
        if is_dict_subclass:
            # pyyaml does not handle subclasses of dict, so hand it a plain dict.
            data = dict(data.items())
        return super().represent_data(data)
89 |
|
|
90 |
|
|
91 |
1 |
class MonitorVms:
    """Keep the VNF records in the database in sync with the Openstack VMs.

    On construction the instance connects to the database, reads the Openstack
    VIM accounts and loads a VIM connector for each of them. run() then compares
    the monitored VMs with the data reported by their VIM and writes the
    differences to the "vnfrs" collection.
    """

    def __init__(self, config: dict):
        """Connect to the database and load a connector for every Openstack VIM.

        Args:
            config (dict): Configuration; its "database" section is used by connect_db
                and the whole dictionary is passed to ConfigValidate.

        Raises:
            MonitorDbException: if connect_db fails
            MonitorVimException: if a VIM connector can not be loaded
        """
        self.config = config
        self.db = None
        self.refresh_config = ConfigValidate(config)
        self.my_vims = {}
        self.plugins = {}
        self.logger = logging.getLogger("ro.monitor")
        self.connect_db()
        self.db_vims = self.get_db_vims()
        self.load_vims()

    def load_vims(self) -> None:
        """Load a connector for every VIM of self.db_vims which is not loaded yet.

        Raises:
            MonitorVimException
        """
        for vim in self.db_vims:
            if vim["_id"] not in self.my_vims:
                self._load_vim(vim["_id"])

    def connect_db(self) -> None:
        """Connect to the Database.

        Nothing is done if self.db is already set. Supported drivers are
        "mongo" and "memory".

        Raises:
            MonitorDbException
        """
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise MonitorDbException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )
        except (DbException, MonitorDbException, ValueError) as e:
            raise MonitorDbException(str(e)) from e

    def get_db_vims(self) -> list:
        """Get all VIM accounts whose type is Openstack."""
        return self.db.get_list(db_vim_collection, {"vim_type": vim_type})

    def find_ro_tasks_to_monitor(self) -> list:
        """Get the ro_tasks which belong to a vdu and have status DONE."""
        return self.db.get_list(
            ro_task_collection,
            q_filter={
                "tasks.status": ["DONE"],
                "tasks.item": ["vdu"],
            },
        )

    @staticmethod
    def _initialize_target_vim(vim_module_conn, vim: dict) -> object:
        """Create the VIM connector object with given vim details.

        Args:
            vim_module_conn (class): VIM connector class
            vim (dict): VIM details to initialize VIM connector object

        Returns:
            VIM connector (object): VIM connector object
        """
        return vim_module_conn(
            uuid=vim["_id"],
            name=vim["name"],
            tenant_id=vim.get("vim_tenant_id"),
            tenant_name=vim.get("vim_tenant_name"),
            url=vim["vim_url"],
            url_admin=None,
            user=vim["vim_user"],
            passwd=vim["vim_password"],
            config=vim.get("config") or {},
            persistent_info={},
        )

    def _load_vim(self, target_id) -> None:
        """Load or reload a vim_account.

        Read content from database, load the plugin if not loaded, then fill the my_vims dictionary.

        Args:
            target_id (str): ID of vim account

        Raises:
            MonitorVimException
        """
        try:
            vim = self.db.get_one(db_vim_collection, {"_id": target_id})
            schema_version = vim.get("schema_version")
            self.db.encrypt_decrypt_fields(
                vim,
                "decrypt",
                fields=("password", "secret"),
                schema_version=schema_version,
                salt=target_id,
            )
            self._process_vim_config(target_id, vim)
            vim_module_conn = self._load_plugin(plugin_name)
            self.my_vims[target_id] = self._initialize_target_vim(vim_module_conn, vim)
            self.logger.debug(
                "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
            )
        except (
            DbException,
            IOError,
            AttributeError,
            MonitorDbException,
            MonitorVimException,
            TypeError,
        ) as e:
            raise MonitorVimException(
                "Cannot load {} plugin={}: {}".format(target_id, plugin_name, str(e))
            ) from e

    @staticmethod
    def _process_vim_config(target_id: str, db_vim: dict) -> None:
        """Process vim config, creating vim configuration files such as ca_cert.

        If the config holds "ca_cert_content", the content is written to a file
        and replaced in the config by "ca_cert", the path of that file.

        Args:
            target_id (str): vim id
            db_vim (dict): Vim dictionary obtained from database

        Raises:
            MonitorVimException
        """
        if not db_vim.get("config"):
            return
        file_name = ""
        work_dir = "/app/osm_ro/certs"
        try:
            if db_vim["config"].get("ca_cert_content"):
                # A random suffix gives every load its own directory.
                file_name = f"{work_dir}/{target_id}:{random.randint(0, 99999)}"

                if not path.isdir(file_name):
                    makedirs(file_name)

                file_name = file_name + "/ca_cert"

                with open(file_name, "w") as f:
                    f.write(db_vim["config"]["ca_cert_content"])
                    del db_vim["config"]["ca_cert_content"]
                    db_vim["config"]["ca_cert"] = file_name

        except (FileNotFoundError, IOError, OSError) as e:
            raise MonitorVimException(
                "Error writing to file '{}': {}".format(file_name, e)
            ) from e

    def _load_plugin(self, name: str = "rovim_openstack", type: str = "vim"):
        """Find the proper VIM connector and return the VIM connector class.

        Loaded classes are cached in self.plugins.

        Args:
            name (str): rovim_openstack
            type (str): vim

        Returns:
            VIM connector class (class)

        Raises:
            MonitorVimException: if loading fails or no entry point matches
        """
        try:
            if name in self.plugins:
                return self.plugins[name]

            for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
                self.plugins[name] = ep.load()
                return self.plugins[name]

        except Exception as e:
            raise MonitorVimException(
                "Cannot load plugin osm_{}: {}".format(name, e)
            ) from e

        # No entry point matched: fail here instead of returning None and
        # letting the caller fail later with an unrelated TypeError.
        raise MonitorVimException(
            "Cannot load plugin osm_{}: no entry point found in group "
            "'osm_ro{}.plugins'".format(name, type)
        )

    @staticmethod
    def create_vm_to_monitor(ro_task: dict) -> Optional[object]:
        """Create VM using dataclass with ro task details.

        Args:
            ro_task (dict): Details of ro_task

        Returns:
            VmToMonitor (object), or None if ro_task is empty
        """
        if not ro_task:
            return
        return VmToMonitor(
            ro_task["vim_info"]["vim_id"], ro_task["tasks"][0]["target_record"]
        )

    @staticmethod
    def add_vm_to_existing_vim(
        vims_to_monitor: list, ro_task: dict, target_vim: str
    ) -> bool:
        """Add VmToMonitor to existing VIM list.

        Args:
            vims_to_monitor (list): List of VIMs to monitor
            ro_task (dict): ro_task details
            target_vim (str): ID of target VIM

        Returns:
            Boolean True if the VM is added to the VIM list, otherwise False.
        """
        for vim in vims_to_monitor:
            if target_vim == vim.vim_id:
                vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task)
                vim.vms.append(vm_to_monitor)
                return True
        return False

    @staticmethod
    def add_new_vim_for_monitoring(
        vims_to_monitor: list, ro_task: dict, target_vim: str
    ) -> None:
        """Create a new VIM object and add to vims_to_monitor list.

        Args:
            vims_to_monitor (list): List of VIMs to monitor
            ro_task (dict): ro_task details
            target_vim (str): ID of target VIM
        """
        vim_to_monitor = VimToMonitor(target_vim, [])
        vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task)
        vim_to_monitor.vms.append(vm_to_monitor)
        vims_to_monitor.append(vim_to_monitor)

    @staticmethod
    def prepare_vims_to_monitor(
        vims_to_monitor: list, ro_task: dict, target_vim: str
    ) -> None:
        """If the required VIM exists in the vims_to_monitor list, add VM under related VIM,
        otherwise create a new VIM object and add VM to this newly created VIM.

        Args:
            vims_to_monitor (list): List of VIMs to monitor
            ro_task (dict): ro_task details
            target_vim (str): ID of target VIM
        """
        if not MonitorVms.add_vm_to_existing_vim(vims_to_monitor, ro_task, target_vim):
            MonitorVms.add_new_vim_for_monitoring(vims_to_monitor, ro_task, target_vim)

    def _get_db_paths(self, target_record: str) -> tuple:
        """Get the database paths and info of target VDU and VIM.

        Args:
            target_record (str): Four colon-separated fields; the second is the vnfr_id,
                the third holds "<name>.<vdur index>.vim_info.<...>" and the fourth the vim_id

        Returns:
            (vim_info_path: str, vim_id: str, vnfr_id: str, vdur_path:str, vdur_index: int, db_vnfr: dict) tuple

        Raises:
            MonitorVmsException
        """
        try:
            [_, vnfr_id, vdur_info, vim_id] = target_record.split(":")
            vim_info_path = vdur_info + ":" + vim_id
            vdur_path = vim_info_path.split(".vim_info.")[0]
            vdur_index = int(vdur_path.split(".")[1])
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}, fail_on_empty=False)
            return vim_info_path, vim_id, vnfr_id, vdur_path, vdur_index, db_vnfr
        except (DbException, ValueError, IndexError) as e:
            # IndexError: the vdur path has no ".<index>" part. It is reported like
            # any other malformed record, because run() does not catch IndexError.
            raise MonitorVmsException(str(e)) from e

    @staticmethod
    def _check_if_vdur_vim_info_exists(
        db_vnfr: dict, vdur_index: int
    ) -> Optional[bool]:
        """Check if VNF record and vdur vim_info record exists.

        Args:
            db_vnfr (dict): VNF record
            vdur_index (int): index of vdur under db_vnfr["vdur"]

        Returns:
            Boolean True if VNF record and vdur vim_info record exists, otherwise None.
        """
        try:
            if db_vnfr and db_vnfr.get("vdur") and isinstance(vdur_index, int):
                if db_vnfr["vdur"][vdur_index] and db_vnfr["vdur"][vdur_index].get(
                    "vim_info"
                ):
                    return True
        except IndexError:
            return

    def _get_vm_data_from_db(self, vm_to_monitor: object) -> Optional[tuple]:
        """Get the required DB path and VIM info data from database.

        Args:
            vm_to_monitor (object): Includes vm_id and target record in DB.

        Returns:
            (vdur_path: str, vdur_vim_info_update: dict, db_vnfr: dict, existing_vim_info: dict, vnfr_id,vim_info_path: str) (Tuple):
            Required VM info if _check_if_vdur_vim_info_exists and the vim_info of the VIM exists, else None

        Raises:
            MonitorVmsException
        """
        (
            vim_info_path,
            vim_id,
            vnfr_id,
            vdur_path,
            vdur_index,
            db_vnfr,
        ) = self._get_db_paths(vm_to_monitor.target_record)
        if not self._check_if_vdur_vim_info_exists(db_vnfr, vdur_index):
            return

        existing_vim_info = db_vnfr["vdur"][vdur_index]["vim_info"].get("vim:" + vim_id)
        if not existing_vim_info:
            return

        # The copy is modified by the callers and later compared with the original.
        vdur_vim_info_update = deepcopy(existing_vim_info)
        return (
            vdur_path,
            vdur_vim_info_update,
            db_vnfr,
            existing_vim_info,
            vnfr_id,
            vim_info_path,
        )

    @staticmethod
    def update_vim_info_for_deleted_vm(vdur_vim_info_update: dict) -> None:
        """Updates the vdur_vim_info_update to report that VM is deleted.

        Args:
            vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later.
        """
        vdur_vim_info_update.update(
            {
                "vim_status": "DELETED",
                "vim_message": "Deleted externally",
                "vim_id": None,
                "vim_name": None,
                "interfaces": None,
            }
        )

    def report_deleted_vdur(self, vm_to_monitor: object) -> None:
        """VM does not exist in the Openstack Cloud so update the VNFR to report VM deletion.

        Args:
            vm_to_monitor (object): VM needs to be reported as deleted.

        Raises:
            MonitorVmsException, MonitorDbException
        """
        vm_data = self._get_vm_data_from_db(vm_to_monitor)
        if not vm_data:
            return
        (
            vdur_path,
            vdur_vim_info_update,
            _,
            existing_vim_info,
            vnfr_id,
            vim_info_path,
        ) = vm_data
        self.update_vim_info_for_deleted_vm(vdur_vim_info_update)
        vdur_update = {
            vdur_path + ".status": "DELETED",
        }

        if existing_vim_info != vdur_vim_info_update:
            # VNFR record is updated one time upon VM deletion.
            self.logger.info(f"Reporting deletion of VM: {vm_to_monitor.vm_id}")
            self.backup_vdu_interfaces(vdur_vim_info_update)
            all_updates = [vdur_update, {vim_info_path: vdur_vim_info_update}]
            self.update_in_database(all_updates, vnfr_id)
            self.logger.info(f"Updated vnfr for vm_id: {vm_to_monitor.vm_id}.")

    def update_vnfrs(self, servers: list, ports: dict, vms_to_monitor: list) -> None:
        """Update the VDURs according to the latest information provided by servers list.

        Args:
            servers (list): List of existing VMs coming from a single Openstack VIM account
            ports (dict): All ports coming from a single Openstack VIM account
            vms_to_monitor (list): List of VMs to be monitored and updated.
        """
        for vm_to_monitor in vms_to_monitor:
            server = next(
                (item for item in servers if item.id == vm_to_monitor.vm_id), None
            )
            if server:
                self.report_vdur_updates(server, vm_to_monitor, ports)
            else:
                self.report_deleted_vdur(vm_to_monitor)

    def serialize(self, value: dict) -> Optional[str]:
        """Serialization of python basic types.

        In the case value is not serializable a message will be logged and
        str(value) is returned.

        Args:
            value (dict/str): Data to serialize

        Returns:
            serialized_value (str, yaml)
        """
        if isinstance(value, str):
            return value
        try:
            return yaml.dump(
                value, Dumper=SafeDumper, default_flow_style=True, width=256
            )
        except RepresenterError:
            self.logger.info(
                "The following entity cannot be serialized in YAML:\n\n%s\n\n",
                pformat(value),
                exc_info=True,
            )
            return str(value)

    def _get_server_info(self, server: object) -> str:
        """Get the server info, drop the user data fields and return the info as string.

        Args:
            server (object): VM info object

        Returns:
            server_info (string)
        """
        server_info = server.to_dict()
        server_info.pop("OS-EXT-SRV-ATTR:user_data", None)
        server_info.pop("user_data", None)
        return self.serialize(server_info)

    def check_vm_status_updates(
        self,
        vdur_vim_info_update: dict,
        vdur_update: dict,
        server: object,
        vdur_path: str,
    ) -> None:
        """Fills up dictionaries to update VDUR according to server.status.

        Args:
            vdur_vim_info_update (dict): Dictionary which keeps the differences of vdur_vim_info
            vdur_update (dict): Dictionary which keeps the differences of vdur
            server (server): VM info
            vdur_path (str): Path of VDUR in DB
        """
        vdur_vim_info_update["vim_status"] = vdur_update[
            vdur_path + ".status"
        ] = server.status

        # Only a status outside of openStackvmStatusOk is explained by a message.
        if server.status not in openStackvmStatusOk:
            vdur_vim_info_update["vim_message"] = "VIM status reported " + server.status

        vdur_vim_info_update["vim_details"] = self._get_server_info(server)
        vdur_vim_info_update["vim_id"] = server.id
        vdur_vim_info_update["vim_name"] = vdur_update[
            vdur_path + ".name"
        ] = server.name

    @staticmethod
    def get_interface_info(
        ports: dict, interface: dict, server: object
    ) -> Optional[dict]:
        """Get the updated port info related to the existing interface of server.

        Args:
            ports (dict): Dictionary whose "ports" key lists all ports of a single VIM account
            interface (dict): Existing interface info which is taken from DB
            server (object): Server info

        Returns:
            port (dict): The first port whose id is the interface's vim_interface_id and whose
            device_id is the server id, or None if there is no such port
        """
        return next(
            filter(
                lambda port: port.get("id") == interface.get("vim_interface_id")
                and port.get("device_id") == server.id,
                ports["ports"],
            ),
            None,
        )

    @staticmethod
    def check_vlan_pci_updates(
        interface_info: dict, index: int, vdur_vim_info_update: dict
    ) -> None:
        """If interface has pci and vlan, update vdur_vim_info dictionary with the refreshed data.

        Args:
            interface_info (dict): Refreshed interface info
            index (int): Index of interface in VDUR
            vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later.
        """
        if interface_info.get("binding:profile") and interface_info[
            "binding:profile"
        ].get("pci_slot"):
            pci = interface_info["binding:profile"]["pci_slot"]
            vdur_vim_info_update["interfaces"][index]["pci"] = pci

        if interface_info.get("binding:vif_details"):
            vdur_vim_info_update["interfaces"][index]["vlan"] = interface_info[
                "binding:vif_details"
            ].get("vlan")

    @staticmethod
    def check_vdur_interface_updates(
        vdur_update: dict,
        vdur_path: str,
        index: int,
        interface_info: dict,
        old_interface: dict,
        vnfr_update: dict,
        vnfr_id: str,
    ) -> None:
        """Updates the vdur_update dictionary which stores differences between the latest interface data and data in DB.

        Args:
            vdur_update (dict): Dictionary used to store vdur updates
            vdur_path (str): VDUR record path in DB
            index (int): Index of interface in VDUR
            interface_info (dict): Refreshed interface info
            old_interface (dict): The previous interface info coming from DB
            vnfr_update (dict): Dictionary used to store vnfr updates
            vnfr_id (str): VNFR ID
        """
        interface_path = vdur_path + ".interfaces." + str(index)
        current_ip_address = MonitorVms._get_current_ip_address(interface_info)
        if current_ip_address:
            vdur_update[interface_path + ".ip-address"] = current_ip_address

            if old_interface.get("mgmt_vdu_interface"):
                vdur_update[vdur_path + ".ip-address"] = current_ip_address

            if old_interface.get("mgmt_vnf_interface"):
                # NOTE(review): the key is prefixed with the VNFR ID although the update
                # is already filtered by that ID - confirm that this is intended.
                vnfr_update[vnfr_id + ".ip-address"] = current_ip_address

        vdur_update[interface_path + ".mac-address"] = interface_info.get("mac_address")

    @staticmethod
    def _get_current_ip_address(interface_info: dict) -> Optional[str]:
        """Get the IP address of the first fixed IP of the interface.

        Args:
            interface_info (dict): Refreshed interface info

        Returns:
            ip_address (str) or None if the interface has no fixed IP
        """
        if interface_info.get("fixed_ips") and interface_info["fixed_ips"][0]:
            return interface_info["fixed_ips"][0].get("ip_address")

    @staticmethod
    def backup_vdu_interfaces(vdur_vim_info_update: dict) -> None:
        """Backup VDU interfaces as interfaces_backup.

        The backup is only taken if interfaces exist and no vim_message is set.

        Args:
            vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
        """
        if vdur_vim_info_update.get("interfaces") and not vdur_vim_info_update.get(
            "vim_message"
        ):
            vdur_vim_info_update["interfaces_backup"] = vdur_vim_info_update[
                "interfaces"
            ]

    def update_vdur_vim_info_interfaces(
        self,
        vdur_vim_info_update: dict,
        index: int,
        interface_info: dict,
        server: object,
    ) -> None:
        """Update the vdur_vim_info dictionary with the latest interface info.

        Args:
            vdur_vim_info_update (dict): The dictionary which is used to store vdur_vim_info updates
            index (int): Interface index
            interface_info (dict): The latest interface info
            server (object): The latest VM info

        Raises:
            MonitorVmsException: if vdur_vim_info_update has no interface info at index
        """
        interfaces = vdur_vim_info_update.get("interfaces")
        if not (interfaces and interfaces[index]):
            raise MonitorVmsException("Existing interfaces info could not found.")

        refreshed = {
            "mac_address": interface_info["mac_address"],
            # Same lookup as the one used for the vdur update.
            "ip_address": self._get_current_ip_address(interface_info),
            "vim_net_id": interface_info["network_id"],
            "vim_info": self.serialize(interface_info),
        }
        # A missing or empty host is stored as None.
        refreshed["compute_node"] = (
            server.to_dict().get("OS-EXT-SRV-ATTR:host") or None
        )
        interfaces[index].update(refreshed)

    def prepare_interface_updates(
        self,
        vdur_vim_info_update: dict,
        index: int,
        interface_info: dict,
        server: object,
        vdur_path: str,
        vnfr_update: dict,
        old_interface: dict,
        vdur_update: dict,
        vnfr_id: str,
    ) -> None:
        """Updates network related info in vdur_vim_info and vdur by using the latest interface info.

        Args:
            vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
            index (int): Interface index
            interface_info (dict): The latest interface info
            server (object): The latest VM info
            vdur_path (str): VDUR record path in DB
            vnfr_update (dict): Dictionary used to store vnfr updates
            old_interface (dict): The previous interface info coming from DB
            vdur_update (dict): Dictionary used to store vdur updates
            vnfr_id (str): VNFR ID

        Raises:
            MonitorVmsException
        """
        self.update_vdur_vim_info_interfaces(
            vdur_vim_info_update, index, interface_info, server
        )
        self.check_vlan_pci_updates(interface_info, index, vdur_vim_info_update)
        self.check_vdur_interface_updates(
            vdur_update,
            vdur_path,
            index,
            interface_info,
            old_interface,
            vnfr_update,
            vnfr_id,
        )

    def check_vm_interface_updates(
        self,
        server: object,
        existing_vim_info: dict,
        ports: dict,
        vdur_vim_info_update: dict,
        vdur_update: dict,
        vdur_path: str,
        vnfr_update: dict,
        vnfr_id: str,
    ) -> None:
        """Gets the refreshed interfaces info of server and updates the VDUR if interfaces exist,
        otherwise reports that interfaces are deleted.

        Args:
            server (object): The latest VM info
            existing_vim_info (dict): VM info details coming from DB
            ports (dict): All ports info belonging to a single VIM account
            vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
            vdur_update (dict): Dictionary used to store vdur updates
            vdur_path (str): VDUR record path in DB
            vnfr_update (dict): Dictionary used to store vnfr updates
            vnfr_id (str): VNFR ID

        Raises:
            MonitorVmsException
        """
        # "interfaces" is stored as None once a VM has been reported as deleted,
        # so a missing or empty value means that there is nothing to refresh.
        old_interfaces = existing_vim_info.get("interfaces") or []
        for index, old_interface in enumerate(old_interfaces):
            interface_info = self.get_interface_info(ports, old_interface, server)
            if not interface_info:
                vdur_vim_info_update[
                    "vim_message"
                ] = f"Interface {old_interface['vim_interface_id']} deleted externally."

            elif interface_info.get("status") in openStacknetStatusOk:
                self.prepare_interface_updates(
                    vdur_vim_info_update,
                    index,
                    interface_info,
                    server,
                    vdur_path,
                    vnfr_update,
                    old_interface,
                    vdur_update,
                    vnfr_id,
                )

            else:
                # Formatting instead of concatenating also copes with a port without status.
                vdur_vim_info_update["vim_message"] = (
                    f"Interface {old_interface['vim_interface_id']} status: "
                    f"{interface_info.get('status')}"
                )

    def update_in_database(self, all_updates: list, vnfr_id: str) -> None:
        """Update differences in VNFR.

        Empty dictionaries of all_updates are skipped.

        Args:
            all_updates (list): List of dictionaries which includes differences
            vnfr_id (str): VNF record ID

        Raises:
            MonitorDbException
        """
        try:
            for updated_dict in all_updates:
                if updated_dict:
                    self.db.set_list(
                        "vnfrs",
                        update_dict=updated_dict,
                        q_filter={"_id": vnfr_id},
                    )
        except DbException as e:
            raise MonitorDbException(
                f"Error while updating differences in VNFR {str(e)}"
            ) from e

    def report_vdur_updates(
        self, server: object, vm_to_monitor: object, ports: dict
    ) -> None:
        """Report VDU updates by changing the VDUR records in DB.

        Args:
            server (object): Refreshed VM info
            vm_to_monitor (object): VM to be monitored
            ports (dict): Ports dict including the details of all ports of a single VIM account

        Raises:
            MonitorVmsException, MonitorDbException
        """
        vm_data = self._get_vm_data_from_db(vm_to_monitor)
        if not vm_data:
            return
        (
            vdur_path,
            vdur_vim_info_update,
            _,
            existing_vim_info,
            vnfr_id,
            vim_info_path,
        ) = vm_data
        vdur_update, vnfr_update = {}, {}

        self.check_vm_status_updates(
            vdur_vim_info_update, vdur_update, server, vdur_path
        )

        self.check_vm_interface_updates(
            server,
            existing_vim_info,
            ports,
            vdur_vim_info_update,
            vdur_update,
            vdur_path,
            vnfr_update,
            vnfr_id,
        )
        # Update vnfr in MongoDB if there are differences
        if existing_vim_info != vdur_vim_info_update:
            self.logger.info(f"Reporting status updates of VM: {vm_to_monitor.vm_id}.")
            self.backup_vdu_interfaces(vdur_vim_info_update)
            all_updates = [
                vdur_update,
                {vim_info_path: vdur_vim_info_update},
                vnfr_update,
            ]
            self.update_in_database(all_updates, vnfr_id)
            self.logger.info(f"Updated vnfr for vm_id: {server.id}.")

    def run(self) -> None:
        """Performs the periodic updates of Openstack VMs by sending only two requests to Openstack APIs
        for each VIM account (in order to get details of all servers, all ports).

        Raises:
            MonitorVmsException
        """
        try:
            # If there is not any Openstack type VIM account in DB or VM status updates are disabled by config,
            # Openstack VMs will not be monitored.
            if not self.db_vims or self.refresh_config.active == -1:
                return

            ro_tasks_to_monitor = self.find_ro_tasks_to_monitor()
            db_vims = [vim["_id"] for vim in self.db_vims]
            vims_to_monitor = []

            for ro_task in ro_tasks_to_monitor:
                # The VIM ID is the part of target_id after the first colon.
                _, _, target_vim = ro_task["target_id"].partition(":")
                if target_vim in db_vims:
                    self.prepare_vims_to_monitor(vims_to_monitor, ro_task, target_vim)

            for vim in vims_to_monitor:
                all_servers, all_ports = self.my_vims[vim.vim_id].get_monitoring_data()
                self.update_vnfrs(all_servers, all_ports, vim.vms)
        except (
            DbException,
            MonitorDbException,
            MonitorVimException,
            MonitorVmsException,
            ValueError,
            KeyError,
            TypeError,
            AttributeError,
            vimconn.VimConnException,
        ) as e:
            raise MonitorVmsException(
                f"Exception while monitoring Openstack VMs: {str(e)}"
            ) from e
882 |
|
|
883 |
|
|
884 |
1 |
def start_monitoring(config: dict):
    """Run one monitoring cycle and schedule the next one.

    A new MonitorVms instance is created for every cycle. The next cycle is
    scheduled with a threading.Timer which is stored in the module level
    monitoring_task, so that stop_monitoring() is able to cancel it.

    Args:
        config (dict): Configuration, it has to include the "period" section

    Raises:
        MonitorVmsException: if the configuration is empty or has no "period",
            or if the monitoring cycle fails
    """
    global monitoring_task
    if not (config and config.get("period")):
        raise MonitorVmsException("Wrong configuration format is provided.")
    instance = MonitorVms(config)
    period = instance.refresh_config.active
    instance.run()
    if period == -1:
        # -1 disables the VM status updates (run() returns immediately for it).
        # A timer with a negative interval would fire at once and reschedule
        # itself without any pause, so no timer is created.
        return
    monitoring_task = threading.Timer(period, start_monitoring, args=(config,))
    monitoring_task.start()
893 |
|
|
894 |
|
|
895 |
1 |
def stop_monitoring():
    """Cancel the monitoring timer stored in monitoring_task, if there is one."""
    global monitoring_task
    if not monitoring_task:
        return
    monitoring_task.cancel()