b9a590857b1c4feafe13132a99e5ba760a0d55af
[osm/RO.git] / NG-RO / osm_ng_ro / monitor.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy import deepcopy
18 from dataclasses import dataclass
19 import logging
20 from os import makedirs, path
21 from pprint import pformat
22 import random
23 import threading
24 from typing import Optional
25
26 from importlib_metadata import entry_points
27 from osm_common import dbmemory, dbmongo
28 from osm_common.dbbase import DbException
29 from osm_ng_ro.ns_thread import ConfigValidate
30 from osm_ro_plugin import vimconn
31 import yaml
32 from yaml.representer import RepresenterError
33
34
# Server states for which no extra "vim_message" is recorded on the VDUR.
openStackvmStatusOk = ["ACTIVE", "PAUSED", "SUSPENDED", "SHUTOFF", "BUILD"]

# Port states for which the interface details of a VDUR are refreshed.
openStacknetStatusOk = ["ACTIVE", "PAUSED", "BUILD"]

# Database collections read by the monitor.
db_vim_collection = "vim_accounts"
ro_task_collection = "ro_tasks"

# Only VIM accounts of this type are monitored, through this connector plugin.
vim_type = "openstack"
plugin_name = "rovim_openstack"

# Handle of the pending threading.Timer; None until monitoring is scheduled.
monitoring_task = None
54
55
@dataclass
class VmToMonitor:
    """A single VM to be checked, paired with the DB record that describes it."""

    # Identifier of the VM; it is compared against the "id" of the servers
    # returned by the VIM.
    vm_id: str
    # Colon-separated locator ("<x>:<vnfr_id>:<vdur path>:<vim_id>") of the
    # vim_info entry inside the VNF record.
    target_record: str
60
61
@dataclass
class VimToMonitor:
    """A VIM account together with the VMs that must be checked on it."""

    # Identifier of the VIM account.
    vim_id: str
    # VmToMonitor entries collected for this VIM account.
    vms: list
66
67
class MonitorVmsException(Exception):
    """Raised when monitoring or updating the VMs fails."""

    def __init__(self, message):
        # Zero-argument super() starts the MRO lookup at this class; the former
        # super(Exception, self) started it after Exception and skipped it.
        super().__init__(message)
71
72
class MonitorDbException(Exception):
    """Raised when connecting to or updating the database fails."""

    def __init__(self, message):
        # Zero-argument super() starts the MRO lookup at this class; the former
        # super(Exception, self) started it after Exception and skipped it.
        super().__init__(message)
76
77
class MonitorVimException(Exception):
    """Raised when a VIM account or its connector plugin cannot be loaded."""

    def __init__(self, message):
        # Zero-argument super() starts the MRO lookup at this class; the former
        # super(Exception, self) started it after Exception and skipped it.
        super().__init__(message)
81
82
class SafeDumper(yaml.SafeDumper):
    """YAML safe dumper that also accepts subclasses of dict."""

    def represent_data(self, data):
        """Represent ``data``, first flattening any dict subclass to a plain dict."""
        is_dict_subclass = isinstance(data, dict) and data.__class__ != dict
        if is_dict_subclass:
            # A solution to convert subclasses of dict to dicts which is not
            # handled by pyyaml.
            data = dict(data.items())
        return super().represent_data(data)
