Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

monitor.py

Trend

File Coverage summary

NameClassesLinesConditionals
monitor.py
100%
1/1
99%
303/307
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
monitor.py
99%
303/307
N/A

Source

NG-RO/osm_ng_ro/monitor.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 1 from copy import deepcopy
18 1 from dataclasses import dataclass
19 1 import logging
20 1 from os import makedirs, path
21 1 from pprint import pformat
22 1 import random
23 1 import threading
24 1 from typing import Optional
25
26 1 from importlib_metadata import entry_points
27 1 from osm_common import dbmemory, dbmongo
28 1 from osm_common.dbbase import DbException
29 1 from osm_ng_ro.ns_thread import ConfigValidate
30 1 from osm_ro_plugin import vimconn
31 1 import yaml
32 1 from yaml.representer import RepresenterError
33
34
35 1 openStackvmStatusOk = [
36     "ACTIVE",
37     "PAUSED",
38     "SUSPENDED",
39     "SHUTOFF",
40     "BUILD",
41 ]
42
43 1 openStacknetStatusOk = [
44     "ACTIVE",
45     "PAUSED",
46     "BUILD",
47 ]
48
49 1 db_vim_collection = "vim_accounts"
50 1 vim_type = "openstack"
51 1 ro_task_collection = "ro_tasks"
52 1 plugin_name = "rovim_openstack"
53 1 monitoring_task = None
54
55
56 1 @dataclass
57 1 class VmToMonitor:
58 1     vm_id: str
59 1     target_record: str
60
61
62 1 @dataclass
63 1 class VimToMonitor:
64 1     vim_id: str
65 1     vms: list
66
67
68 1 class MonitorVmsException(Exception):
69 1     def __init__(self, message):
70 1         super(Exception, self).__init__(message)
71
72
73 1 class MonitorDbException(Exception):
74 1     def __init__(self, message):
75 1         super(Exception, self).__init__(message)
76
77
78 1 class MonitorVimException(Exception):
79 1     def __init__(self, message):
80 1         super(Exception, self).__init__(message)
81
82
83 1 class SafeDumper(yaml.SafeDumper):
84 1     def represent_data(self, data):
85 0         if isinstance(data, dict) and data.__class__ != dict:
86             # A solution to convert subclasses of dict to dicts which is not handled by pyyaml.
87 0             data = dict(data.items())
88 0         return super(SafeDumper, self).represent_data(data)
89
90
91 1 class MonitorVms:
92 1     def __init__(self, config: dict):
93 1         self.config = config
94 1         self.db = None
95 1         self.refresh_config = ConfigValidate(config)
96 1         self.my_vims = {}
97 1         self.plugins = {}
98 1         self.logger = logging.getLogger("ro.monitor")
99 1         self.connect_db()
100 1         self.db_vims = self.get_db_vims()
101 1         self.load_vims()
102
103 1     def load_vims(self) -> None:
104 1         for vim in self.db_vims:
105 1             if vim["_id"] not in self.my_vims:
106 1                 self._load_vim(vim["_id"])
107
108 1     def connect_db(self) -> None:
109         """Connect to the Database.
110
111         Raises:
112            MonitorDbException
113         """
114 1         try:
115 1             if not self.db:
116 1                 if self.config["database"]["driver"] == "mongo":
117 1                     self.db = dbmongo.DbMongo()
118 1                     self.db.db_connect(self.config["database"])
119 1                 elif self.config["database"]["driver"] == "memory":
120 1                     self.db = dbmemory.DbMemory()
121 1                     self.db.db_connect(self.config["database"])
122                 else:
123 1                     raise MonitorDbException(
124                         "Invalid configuration param '{}' at '[database]':'driver'".format(
125                             self.config["database"]["driver"]
126                         )
127                     )
128 1         except (DbException, MonitorDbException, ValueError) as e:
129 1             raise MonitorDbException(str(e))
130
131 1     def get_db_vims(self) -> list:
132         """Get all VIM accounts which types are Openstack."""
133 1         return self.db.get_list(db_vim_collection, {"vim_type": vim_type})
134
135 1     def find_ro_tasks_to_monitor(self) -> list:
136         """Get the ro_tasks which belongs to vdu and status DONE."""
137 1         return self.db.get_list(
138             ro_task_collection,
139             q_filter={
140                 "tasks.status": ["DONE"],
141                 "tasks.item": ["vdu"],
142             },
143         )
144
145 1     @staticmethod
146 1     def _initialize_target_vim(vim_module_conn, vim: dict) -> object:
147         """Create the VIM connector object with given vim details.
148
149         Args:
150             vim_module_conn (class):    VIM connector class
151             vim (dict):                 VIM details to initialize VIM connecter object
152
153         Returns:
154             VIM connector   (object):   VIM connector object
155         """
156 1         return vim_module_conn(
157             uuid=vim["_id"],
158             name=vim["name"],
159             tenant_id=vim.get("vim_tenant_id"),
160             tenant_name=vim.get("vim_tenant_name"),
161             url=vim["vim_url"],
162             url_admin=None,
163             user=vim["vim_user"],
164             passwd=vim["vim_password"],
165             config=vim.get("config") or {},
166             persistent_info={},
167         )
168
169 1     def _load_vim(self, target_id) -> None:
170         """Load or reload a vim_account.
171         Read content from database, load the plugin if not loaded, then it fills my_vims dictionary.
172
173         Args:
174             target_id   (str):      ID of vim account
175
176         Raises:
177             MonitorVimException
178         """
179 1         try:
180 1             vim = self.db.get_one(db_vim_collection, {"_id": target_id})
181 1             schema_version = vim.get("schema_version")
182 1             self.db.encrypt_decrypt_fields(
183                 vim,
184                 "decrypt",
185                 fields=("password", "secret"),
186                 schema_version=schema_version,
187                 salt=target_id,
188             )
189 1             self._process_vim_config(target_id, vim)
190 1             vim_module_conn = self._load_plugin(plugin_name)
191 1             self.my_vims[target_id] = self._initialize_target_vim(vim_module_conn, vim)
192 1             self.logger.debug(
193                 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
194             )
195 1         except (
196             DbException,
197             IOError,
198             AttributeError,
199             MonitorDbException,
200             MonitorVimException,
201             TypeError,
202         ) as e:
203 1             raise MonitorVimException(
204                 "Cannot load {} plugin={}: {}".format(target_id, plugin_name, str(e))
205             )
206
207 1     @staticmethod
208 1     def _process_vim_config(target_id: str, db_vim: dict) -> None:
209         """
210         Process vim config, creating vim configuration files as ca_cert
211         Args:
212             target_id   (str): vim id
213             db_vim      (dict): Vim dictionary obtained from database
214
215         Raises:
216             MonitorVimException
217         """
218 1         if not db_vim.get("config"):
219 1             return
220 1         file_name = ""
221 1         work_dir = "/app/osm_ro/certs"
222 1         try:
223 1             if db_vim["config"].get("ca_cert_content"):
224 1                 file_name = f"{work_dir}/{target_id}:{random.randint(0, 99999)}"
225
226 1                 if not path.isdir(file_name):
227 1                     makedirs(file_name)
228
229 1                 file_name = file_name + "/ca_cert"
230
231 1                 with open(file_name, "w") as f:
232 1                     f.write(db_vim["config"]["ca_cert_content"])
233 1                     del db_vim["config"]["ca_cert_content"]
234 1                     db_vim["config"]["ca_cert"] = file_name
235
236 1         except (FileNotFoundError, IOError, OSError) as e:
237 1             raise MonitorVimException(
238                 "Error writing to file '{}': {}".format(file_name, e)
239             )
240
241 1     def _load_plugin(self, name: str = "rovim_openstack", type: str = "vim"):
242         """Finds the proper VIM connector and returns VIM connector class name.
243         Args:
244             name  (str):          rovim_openstack
245             type  (str):          vim
246
247         Returns:
248             VIM connector class name    (class)
249
250         Raises:
251             MonitorVimException
252         """
253 1         try:
254 1             if name in self.plugins:
255 1                 return self.plugins[name]
256
257 1             for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
258 1                 self.plugins[name] = ep.load()
259 1                 return self.plugins[name]
260
261 1         except Exception as e:
262 1             raise MonitorVimException("Cannot load plugin osm_{}: {}".format(name, e))
263
264 1     @staticmethod
265 1     def create_vm_to_monitor(ro_task: dict) -> Optional[object]:
266         """Create VM using dataclass with ro task details.
267
268         Args:
269             ro_task (dict):             Details of ro_task
270
271         Returns:
272             VmToMonitor (object)
273         """
274 1         if not ro_task:
275 1             return
276 1         return VmToMonitor(
277             ro_task["vim_info"]["vim_id"], ro_task["tasks"][0]["target_record"]
278         )
279
280 1     @staticmethod
281 1     def add_vm_to_existing_vim(
282         vims_to_monitor: list, ro_task: dict, target_vim: str
283     ) -> bool:
284         """Add VmToMonitor to existing VIM list.
285
286         Args:
287             vims_to_monitor (list):     List of VIMs to monitor
288             ro_task (dict):             ro_task details
289             target_vim  (str):          ID of target VIM
290
291         Returns:
292             Boolean         If VM is added to VIM list, it returns True else False.
293         """
294 1         for vim in vims_to_monitor:
295 1             if target_vim == vim.vim_id:
296 1                 vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task)
297 1                 vim.vms.append(vm_to_monitor)
298 1                 return True
299 1         return False
300
301 1     @staticmethod
302 1     def add_new_vim_for_monitoring(
303         vims_to_monitor: list, ro_task: dict, target_vim: str
304     ) -> None:
305         """Create a new VIM object and add to vims_to_monitor list.
306
307         Args:
308             vims_to_monitor (list):     List of VIMs to monitor
309             ro_task (dict):             ro_task details
310             target_vim  (str):          ID of target VIM
311         """
312 1         vim_to_monitor = VimToMonitor(target_vim, [])
313 1         vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task)
314 1         vim_to_monitor.vms.append(vm_to_monitor)
315 1         vims_to_monitor.append(vim_to_monitor)
316
317 1     @staticmethod
318 1     def prepare_vims_to_monitor(
319         vims_to_monitor: list, ro_task: dict, target_vim: str
320     ) -> None:
321         """If the required VIM exists in the vims_to_monitor list, add VM under related VIM,
322         otherwise create a new VIM object and add VM to this new created VIM.
323
324         Args:
325             vims_to_monitor (list):     List of VIMs to monitor
326             ro_task (dict):             ro_task details
327             target_vim  (str):          ID of target VIM
328         """
329 1         if not MonitorVms.add_vm_to_existing_vim(vims_to_monitor, ro_task, target_vim):
330 1             MonitorVms.add_new_vim_for_monitoring(vims_to_monitor, ro_task, target_vim)
331
332 1     def _get_db_paths(self, target_record: str) -> tuple:
333         """Get the database paths and info of target VDU and VIM.
334
335         Args:
336             target_record   (str):      A string which includes vnfr_id, vdur_id, vim_id
337
338         Returns:
339             (vim_info_path: str, vim_id: str, vnfr_id: str, vdur_path:str, vdur_index: int, db_vnfr: dict)  tuple
340
341         Raises:
342             MonitorVmsException
343         """
344 1         try:
345 1             [_, vnfr_id, vdur_info, vim_id] = target_record.split(":")
346 1             vim_info_path = vdur_info + ":" + vim_id
347 1             vdur_path = vim_info_path.split(".vim_info.")[0]
348 1             vdur_index = int(vdur_path.split(".")[1])
349 1             db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}, fail_on_empty=False)
350 1             return vim_info_path, vim_id, vnfr_id, vdur_path, vdur_index, db_vnfr
351 1         except (DbException, ValueError) as e:
352 1             raise MonitorVmsException(str(e))
353
354 1     @staticmethod
355 1     def _check_if_vdur_vim_info_exists(
356         db_vnfr: dict, vdur_index: int
357     ) -> Optional[bool]:
358         """Check if VNF record and vdur vim_info record exists.
359
360         Args:
361             db_vnfr (dict):             VNF record
362             vdur_index  (int):          index of vdur under db_vnfr["vdur"]
363
364         Returns:
365             Boolean                     True if VNF record and vdur vim_info record exists.
366         """
367 1         try:
368 1             if db_vnfr and db_vnfr.get("vdur") and isinstance(vdur_index, int):
369 1                 if db_vnfr["vdur"][vdur_index] and db_vnfr["vdur"][vdur_index].get(
370                     "vim_info"
371                 ):
372 1                     return True
373 1         except IndexError:
374 1             return
375
376 1     def _get_vm_data_from_db(self, vm_to_monitor: object) -> Optional[tuple]:
377         """Get the required DB path and VIM info data from database.
378
379         Args:
380             vm_to_monitor   (object):    Includes vm_id and target record in DB.
381
382         Returns:
383             (vdur_path: str, vdur_vim_info_update: dict, db_vnfr: dict, existing_vim_info: dict, vnfr_id,vim_info_path: str)    (Tuple):
384             Required VM info if _check_if_vdur_vim_info_exists else None
385         """
386 1         (
387             vim_info_path,
388             vim_id,
389             vnfr_id,
390             vdur_path,
391             vdur_index,
392             db_vnfr,
393         ) = self._get_db_paths(vm_to_monitor.target_record)
394 1         if not self._check_if_vdur_vim_info_exists(db_vnfr, vdur_index):
395 1             return
396
397 1         existing_vim_info = db_vnfr["vdur"][vdur_index]["vim_info"].get("vim:" + vim_id)
398 1         if not existing_vim_info:
399 1             return
400
401 1         vdur_vim_info_update = deepcopy(existing_vim_info)
402 1         return (
403             vdur_path,
404             vdur_vim_info_update,
405             db_vnfr,
406             existing_vim_info,
407             vnfr_id,
408             vim_info_path,
409         )
410
411 1     @staticmethod
412 1     def update_vim_info_for_deleted_vm(vdur_vim_info_update: dict) -> None:
413         """Updates the vdur_vim_info_update to report that VM is deleted.
414
415         Args:
416              vdur_vim_info_update    (dict):     Dictionary to be updated and used to update VDUR later.
417         """
418 1         vdur_vim_info_update.update(
419             {
420                 "vim_status": "DELETED",
421                 "vim_message": "Deleted externally",
422                 "vim_id": None,
423                 "vim_name": None,
424                 "interfaces": None,
425             }
426         )
427
428 1     def report_deleted_vdur(self, vm_to_monitor: object) -> None:
429         """VM does not exist in the Openstack Cloud so update the VNFR to report VM deletion.
430
431         Args:
432             vm_to_monitor   (object):        VM needs to be reported as deleted.
433         """
434 1         vm_data = self._get_vm_data_from_db(vm_to_monitor)
435 1         if not vm_data:
436 1             return
437 1         (
438             vdur_path,
439             vdur_vim_info_update,
440             _,
441             existing_vim_info,
442             vnfr_id,
443             vim_info_path,
444         ) = vm_data
445 1         self.update_vim_info_for_deleted_vm(vdur_vim_info_update)
446 1         vdur_update = {
447             vdur_path + ".status": "DELETED",
448         }
449
450 1         if existing_vim_info != vdur_vim_info_update:
451             # VNFR record is updated one time upon VM deletion.
452 1             self.logger.info(f"Reporting deletion of VM: {vm_to_monitor.vm_id}")
453 1             self.backup_vdu_interfaces(vdur_vim_info_update)
454 1             all_updates = [vdur_update, {vim_info_path: vdur_vim_info_update}]
455 1             self.update_in_database(all_updates, vnfr_id)
456 1             self.logger.info(f"Updated vnfr for vm_id: {vm_to_monitor.vm_id}.")
457
458 1     def update_vnfrs(self, servers: list, ports: dict, vms_to_monitor: list) -> None:
459         """Update the VDURs according to the latest information provided by servers list.
460
461         Args:
462             servers    (list):          List of existing VMs comes from single Openstack VIM account
463             ports     (dict):           List of all ports comes from single Openstack VIM account
464             vms_to_monitor  (list):     List of VMs to be monitored and updated.
465         """
466 1         for vm_to_monitor in vms_to_monitor:
467 1             server = next(
468                 filter(lambda server: server.id == vm_to_monitor.vm_id, servers), None
469             )
470 1             if server:
471 1                 self.report_vdur_updates(server, vm_to_monitor, ports)
472             else:
473 1                 self.report_deleted_vdur(vm_to_monitor)
474
475 1     def serialize(self, value: dict) -> Optional[str]:
476         """Serialization of python basic types.
477         In the case value is not serializable a message will be logged.
478
479         Args:
480             value   (dict/str):     Data to serialize
481
482         Returns:
483             serialized_value    (str, yaml)
484         """
485 1         if isinstance(value, str):
486 1             return value
487 1         try:
488 1             return yaml.dump(
489                 value, Dumper=SafeDumper, default_flow_style=True, width=256
490             )
491 1         except RepresenterError:
492 1             self.logger.info(
493                 "The following entity cannot be serialized in YAML:\n\n%s\n\n",
494                 pformat(value),
495                 exc_info=True,
496             )
497 1             return str(value)
498
499 1     def _get_server_info(self, server: object) -> str:
500         """Get the server info, extract some fields and returns info as string.
501
502         Args:
503             server  (object):        VM info object
504
505         Returns:
506             server_info (string)
507         """
508 1         server_info = server.to_dict()
509 1         server_info.pop("OS-EXT-SRV-ATTR:user_data", None)
510 1         server_info.pop("user_data", None)
511 1         return self.serialize(server_info)
512
513 1     def check_vm_status_updates(
514         self,
515         vdur_vim_info_update: dict,
516         vdur_update: dict,
517         server: object,
518         vdur_path: str,
519     ) -> None:
520         """Fills up dictionaries to update VDUR according to server.status.
521
522         Args:
523             vdur_vim_info_update    (dict):         Dictionary which keeps the differences of vdur_vim_info
524             vdur_update             (dict):         Dictionary which keeps the differences of vdur
525             server                  (server):       VM info
526             vdur_path               (str):          Path of VDUR in DB
527         """
528 1         if server.status in openStackvmStatusOk:
529 1             vdur_vim_info_update["vim_status"] = vdur_update[vdur_path + ".status"] = (
530                 server.status
531             )
532
533         else:
534 1             vdur_vim_info_update["vim_status"] = vdur_update[vdur_path + ".status"] = (
535                 server.status
536             )
537 1             vdur_vim_info_update["vim_message"] = "VIM status reported " + server.status
538
539 1         vdur_vim_info_update["vim_details"] = self._get_server_info(server)
540 1         vdur_vim_info_update["vim_id"] = server.id
541 1         vdur_vim_info_update["vim_name"] = vdur_update[vdur_path + ".name"] = (
542             server.name
543         )
544
545 1     @staticmethod
546 1     def get_interface_info(
547         ports: dict, interface: dict, server: object
548     ) -> Optional[dict]:
549         """Get the updated port info regarding with existing interface of server.
550
551         Args:
552             ports       (dict):             List of all ports belong to single VIM account
553             interface   (dict):             Existing interface info which is taken from DB
554             server  (object):               Server info
555
556         Returns:
557             port    (dict):                 The updated port info related to existing interface of server
558         """
559 1         return next(
560             filter(
561                 lambda port: port.get("id") == interface.get("vim_interface_id")
562                 and port.get("device_id") == server.id,
563                 ports["ports"],
564             ),
565             None,
566         )
567
568 1     @staticmethod
569 1     def check_vlan_pci_updates(
570         interface_info: dict, index: int, vdur_vim_info_update: dict
571     ) -> None:
572         """If interface has pci and vlan, update vdur_vim_info dictionary with the refreshed data.
573
574         Args:
575             interface_info          (dict):     Refreshed interface info
576             index                   (int):      Index of interface in VDUR
577             vdur_vim_info_update    (dict):     Dictionary to be updated and used to update VDUR later.
578         """
579 1         if interface_info.get("binding:profile") and interface_info[
580             "binding:profile"
581         ].get("pci_slot"):
582 1             pci = interface_info["binding:profile"]["pci_slot"]
583 1             vdur_vim_info_update["interfaces"][index]["pci"] = pci
584
585 1         if interface_info.get("binding:vif_details"):
586 1             vdur_vim_info_update["interfaces"][index]["vlan"] = interface_info[
587                 "binding:vif_details"
588             ].get("vlan")
589
590 1     @staticmethod
591 1     def check_vdur_interface_updates(
592         vdur_update: dict,
593         vdur_path: str,
594         index: int,
595         interface_info: dict,
596         old_interface: dict,
597         vnfr_update: dict,
598         vnfr_id: str,
599     ) -> None:
600         """Updates the vdur_update dictionary which stores differences between the latest interface data and data in DB.
601
602         Args:
603             vdur_update     (dict):         Dictionary used to store vdur updates
604             vdur_path       (str):          VDUR record path in DB
605             index           (int):          Index of interface in VDUR
606             interface_info  (dict):         Refreshed interface info
607             old_interface       (dict):     The previous interface info comes from DB
608             vnfr_update     (dict):         VDUR record path in DB
609             vnfr_id         (str):          VNFR ID
610         """
611 1         current_ip_address = MonitorVms._get_current_ip_address(interface_info)
612 1         if current_ip_address:
613 1             vdur_update[vdur_path + ".interfaces." + str(index) + ".ip-address"] = (
614                 current_ip_address
615             )
616
617 1             if old_interface.get("mgmt_vdu_interface"):
618 1                 vdur_update[vdur_path + ".ip-address"] = current_ip_address
619
620 1             if old_interface.get("mgmt_vnf_interface"):
621 1                 vnfr_update[vnfr_id + ".ip-address"] = current_ip_address
622
623 1         vdur_update[vdur_path + ".interfaces." + str(index) + ".mac-address"] = (
624             interface_info.get("mac_address")
625         )
626
627 1     @staticmethod
628 1     def _get_dual_ip(data=None):
629 1         if data:
630 1             ip_addresses = [item["ip_address"] for item in data]
631 1             return ";".join(ip_addresses) if len(ip_addresses) > 1 else ip_addresses[0]
632         else:
633 0             return None
634
635 1     @staticmethod
636 1     def _get_current_ip_address(interface_info: dict) -> Optional[str]:
637 1         if interface_info.get("fixed_ips") and interface_info["fixed_ips"][0]:
638 1             return MonitorVms._get_dual_ip(interface_info.get("fixed_ips"))
639
640 1     @staticmethod
641 1     def backup_vdu_interfaces(vdur_vim_info_update: dict) -> None:
642         """Backup VDU interfaces as interfaces_backup.
643
644         Args:
645             vdur_vim_info_update    (dict):   Dictionary used to store vdur_vim_info updates
646         """
647 1         if vdur_vim_info_update.get("interfaces") and not vdur_vim_info_update.get(
648             "vim_message"
649         ):
650 1             vdur_vim_info_update["interfaces_backup"] = vdur_vim_info_update[
651                 "interfaces"
652             ]
653
654 1     def update_vdur_vim_info_interfaces(
655         self,
656         vdur_vim_info_update: dict,
657         index: int,
658         interface_info: dict,
659         server: object,
660     ) -> None:
661         """Update the vdur_vim_info dictionary with the latest interface info.
662
663         Args:
664             vdur_vim_info_update    (dict):     The dictionary which is used to store vdur_vim_info updates
665             index       (int):                  Interface index
666             interface_info  (dict):             The latest interface info
667             server  (object):                The latest VM info
668         """
669 1         if not (
670             vdur_vim_info_update.get("interfaces")
671             and vdur_vim_info_update["interfaces"][index]
672         ):
673 1             raise MonitorVmsException("Existing interfaces info could not found.")
674
675 1         vdur_vim_info_update["interfaces"][index].update(
676             {
677                 "mac_address": interface_info["mac_address"],
678                 "ip_address": (
679                     interface_info["fixed_ips"][0].get("ip_address")
680                     if interface_info.get("fixed_ips")
681                     else None
682                 ),
683                 "vim_net_id": interface_info["network_id"],
684                 "vim_info": self.serialize(interface_info),
685                 "compute_node": (
686                     server.to_dict()["OS-EXT-SRV-ATTR:host"]
687                     if server.to_dict().get("OS-EXT-SRV-ATTR:host")
688                     else None
689                 ),
690             }
691         )
692
693 1     def prepare_interface_updates(
694         self,
695         vdur_vim_info_update: dict,
696         index: int,
697         interface_info: dict,
698         server: object,
699         vdur_path: str,
700         vnfr_update: dict,
701         old_interface: dict,
702         vdur_update: dict,
703         vnfr_id: str,
704     ) -> None:
705         """Updates network related info in vdur_vim_info and vdur by using the latest interface info.
706
707         Args:
708             vdur_vim_info_update    (dict):     Dictionary used to store vdur_vim_info updates
709             index       (int):                  Interface index
710             interface_info  (dict):             The latest interface info
711             server  (object):                   The latest VM info
712             vdur_path       (str):              VDUR record path in DB
713             vnfr_update     (dict):             VDUR record path in DB
714             old_interface  (dict):              The previous interface info comes from DB
715             vdur_update     (dict):             Dictionary used to store vdur updates
716             vnfr_id         (str):              VNFR ID
717         """
718 1         self.update_vdur_vim_info_interfaces(
719             vdur_vim_info_update, index, interface_info, server
720         )
721 1         self.check_vlan_pci_updates(interface_info, index, vdur_vim_info_update)
722 1         self.check_vdur_interface_updates(
723             vdur_update,
724             vdur_path,
725             index,
726             interface_info,
727             old_interface,
728             vnfr_update,
729             vnfr_id,
730         )
731
732 1     def check_vm_interface_updates(
733         self,
734         server: object,
735         existing_vim_info: dict,
736         ports: dict,
737         vdur_vim_info_update: dict,
738         vdur_update: dict,
739         vdur_path: str,
740         vnfr_update: dict,
741         vnfr_id: str,
742     ) -> None:
743         """Gets the refreshed interfaces info of server and updates the VDUR if interfaces exist,
744         otherwise reports that interfaces are deleted.
745
746         Args:
747             server  (object):                   The latest VM info
748             existing_vim_info   (dict):         VM info details comes from DB
749             ports       (dict):                 All ports info belongs to single VIM account
750             vdur_vim_info_update    (dict):     Dictionary used to store vdur_vim_info updates
751             vdur_update     (dict):             Dictionary used to store vdur updates
752             vdur_path       (str):              VDUR record path in DB
753             vnfr_update     (dict):             VDUR record path in DB
754             vnfr_id         (str):              VNFR ID
755         """
756 1         for index, old_interface in enumerate(existing_vim_info["interfaces"]):
757 1             interface_info = self.get_interface_info(ports, old_interface, server)
758 1             if not interface_info:
759 1                 vdur_vim_info_update["vim_message"] = (
760                     f"Interface {old_interface['vim_interface_id']} deleted externally."
761                 )
762
763             else:
764 1                 if interface_info.get("status") in openStacknetStatusOk:
765 1                     self.prepare_interface_updates(
766                         vdur_vim_info_update,
767                         index,
768                         interface_info,
769                         server,
770                         vdur_path,
771                         vnfr_update,
772                         old_interface,
773                         vdur_update,
774                         vnfr_id,
775                     )
776
777                 else:
778 1                     vdur_vim_info_update["vim_message"] = (
779                         f"Interface {old_interface['vim_interface_id']} status: "
780                         + interface_info.get("status")
781                     )
782
783 1     def update_in_database(self, all_updates: list, vnfr_id: str) -> None:
784         """Update differences in VNFR.
785
786         Args:
787             all_updates     (list):     List of dictionaries which includes differences
788             vnfr_id         (str):      VNF record ID
789
790         Raises:
791             MonitorDbException
792         """
793 1         try:
794 1             for updated_dict in all_updates:
795 1                 if updated_dict:
796 1                     self.db.set_list(
797                         "vnfrs",
798                         update_dict=updated_dict,
799                         q_filter={"_id": vnfr_id},
800                     )
801 1         except DbException as e:
802 1             raise MonitorDbException(
803                 f"Error while updating differences in VNFR {str(e)}"
804             )
805
806 1     def report_vdur_updates(
807         self, server: object, vm_to_monitor: object, ports: dict
808     ) -> None:
809         """Report VDU updates by changing the VDUR records in DB.
810
811         Args:
812             server      (object):               Refreshed VM info
813             vm_to_monitor   (object):           VM to be monitored
814             ports       (dict):                 Ports dict includes all ports details regarding with single VIM account
815         """
816 1         vm_data = self._get_vm_data_from_db(vm_to_monitor)
817 1         if not vm_data:
818 1             return
819 1         (
820             vdur_path,
821             vdur_vim_info_update,
822             _,
823             existing_vim_info,
824             vnfr_id,
825             vim_info_path,
826         ) = vm_data
827 1         vdur_update, vnfr_update = {}, {}
828
829 1         self.check_vm_status_updates(
830             vdur_vim_info_update, vdur_update, server, vdur_path
831         )
832
833 1         self.check_vm_interface_updates(
834             server,
835             existing_vim_info,
836             ports,
837             vdur_vim_info_update,
838             vdur_update,
839             vdur_path,
840             vnfr_update,
841             vnfr_id,
842         )
843         # Update vnfr in MongoDB if there are differences
844 1         if existing_vim_info != vdur_vim_info_update:
845 1             self.logger.info(f"Reporting status updates of VM: {vm_to_monitor.vm_id}.")
846 1             self.backup_vdu_interfaces(vdur_vim_info_update)
847 1             all_updates = [
848                 vdur_update,
849                 {vim_info_path: vdur_vim_info_update},
850                 vnfr_update,
851             ]
852 1             self.update_in_database(all_updates, vnfr_id)
853 1             self.logger.info(f"Updated vnfr for vm_id: {server.id}.")
854
855 1     def run(self) -> None:
856         """Perfoms the periodic updates of Openstack VMs by sending only two requests to Openstack APIs
857         for each VIM account (in order to get details of all servers, all ports).
858
859         Raises:
860             MonitorVmsException
861         """
862 1         try:
863             # If there is not any Openstack type VIM account in DB or VM status updates are disabled by config,
864             # Openstack VMs will not be monitored.
865 1             if not self.db_vims or self.refresh_config.active == -1:
866 1                 return
867
868 1             ro_tasks_to_monitor = self.find_ro_tasks_to_monitor()
869 1             db_vims = [vim["_id"] for vim in self.db_vims]
870 1             vims_to_monitor = []
871
872 1             for ro_task in ro_tasks_to_monitor:
873 1                 _, _, target_vim = ro_task["target_id"].partition(":")
874 1                 if target_vim in db_vims:
875 1                     self.prepare_vims_to_monitor(vims_to_monitor, ro_task, target_vim)
876
877 1             for vim in vims_to_monitor:
878 1                 all_servers, all_ports = self.my_vims[vim.vim_id].get_monitoring_data()
879 1                 self.update_vnfrs(all_servers, all_ports, vim.vms)
880 1         except (
881             DbException,
882             MonitorDbException,
883             MonitorVimException,
884             MonitorVmsException,
885             ValueError,
886             KeyError,
887             TypeError,
888             AttributeError,
889             vimconn.VimConnException,
890         ) as e:
891 1             raise MonitorVmsException(
892                 f"Exception while monitoring Openstack VMs: {str(e)}"
893             )
894
895
896 1 def start_monitoring(config: dict):
897     global monitoring_task
898 1     if not (config and config.get("period")):
899 1         raise MonitorVmsException("Wrong configuration format is provided.")
900 1     instance = MonitorVms(config)
901 1     period = instance.refresh_config.active
902 1     instance.run()
903 1     monitoring_task = threading.Timer(period, start_monitoring, args=(config,))
904 1     monitoring_task.start()
905
906
907 1 def stop_monitoring():
908     global monitoring_task
909 1     if monitoring_task:
910 1         monitoring_task.cancel()