89
90
class MonitorVms:
    """Refresh the VNF records in the database from the state of Openstack VMs.

    On construction the instance connects to the database, reads the Openstack
    VIM accounts and creates one connector per account. ``run`` then collects
    the finished VDU ro_tasks, asks every involved VIM for its servers and ports
    and writes any difference back to the "vnfrs" collection.
    """

    def __init__(self, config: dict):
        """Connect to the database and load a connector for every Openstack VIM.

        Args:
            config (dict): Configuration; the "database" section is read here.

        Raises:
            MonitorDbException, MonitorVimException
        """
        self.config = config
        self.db = None
        self.refresh_config = ConfigValidate(config)
        self.my_vims = {}
        self.plugins = {}
        self.logger = logging.getLogger("ro.monitor")
        self.connect_db()
        self.db_vims = self.get_db_vims()
        self.load_vims()

    def load_vims(self) -> None:
        """Create a connector for every VIM account that has none yet."""
        for vim in self.db_vims:
            if vim["_id"] not in self.my_vims:
                self._load_vim(vim["_id"])

    def connect_db(self) -> None:
        """Connect to the Database.

        Does nothing when a database handle is already set.

        Raises:
            MonitorDbException
        """
        if self.db:
            return
        try:
            db_config = self.config["database"]
            driver = db_config["driver"]
            if driver == "mongo":
                db = dbmongo.DbMongo()
            elif driver == "memory":
                db = dbmemory.DbMemory()
            else:
                raise MonitorDbException(
                    "Invalid configuration param '{}' at '[database]':'driver'".format(
                        driver
                    )
                )
            db.db_connect(db_config)
            # Keep the handle only once the connection succeeded, so a failed
            # attempt does not leave a truthy but unconnected self.db behind.
            self.db = db
        except (DbException, MonitorDbException, ValueError) as e:
            raise MonitorDbException(str(e)) from e

    def get_db_vims(self) -> list:
        """Get all VIM accounts which types are Openstack."""
        return self.db.get_list(db_vim_collection, {"vim_type": vim_type})

    def find_ro_tasks_to_monitor(self) -> list:
        """Get the ro_tasks which belongs to vdu and status DONE."""
        return self.db.get_list(
            ro_task_collection,
            q_filter={
                "tasks.status": ["DONE"],
                "tasks.item": ["vdu"],
            },
        )

    @staticmethod
    def _initialize_target_vim(vim_module_conn, vim: dict) -> object:
        """Create the VIM connector object with given vim details.

        Args:
            vim_module_conn (class): VIM connector class
            vim (dict): VIM details to initialize VIM connecter object

        Returns:
            VIM connector (object): VIM connector object
        """
        return vim_module_conn(
            uuid=vim["_id"],
            name=vim["name"],
            tenant_id=vim.get("vim_tenant_id"),
            tenant_name=vim.get("vim_tenant_name"),
            url=vim["vim_url"],
            url_admin=None,
            user=vim["vim_user"],
            passwd=vim["vim_password"],
            config=vim.get("config") or {},
            persistent_info={},
        )

    def _load_vim(self, target_id) -> None:
        """Load or reload a vim_account.

        Read content from database, load the plugin if not loaded, then it
        fills my_vims dictionary.

        Args:
            target_id (str): ID of vim account

        Raises:
            MonitorVimException
        """
        try:
            vim = self.db.get_one(db_vim_collection, {"_id": target_id})
            schema_version = vim.get("schema_version")
            self.db.encrypt_decrypt_fields(
                vim,
                "decrypt",
                fields=("password", "secret"),
                schema_version=schema_version,
                salt=target_id,
            )
            self._process_vim_config(target_id, vim)
            vim_module_conn = self._load_plugin(plugin_name)
            self.my_vims[target_id] = self._initialize_target_vim(vim_module_conn, vim)
            self.logger.debug(
                "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
            )
        except (
            DbException,
            IOError,
            AttributeError,
            MonitorDbException,
            MonitorVimException,
            TypeError,
        ) as e:
            raise MonitorVimException(
                "Cannot load {} plugin={}: {}".format(target_id, plugin_name, str(e))
            ) from e

    @staticmethod
    def _process_vim_config(target_id: str, db_vim: dict) -> None:
        """Process vim config, creating vim configuration files as ca_cert.

        When the config carries "ca_cert_content", the content is written to a
        file and the config entry is replaced by "ca_cert" holding its path.

        Args:
            target_id (str): vim id
            db_vim (dict): Vim dictionary obtained from database

        Raises:
            MonitorVimException
        """
        if not db_vim.get("config"):
            return
        file_name = ""
        work_dir = "/app/osm_ro/certs"
        try:
            if db_vim["config"].get("ca_cert_content"):
                # The random suffix gives every call its own directory.
                file_name = f"{work_dir}/{target_id}:{random.randint(0, 99999)}"
                makedirs(file_name, exist_ok=True)
                file_name = file_name + "/ca_cert"

                with open(file_name, "w") as f:
                    f.write(db_vim["config"]["ca_cert_content"])
                del db_vim["config"]["ca_cert_content"]
                db_vim["config"]["ca_cert"] = file_name
        except OSError as e:
            # FileNotFoundError and IOError are both covered by OSError.
            raise MonitorVimException(
                "Error writing to file '{}': {}".format(file_name, e)
            ) from e

    def _load_plugin(self, name: str = "rovim_openstack", type: str = "vim"):
        """Finds the proper VIM connector and returns VIM connector class name.

        Loaded classes are cached in self.plugins under ``name``.

        Args:
            name (str): rovim_openstack
            type (str): vim

        Returns:
            VIM connector class name (class)

        Raises:
            MonitorVimException: the entry point cannot be loaded or none
                matches ``name``.
        """
        group = "osm_ro{}.plugins".format(type)
        try:
            if name not in self.plugins:
                for ep in entry_points(group=group, name=name):
                    self.plugins[name] = ep.load()
                    break
        except Exception as e:
            raise MonitorVimException(
                "Cannot load plugin osm_{}: {}".format(name, e)
            ) from e

        if name not in self.plugins:
            # Report a missing plugin explicitly rather than handing back
            # nothing usable to the caller.
            raise MonitorVimException(
                "Cannot load plugin osm_{}: no entry point found in group '{}'".format(
                    name, group
                )
            )
        return self.plugins[name]

    @staticmethod
    def create_vm_to_monitor(ro_task: dict) -> Optional[object]:
        """Create VM using dataclass with ro task details.

        Args:
            ro_task (dict): Details of ro_task

        Returns:
            VmToMonitor (object), or None when ro_task is empty
        """
        if not ro_task:
            return None
        return VmToMonitor(
            ro_task["vim_info"]["vim_id"], ro_task["tasks"][0]["target_record"]
        )

    @staticmethod
    def add_vm_to_existing_vim(
        vims_to_monitor: list, ro_task: dict, target_vim: str
    ) -> bool:
        """Add VmToMonitor to existing VIM list.

        Args:
            vims_to_monitor (list): List of VIMs to monitor
            ro_task (dict): ro_task details
            target_vim (str): ID of target VIM

        Returns:
            Boolean If VM is added to VIM list, it returns True else False.
        """
        for vim in vims_to_monitor:
            if target_vim == vim.vim_id:
                vim.vms.append(MonitorVms.create_vm_to_monitor(ro_task))
                return True
        return False

    @staticmethod
    def add_new_vim_for_monitoring(
        vims_to_monitor: list, ro_task: dict, target_vim: str
    ) -> None:
        """Create a new VIM object and add to vims_to_monitor list.

        Args:
            vims_to_monitor (list): List of VIMs to monitor
            ro_task (dict): ro_task details
            target_vim (str): ID of target VIM
        """
        vim_to_monitor = VimToMonitor(target_vim, [])
        vim_to_monitor.vms.append(MonitorVms.create_vm_to_monitor(ro_task))
        vims_to_monitor.append(vim_to_monitor)

    @staticmethod
    def prepare_vims_to_monitor(
        vims_to_monitor: list, ro_task: dict, target_vim: str
    ) -> None:
        """If the required VIM exists in the vims_to_monitor list, add VM under related VIM,
        otherwise create a new VIM object and add VM to this new created VIM.

        Args:
            vims_to_monitor (list): List of VIMs to monitor
            ro_task (dict): ro_task details
            target_vim (str): ID of target VIM
        """
        if not MonitorVms.add_vm_to_existing_vim(vims_to_monitor, ro_task, target_vim):
            MonitorVms.add_new_vim_for_monitoring(vims_to_monitor, ro_task, target_vim)

    def _get_db_paths(self, target_record: str) -> tuple:
        """Get the database paths and info of target VDU and VIM.

        Args:
            target_record (str): A string which includes vnfr_id, vdur_id, vim_id

        Returns:
            (vim_info_path: str, vim_id: str, vnfr_id: str, vdur_path:str, vdur_index: int, db_vnfr: dict) tuple

        Raises:
            MonitorVmsException
        """
        try:
            # Exactly four colon-separated fields are expected; any other
            # count raises ValueError on unpacking.
            _, vnfr_id, vdur_info, vim_id = target_record.split(":")
            vim_info_path = vdur_info + ":" + vim_id
            vdur_path = vim_info_path.split(".vim_info.")[0]
            # The second dot-separated element of the VDUR path is its index;
            # a path without a dot raises IndexError.
            vdur_index = int(vdur_path.split(".")[1])
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}, fail_on_empty=False)
            return vim_info_path, vim_id, vnfr_id, vdur_path, vdur_index, db_vnfr
        except (DbException, ValueError, IndexError) as e:
            raise MonitorVmsException(str(e)) from e

    @staticmethod
    def _check_if_vdur_vim_info_exists(
        db_vnfr: dict, vdur_index: int
    ) -> Optional[bool]:
        """Check if VNF record and vdur vim_info record exists.

        Args:
            db_vnfr (dict): VNF record
            vdur_index (int): index of vdur under db_vnfr["vdur"]

        Returns:
            Boolean True if VNF record and vdur vim_info record exists,
            otherwise None.
        """
        if not (db_vnfr and db_vnfr.get("vdur") and isinstance(vdur_index, int)):
            return None
        try:
            vdur = db_vnfr["vdur"][vdur_index]
        except IndexError:
            return None
        if vdur and vdur.get("vim_info"):
            return True
        return None

    def _get_vm_data_from_db(self, vm_to_monitor: object) -> Optional[tuple]:
        """Get the required DB path and VIM info data from database.

        Args:
            vm_to_monitor (object): Includes vm_id and target record in DB.

        Returns:
            (vdur_path: str, vdur_vim_info_update: dict, db_vnfr: dict, existing_vim_info: dict, vnfr_id,vim_info_path: str) (Tuple):
            Required VM info if _check_if_vdur_vim_info_exists else None
        """
        (
            vim_info_path,
            vim_id,
            vnfr_id,
            vdur_path,
            vdur_index,
            db_vnfr,
        ) = self._get_db_paths(vm_to_monitor.target_record)
        if not self._check_if_vdur_vim_info_exists(db_vnfr, vdur_index):
            return None

        existing_vim_info = db_vnfr["vdur"][vdur_index]["vim_info"].get("vim:" + vim_id)
        if not existing_vim_info:
            return None

        # Work on a copy so it can later be compared with the stored state.
        vdur_vim_info_update = deepcopy(existing_vim_info)
        return (
            vdur_path,
            vdur_vim_info_update,
            db_vnfr,
            existing_vim_info,
            vnfr_id,
            vim_info_path,
        )

    @staticmethod
    def update_vim_info_for_deleted_vm(vdur_vim_info_update: dict) -> None:
        """Updates the vdur_vim_info_update to report that VM is deleted.

        Args:
            vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later.
        """
        vdur_vim_info_update.update(
            {
                "vim_status": "DELETED",
                "vim_message": "Deleted externally",
                "vim_details": None,
                "vim_id": None,
                "vim_name": None,
                "interfaces": None,
            }
        )

    def report_deleted_vdur(self, vm_to_monitor: object) -> None:
        """VM does not exist in the Openstack Cloud so update the VNFR to report VM deletion.

        Args:
            vm_to_monitor (object): VM needs to be reported as deleted.
        """
        vm_data = self._get_vm_data_from_db(vm_to_monitor)
        if not vm_data:
            return
        (
            vdur_path,
            vdur_vim_info_update,
            _,
            existing_vim_info,
            vnfr_id,
            vim_info_path,
        ) = vm_data
        self.update_vim_info_for_deleted_vm(vdur_vim_info_update)
        vdur_update = {
            vdur_path + ".status": "DELETED",
        }

        if existing_vim_info != vdur_vim_info_update:
            # VNFR record is updated one time upon VM deletion.
            self.logger.info(f"Reporting deletion of VM: {vm_to_monitor.vm_id}")
            # NOTE(review): "interfaces" was just set to None and "vim_message"
            # to a non-empty text, so backup_vdu_interfaces adds nothing here.
            # Confirm whether the backup was meant to precede the clearing.
            self.backup_vdu_interfaces(vdur_vim_info_update)
            all_updates = [vdur_update, {vim_info_path: vdur_vim_info_update}]
            self.update_in_database(all_updates, vnfr_id)
            self.logger.info(f"Updated vnfr for vm_id: {vm_to_monitor.vm_id}.")

    def update_vnfrs(self, servers: list, ports: dict, vms_to_monitor: list) -> None:
        """Update the VDURs according to the latest information provided by servers list.

        Args:
            servers (list): List of existing VMs comes from single Openstack VIM account
            ports (dict): List of all ports comes from single Openstack VIM account
            vms_to_monitor (list): List of VMs to be monitored and updated.
        """
        # Index the servers once by id instead of scanning the list per VM;
        # setdefault keeps the first server seen for an id.
        servers_by_id = {}
        for server in servers:
            servers_by_id.setdefault(server.id, server)

        for vm_to_monitor in vms_to_monitor:
            server = servers_by_id.get(vm_to_monitor.vm_id)
            if server:
                self.report_vdur_updates(server, vm_to_monitor, ports)
            else:
                self.report_deleted_vdur(vm_to_monitor)

    def serialize(self, value: dict) -> Optional[str]:
        """Serialization of python basic types.

        In the case value is not serializable a message will be logged.

        Args:
            value (dict/str): Data to serialize

        Returns:
            serialized_value (str, yaml)
        """
        if isinstance(value, str):
            return value
        try:
            return yaml.dump(
                value, Dumper=SafeDumper, default_flow_style=True, width=256
            )
        except RepresenterError:
            self.logger.info(
                "The following entity cannot be serialized in YAML:\n\n%s\n\n",
                pformat(value),
                exc_info=True,
            )
            return str(value)

    def _get_server_info(self, server: object) -> str:
        """Get the server info, extract some fields and returns info as string.

        Args:
            server (object): VM info object

        Returns:
            server_info (string)
        """
        server_info = server.to_dict()
        # The user data entries are dropped before the info is stored.
        server_info.pop("OS-EXT-SRV-ATTR:user_data", None)
        server_info.pop("user_data", None)
        return self.serialize(server_info)

    def check_vm_status_updates(
        self,
        vdur_vim_info_update: dict,
        vdur_update: dict,
        server: object,
        vdur_path: str,
    ) -> None:
        """Fills up dictionaries to update VDUR according to server.status.

        Args:
            vdur_vim_info_update (dict): Dictionary which keeps the differences of vdur_vim_info
            vdur_update (dict): Dictionary which keeps the differences of vdur
            server (server): VM info
            vdur_path (str): Path of VDUR in DB
        """
        status = server.status
        vdur_vim_info_update["vim_status"] = status
        vdur_update[vdur_path + ".status"] = status
        if status not in openStackvmStatusOk:
            # Only a status outside the accepted list gets a message.
            vdur_vim_info_update["vim_message"] = f"VIM status reported {status}"

        vdur_vim_info_update["vim_details"] = self._get_server_info(server)
        vdur_vim_info_update["vim_id"] = server.id
        vdur_vim_info_update["vim_name"] = server.name
        vdur_update[vdur_path + ".name"] = server.name

    @staticmethod
    def get_interface_info(
        ports: dict, interface: dict, server: object
    ) -> Optional[dict]:
        """Get the updated port info regarding with existing interface of server.

        Args:
            ports (dict): List of all ports belong to single VIM account
            interface (dict): Existing interface info which is taken from DB
            server (object): Server info

        Returns:
            port (dict): The updated port info related to existing interface of server
        """
        interface_id = interface.get("vim_interface_id")
        # The port must carry the stored interface id and be attached to this
        # server; the first such port wins.
        return next(
            (
                port
                for port in ports["ports"]
                if port.get("id") == interface_id
                and port.get("device_id") == server.id
            ),
            None,
        )

    @staticmethod
    def check_vlan_pci_updates(
        interface_info: dict, index: int, vdur_vim_info_update: dict
    ) -> None:
        """If interface has pci and vlan, update vdur_vim_info dictionary with the refreshed data.

        Args:
            interface_info (dict): Refreshed interface info
            index (int): Index of interface in VDUR
            vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later.
        """
        binding_profile = interface_info.get("binding:profile")
        if binding_profile and binding_profile.get("pci_slot"):
            vdur_vim_info_update["interfaces"][index]["pci"] = binding_profile[
                "pci_slot"
            ]

        vif_details = interface_info.get("binding:vif_details")
        if vif_details:
            vdur_vim_info_update["interfaces"][index]["vlan"] = vif_details.get("vlan")

    @staticmethod
    def check_vdur_interface_updates(
        vdur_update: dict,
        vdur_path: str,
        index: int,
        interface_info: dict,
        old_interface: dict,
        vnfr_update: dict,
        vnfr_id: str,
    ) -> None:
        """Updates the vdur_update dictionary which stores differences between the latest interface data and data in DB.

        Args:
            vdur_update (dict): Dictionary used to store vdur updates
            vdur_path (str): VDUR record path in DB
            index (int): Index of interface in VDUR
            interface_info (dict): Refreshed interface info
            old_interface (dict): The previous interface info comes from DB
            vnfr_update (dict): Dictionary used to store vnfr updates
            vnfr_id (str): VNFR ID
        """
        interface_path = vdur_path + ".interfaces." + str(index)
        current_ip_address = MonitorVms._get_current_ip_address(interface_info)
        if current_ip_address:
            vdur_update[interface_path + ".ip-address"] = current_ip_address

            # A management interface also propagates its address upwards.
            if old_interface.get("mgmt_vdu_interface"):
                vdur_update[vdur_path + ".ip-address"] = current_ip_address

            if old_interface.get("mgmt_vnf_interface"):
                # NOTE(review): the key is prefixed with the VNFR id although
                # the update is already filtered by that id - confirm intent.
                vnfr_update[vnfr_id + ".ip-address"] = current_ip_address

        vdur_update[interface_path + ".mac-address"] = interface_info.get(
            "mac_address"
        )

    @staticmethod
    def _get_current_ip_address(interface_info: dict) -> Optional[str]:
        """Return the IP address of the first fixed IP of a port, if any.

        Args:
            interface_info (dict): Refreshed interface info

        Returns:
            ip_address (str) or None when the port has no usable fixed IP
        """
        fixed_ips = interface_info.get("fixed_ips")
        if fixed_ips and fixed_ips[0]:
            return fixed_ips[0].get("ip_address")
        return None

    @staticmethod
    def backup_vdu_interfaces(vdur_vim_info_update: dict) -> None:
        """Backup VDU interfaces as interfaces_backup.

        The backup is taken only when interfaces exist and no vim_message is set.

        Args:
            vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
        """
        if vdur_vim_info_update.get("interfaces") and not vdur_vim_info_update.get(
            "vim_message"
        ):
            vdur_vim_info_update["interfaces_backup"] = vdur_vim_info_update[
                "interfaces"
            ]

    def update_vdur_vim_info_interfaces(
        self,
        vdur_vim_info_update: dict,
        index: int,
        interface_info: dict,
        server: object,
    ) -> None:
        """Update the vdur_vim_info dictionary with the latest interface info.

        Args:
            vdur_vim_info_update (dict): The dictionary which is used to store vdur_vim_info updates
            index (int): Interface index
            interface_info (dict): The latest interface info
            server (object): The latest VM info

        Raises:
            MonitorVmsException: no existing interface info at ``index``.
        """
        interfaces = vdur_vim_info_update.get("interfaces")
        if not (interfaces and interfaces[index]):
            raise MonitorVmsException("Existing interfaces info could not found.")

        # Convert the server only once.
        server_info = server.to_dict()
        interfaces[index].update(
            {
                "mac_address": interface_info["mac_address"],
                "ip_address": self._get_current_ip_address(interface_info),
                "vim_net_id": interface_info["network_id"],
                "vim_info": self.serialize(interface_info),
                "compute_node": server_info.get("OS-EXT-SRV-ATTR:host") or None,
            }
        )

    def prepare_interface_updates(
        self,
        vdur_vim_info_update: dict,
        index: int,
        interface_info: dict,
        server: object,
        vdur_path: str,
        vnfr_update: dict,
        old_interface: dict,
        vdur_update: dict,
        vnfr_id: str,
    ) -> None:
        """Updates network related info in vdur_vim_info and vdur by using the latest interface info.

        Args:
            vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
            index (int): Interface index
            interface_info (dict): The latest interface info
            server (object): The latest VM info
            vdur_path (str): VDUR record path in DB
            vnfr_update (dict): Dictionary used to store vnfr updates
            old_interface (dict): The previous interface info comes from DB
            vdur_update (dict): Dictionary used to store vdur updates
            vnfr_id (str): VNFR ID
        """
        self.update_vdur_vim_info_interfaces(
            vdur_vim_info_update, index, interface_info, server
        )
        self.check_vlan_pci_updates(interface_info, index, vdur_vim_info_update)
        self.check_vdur_interface_updates(
            vdur_update,
            vdur_path,
            index,
            interface_info,
            old_interface,
            vnfr_update,
            vnfr_id,
        )

    def check_vm_interface_updates(
        self,
        server: object,
        existing_vim_info: dict,
        ports: dict,
        vdur_vim_info_update: dict,
        vdur_update: dict,
        vdur_path: str,
        vnfr_update: dict,
        vnfr_id: str,
    ) -> None:
        """Gets the refreshed interfaces info of server and updates the VDUR if interfaces exist,
        otherwise reports that interfaces are deleted.

        Args:
            server (object): The latest VM info
            existing_vim_info (dict): VM info details comes from DB
            ports (dict): All ports info belongs to single VIM account
            vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
            vdur_update (dict): Dictionary used to store vdur updates
            vdur_path (str): VDUR record path in DB
            vnfr_update (dict): Dictionary used to store vnfr updates
            vnfr_id (str): VNFR ID
        """
        # "interfaces" may be missing or None (it is set to None when a VM is
        # reported as deleted); there is nothing to refresh in that case.
        old_interfaces = existing_vim_info.get("interfaces") or []
        for index, old_interface in enumerate(old_interfaces):
            interface_info = self.get_interface_info(ports, old_interface, server)
            if not interface_info:
                vdur_vim_info_update[
                    "vim_message"
                ] = f"Interface {old_interface['vim_interface_id']} deleted externally."
                continue

            status = interface_info.get("status")
            if status in openStacknetStatusOk:
                self.prepare_interface_updates(
                    vdur_vim_info_update,
                    index,
                    interface_info,
                    server,
                    vdur_path,
                    vnfr_update,
                    old_interface,
                    vdur_update,
                    vnfr_id,
                )
            else:
                # Formatting instead of concatenation also copes with a port
                # that reports no status at all.
                vdur_vim_info_update[
                    "vim_message"
                ] = f"Interface {old_interface['vim_interface_id']} status: {status}"

    def update_in_database(self, all_updates: list, vnfr_id: str) -> None:
        """Update differences in VNFR.

        Empty dictionaries in ``all_updates`` are skipped.

        Args:
            all_updates (list): List of dictionaries which includes differences
            vnfr_id (str): VNF record ID

        Raises:
            MonitorDbException
        """
        try:
            for updated_dict in all_updates:
                if updated_dict:
                    self.db.set_list(
                        "vnfrs",
                        update_dict=updated_dict,
                        q_filter={"_id": vnfr_id},
                    )
        except DbException as e:
            raise MonitorDbException(
                f"Error while updating differences in VNFR {str(e)}"
            ) from e

    def report_vdur_updates(
        self, server: object, vm_to_monitor: object, ports: dict
    ) -> None:
        """Report VDU updates by changing the VDUR records in DB.

        Args:
            server (object): Refreshed VM info
            vm_to_monitor (object): VM to be monitored
            ports (dict): Ports dict includes all ports details regarding with single VIM account
        """
        vm_data = self._get_vm_data_from_db(vm_to_monitor)
        if not vm_data:
            return
        (
            vdur_path,
            vdur_vim_info_update,
            _,
            existing_vim_info,
            vnfr_id,
            vim_info_path,
        ) = vm_data
        vdur_update, vnfr_update = {}, {}

        self.check_vm_status_updates(
            vdur_vim_info_update, vdur_update, server, vdur_path
        )

        self.check_vm_interface_updates(
            server,
            existing_vim_info,
            ports,
            vdur_vim_info_update,
            vdur_update,
            vdur_path,
            vnfr_update,
            vnfr_id,
        )
        # Update vnfr in MongoDB if there are differences
        if existing_vim_info != vdur_vim_info_update:
            self.logger.info(f"Reporting status updates of VM: {vm_to_monitor.vm_id}.")
            self.backup_vdu_interfaces(vdur_vim_info_update)
            all_updates = [
                vdur_update,
                {vim_info_path: vdur_vim_info_update},
                vnfr_update,
            ]
            self.update_in_database(all_updates, vnfr_id)
            self.logger.info(f"Updated vnfr for vm_id: {server.id}.")

    def run(self) -> None:
        """Performs the periodic updates of Openstack VMs by sending only two requests to Openstack APIs
        for each VIM account (in order to get details of all servers, all ports).

        Raises:
            MonitorVmsException
        """
        try:
            # If there is not any Openstack type VIM account in DB or VM status updates are disabled by config,
            # Openstack VMs will not be monitored.
            if not self.db_vims or self.refresh_config.active == -1:
                return

            ro_tasks_to_monitor = self.find_ro_tasks_to_monitor()
            db_vims = {vim["_id"] for vim in self.db_vims}
            vims_to_monitor = []

            for ro_task in ro_tasks_to_monitor:
                # The VIM id is whatever follows the first colon of target_id.
                _, _, target_vim = ro_task["target_id"].partition(":")
                if target_vim in db_vims:
                    self.prepare_vims_to_monitor(vims_to_monitor, ro_task, target_vim)

            for vim in vims_to_monitor:
                all_servers, all_ports = self.my_vims[vim.vim_id].get_monitoring_data()
                self.update_vnfrs(all_servers, all_ports, vim.vms)
        except (
            DbException,
            MonitorDbException,
            MonitorVimException,
            MonitorVmsException,
            ValueError,
            KeyError,
            IndexError,
            TypeError,
            AttributeError,
            vimconn.VimConnException,
        ) as e:
            raise MonitorVmsException(
                f"Exception while monitoring Openstack VMs: {str(e)}"
            ) from e
883
884
def start_monitoring(config: dict):
    """Run one monitoring cycle and schedule the next one.

    A new MonitorVms instance is built on every cycle. The next cycle is armed
    with a threading.Timer whose handle is stored in the module-level
    ``monitoring_task``.

    Args:
        config (dict): Configuration; it must contain a non-empty "period" entry.

    Raises:
        MonitorVmsException: the configuration is missing or has no "period".
    """
    global monitoring_task
    if not (config and config.get("period")):
        raise MonitorVmsException("Wrong configuration format is provided.")
    instance = MonitorVms(config)
    period = instance.refresh_config.active
    # NOTE(review): an exception raised by run() propagates before the next
    # timer is armed, so no further cycle is scheduled - confirm this is intended.
    instance.run()
    if period == -1:
        # -1 is the value run() treats as "monitoring disabled". A Timer with a
        # negative interval fires immediately, so re-arming it here would loop
        # without pause, rebuilding MonitorVms each time.
        return
    monitoring_task = threading.Timer(period, start_monitoring, args=(config,))
    monitoring_task.start()
894
895
def stop_monitoring():
    """Cancel the pending monitoring timer, if one has been scheduled."""
    global monitoring_task
    if not monitoring_task:
        # Nothing was scheduled, so there is nothing to cancel.
        return
    monitoring_task.cancel()