Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

monitor.py

Trend

File Coverage summary

NameClassesLinesConditionals
monitor.py
100%
1/1
99%
298/301
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
monitor.py
99%
298/301
N/A

Source

NG-RO/osm_ng_ro/monitor.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 1 from copy import deepcopy
18 1 from dataclasses import dataclass
19 1 import logging
20 1 from os import makedirs, path
21 1 from pprint import pformat
22 1 import random
23 1 import threading
24 1 from typing import Optional
25
26 1 from importlib_metadata import entry_points
27 1 from osm_common import dbmemory, dbmongo
28 1 from osm_common.dbbase import DbException
29 1 from osm_ng_ro.ns_thread import ConfigValidate
30 1 from osm_ro_plugin import vimconn
31 1 import yaml
32 1 from yaml.representer import RepresenterError
33
34
35 1 openStackvmStatusOk = [
36     "ACTIVE",
37     "PAUSED",
38     "SUSPENDED",
39     "SHUTOFF",
40     "BUILD",
41 ]
42
43 1 openStacknetStatusOk = [
44     "ACTIVE",
45     "PAUSED",
46     "BUILD",
47 ]
48
49 1 db_vim_collection = "vim_accounts"
50 1 vim_type = "openstack"
51 1 ro_task_collection = "ro_tasks"
52 1 plugin_name = "rovim_openstack"
53 1 monitoring_task = None
54
55
56 1 @dataclass
57 1 class VmToMonitor:
58 1     vm_id: str
59 1     target_record: str
60
61
62 1 @dataclass
63 1 class VimToMonitor:
64 1     vim_id: str
65 1     vms: list
66
67
68 1 class MonitorVmsException(Exception):
69 1     def __init__(self, message):
70 1         super(Exception, self).__init__(message)
71
72
73 1 class MonitorDbException(Exception):
74 1     def __init__(self, message):
75 1         super(Exception, self).__init__(message)
76
77
78 1 class MonitorVimException(Exception):
79 1     def __init__(self, message):
80 1         super(Exception, self).__init__(message)
81
82
83 1 class SafeDumper(yaml.SafeDumper):
84 1     def represent_data(self, data):
85 0         if isinstance(data, dict) and data.__class__ != dict:
86             # A solution to convert subclasses of dict to dicts which is not handled by pyyaml.
87 0             data = dict(data.items())
88 0         return super(SafeDumper, self).represent_data(data)
89
90
91 1 class MonitorVms:
92 1     def __init__(self, config: dict):
93 1         self.config = config
94 1         self.db = None
95 1         self.refresh_config = ConfigValidate(config)
96 1         self.my_vims = {}
97 1         self.plugins = {}
98 1         self.logger = logging.getLogger("ro.monitor")
99 1         self.connect_db()
100 1         self.db_vims = self.get_db_vims()
101 1         self.load_vims()
102
103 1     def load_vims(self) -> None:
104 1         for vim in self.db_vims:
105 1             if vim["_id"] not in self.my_vims:
106 1                 self._load_vim(vim["_id"])
107
108 1     def connect_db(self) -> None:
109         """Connect to the Database.
110
111         Raises:
112            MonitorDbException
113         """
114 1         try:
115 1             if not self.db:
116 1                 if self.config["database"]["driver"] == "mongo":
117 1                     self.db = dbmongo.DbMongo()
118 1                     self.db.db_connect(self.config["database"])
119 1                 elif self.config["database"]["driver"] == "memory":
120 1                     self.db = dbmemory.DbMemory()
121 1                     self.db.db_connect(self.config["database"])
122                 else:
123 1                     raise MonitorDbException(
124                         "Invalid configuration param '{}' at '[database]':'driver'".format(
125                             self.config["database"]["driver"]
126                         )
127                     )
128 1         except (DbException, MonitorDbException, ValueError) as e:
129 1             raise MonitorDbException(str(e))
130
131 1     def get_db_vims(self) -> list:
132         """Get all VIM accounts which types are Openstack."""
133 1         return self.db.get_list(db_vim_collection, {"vim_type": vim_type})
134
135 1     def find_ro_tasks_to_monitor(self) -> list:
136         """Get the ro_tasks which belongs to vdu and status DONE."""
137 1         return self.db.get_list(
138             ro_task_collection,
139             q_filter={
140                 "tasks.status": ["DONE"],
141                 "tasks.item": ["vdu"],
142             },
143         )
144
145 1     @staticmethod
146 1     def _initialize_target_vim(vim_module_conn, vim: dict) -> object:
147         """Create the VIM connector object with given vim details.
148
149         Args:
150             vim_module_conn (class):    VIM connector class
151             vim (dict):                 VIM details to initialize VIM connecter object
152
153         Returns:
154             VIM connector   (object):   VIM connector object
155         """
156 1         return vim_module_conn(
157             uuid=vim["_id"],
158             name=vim["name"],
159             tenant_id=vim.get("vim_tenant_id"),
160             tenant_name=vim.get("vim_tenant_name"),
161             url=vim["vim_url"],
162             url_admin=None,
163             user=vim["vim_user"],
164             passwd=vim["vim_password"],
165             config=vim.get("config") or {},
166             persistent_info={},
167         )
168
169 1     def _load_vim(self, target_id) -> None:
170         """Load or reload a vim_account.
171         Read content from database, load the plugin if not loaded, then it fills my_vims dictionary.
172
173         Args:
174             target_id   (str):      ID of vim account
175
176         Raises:
177             MonitorVimException
178         """
179 1         try:
180 1             vim = self.db.get_one(db_vim_collection, {"_id": target_id})
181 1             schema_version = vim.get("schema_version")
182 1             self.db.encrypt_decrypt_fields(
183                 vim,
184                 "decrypt",
185                 fields=("password", "secret"),
186                 schema_version=schema_version,
187                 salt=target_id,
188             )
189 1             self._process_vim_config(target_id, vim)
190 1             vim_module_conn = self._load_plugin(plugin_name)
191 1             self.my_vims[target_id] = self._initialize_target_vim(vim_module_conn, vim)
192 1             self.logger.debug(
193                 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
194             )
195 1         except (
196             DbException,
197             IOError,
198             AttributeError,
199             MonitorDbException,
200             MonitorVimException,
201             TypeError,
202         ) as e:
203 1             raise MonitorVimException(
204                 "Cannot load {} plugin={}: {}".format(target_id, plugin_name, str(e))
205             )
206
207 1     @staticmethod
208 1     def _process_vim_config(target_id: str, db_vim: dict) -> None:
209         """
210         Process vim config, creating vim configuration files as ca_cert
211         Args:
212             target_id   (str): vim id
213             db_vim      (dict): Vim dictionary obtained from database
214
215         Raises:
216             MonitorVimException
217         """
218 1         if not db_vim.get("config"):
219 1             return
220 1         file_name = ""
221 1         work_dir = "/app/osm_ro/certs"
222 1         try:
223 1             if db_vim["config"].get("ca_cert_content"):
224 1                 file_name = f"{work_dir}/{target_id}:{random.randint(0, 99999)}"
225
226 1                 if not path.isdir(file_name):
227 1                     makedirs(file_name)
228
229 1                 file_name = file_name + "/ca_cert"
230
231 1                 with open(file_name, "w") as f:
232 1                     f.write(db_vim["config"]["ca_cert_content"])
233 1                     del db_vim["config"]["ca_cert_content"]
234 1                     db_vim["config"]["ca_cert"] = file_name
235
236 1         except (FileNotFoundError, IOError, OSError) as e:
237 1             raise MonitorVimException(
238                 "Error writing to file '{}': {}".format(file_name, e)
239             )
240
241 1     def _load_plugin(self, name: str = "rovim_openstack", type: str = "vim"):
242         """Finds the proper VIM connector and returns VIM connector class name.
243         Args:
244             name  (str):          rovim_openstack
245             type  (str):          vim
246
247         Returns:
248             VIM connector class name    (class)
249
250         Raises:
251             MonitorVimException
252         """
253 1         try:
254 1             if name in self.plugins:
255 1                 return self.plugins[name]
256
257 1             for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
258 1                 self.plugins[name] = ep.load()
259 1                 return self.plugins[name]
260
261 1         except Exception as e:
262 1             raise MonitorVimException("Cannot load plugin osm_{}: {}".format(name, e))
263
264 1     @staticmethod
265 1     def create_vm_to_monitor(ro_task: dict) -> Optional[object]:
266         """Create VM using dataclass with ro task details.
267
268         Args:
269             ro_task (dict):             Details of ro_task
270
271         Returns:
272             VmToMonitor (object)
273         """
274 1         if not ro_task:
275 1             return
276 1         return VmToMonitor(
277             ro_task["vim_info"]["vim_id"], ro_task["tasks"][0]["target_record"]
278         )
279
280 1     @staticmethod
281 1     def add_vm_to_existing_vim(
282         vims_to_monitor: list, ro_task: dict, target_vim: str
283     ) -> bool:
284         """Add VmToMonitor to existing VIM list.
285
286         Args:
287             vims_to_monitor (list):     List of VIMs to monitor
288             ro_task (dict):             ro_task details
289             target_vim  (str):          ID of target VIM
290
291         Returns:
292             Boolean         If VM is added to VIM list, it returns True else False.
293         """
294 1         for vim in vims_to_monitor:
295 1             if target_vim == vim.vim_id:
296 1                 vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task)
297 1                 vim.vms.append(vm_to_monitor)
298 1                 return True
299 1         return False
300
301 1     @staticmethod
302 1     def add_new_vim_for_monitoring(
303         vims_to_monitor: list, ro_task: dict, target_vim: str
304     ) -> None:
305         """Create a new VIM object and add to vims_to_monitor list.
306
307         Args:
308             vims_to_monitor (list):     List of VIMs to monitor
309             ro_task (dict):             ro_task details
310             target_vim  (str):          ID of target VIM
311         """
312 1         vim_to_monitor = VimToMonitor(target_vim, [])
313 1         vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task)
314 1         vim_to_monitor.vms.append(vm_to_monitor)
315 1         vims_to_monitor.append(vim_to_monitor)
316
317 1     @staticmethod
318 1     def prepare_vims_to_monitor(
319         vims_to_monitor: list, ro_task: dict, target_vim: str
320     ) -> None:
321         """If the required VIM exists in the vims_to_monitor list, add VM under related VIM,
322         otherwise create a new VIM object and add VM to this new created VIM.
323
324         Args:
325             vims_to_monitor (list):     List of VIMs to monitor
326             ro_task (dict):             ro_task details
327             target_vim  (str):          ID of target VIM
328         """
329 1         if not MonitorVms.add_vm_to_existing_vim(vims_to_monitor, ro_task, target_vim):
330 1             MonitorVms.add_new_vim_for_monitoring(vims_to_monitor, ro_task, target_vim)
331
332 1     def _get_db_paths(self, target_record: str) -> tuple:
333         """Get the database paths and info of target VDU and VIM.
334
335         Args:
336             target_record   (str):      A string which includes vnfr_id, vdur_id, vim_id
337
338         Returns:
339             (vim_info_path: str, vim_id: str, vnfr_id: str, vdur_path:str, vdur_index: int, db_vnfr: dict)  tuple
340
341         Raises:
342             MonitorVmsException
343         """
344 1         try:
345 1             [_, vnfr_id, vdur_info, vim_id] = target_record.split(":")
346 1             vim_info_path = vdur_info + ":" + vim_id
347 1             vdur_path = vim_info_path.split(".vim_info.")[0]
348 1             vdur_index = int(vdur_path.split(".")[1])
349 1             db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}, fail_on_empty=False)
350 1             return vim_info_path, vim_id, vnfr_id, vdur_path, vdur_index, db_vnfr
351 1         except (DbException, ValueError) as e:
352 1             raise MonitorVmsException(str(e))
353
354 1     @staticmethod
355 1     def _check_if_vdur_vim_info_exists(
356         db_vnfr: dict, vdur_index: int
357     ) -> Optional[bool]:
358         """Check if VNF record and vdur vim_info record exists.
359
360         Args:
361             db_vnfr (dict):             VNF record
362             vdur_index  (int):          index of vdur under db_vnfr["vdur"]
363
364         Returns:
365             Boolean                     True if VNF record and vdur vim_info record exists.
366         """
367 1         try:
368 1             if db_vnfr and db_vnfr.get("vdur") and isinstance(vdur_index, int):
369 1                 if db_vnfr["vdur"][vdur_index] and db_vnfr["vdur"][vdur_index].get(
370                     "vim_info"
371                 ):
372 1                     return True
373 1         except IndexError:
374 1             return
375
376 1     def _get_vm_data_from_db(self, vm_to_monitor: object) -> Optional[tuple]:
377         """Get the required DB path and VIM info data from database.
378
379         Args:
380             vm_to_monitor   (object):    Includes vm_id and target record in DB.
381
382         Returns:
383             (vdur_path: str, vdur_vim_info_update: dict, db_vnfr: dict, existing_vim_info: dict, vnfr_id,vim_info_path: str)    (Tuple):
384             Required VM info if _check_if_vdur_vim_info_exists else None
385         """
386 1         (
387             vim_info_path,
388             vim_id,
389             vnfr_id,
390             vdur_path,
391             vdur_index,
392             db_vnfr,
393         ) = self._get_db_paths(vm_to_monitor.target_record)
394 1         if not self._check_if_vdur_vim_info_exists(db_vnfr, vdur_index):
395 1             return
396
397 1         existing_vim_info = db_vnfr["vdur"][vdur_index]["vim_info"].get("vim:" + vim_id)
398 1         if not existing_vim_info:
399 1             return
400
401 1         vdur_vim_info_update = deepcopy(existing_vim_info)
402 1         return (
403             vdur_path,
404             vdur_vim_info_update,
405             db_vnfr,
406             existing_vim_info,
407             vnfr_id,
408             vim_info_path,
409         )
410
411 1     @staticmethod
412 1     def update_vim_info_for_deleted_vm(vdur_vim_info_update: dict) -> None:
413         """Updates the vdur_vim_info_update to report that VM is deleted.
414
415         Args:
416              vdur_vim_info_update    (dict):     Dictionary to be updated and used to update VDUR later.
417         """
418 1         vdur_vim_info_update.update(
419             {
420                 "vim_status": "DELETED",
421                 "vim_message": "Deleted externally",
422                 "vim_id": None,
423                 "vim_name": None,
424                 "interfaces": None,
425             }
426         )
427
428 1     def report_deleted_vdur(self, vm_to_monitor: object) -> None:
429         """VM does not exist in the Openstack Cloud so update the VNFR to report VM deletion.
430
431         Args:
432             vm_to_monitor   (object):        VM needs to be reported as deleted.
433         """
434 1         vm_data = self._get_vm_data_from_db(vm_to_monitor)
435 1         if not vm_data:
436 1             return
437 1         (
438             vdur_path,
439             vdur_vim_info_update,
440             _,
441             existing_vim_info,
442             vnfr_id,
443             vim_info_path,
444         ) = vm_data
445 1         self.update_vim_info_for_deleted_vm(vdur_vim_info_update)
446 1         vdur_update = {
447             vdur_path + ".status": "DELETED",
448         }
449
450 1         if existing_vim_info != vdur_vim_info_update:
451             # VNFR record is updated one time upon VM deletion.
452 1             self.logger.info(f"Reporting deletion of VM: {vm_to_monitor.vm_id}")
453 1             self.backup_vdu_interfaces(vdur_vim_info_update)
454 1             all_updates = [vdur_update, {vim_info_path: vdur_vim_info_update}]
455 1             self.update_in_database(all_updates, vnfr_id)
456 1             self.logger.info(f"Updated vnfr for vm_id: {vm_to_monitor.vm_id}.")
457
458 1     def update_vnfrs(self, servers: list, ports: dict, vms_to_monitor: list) -> None:
459         """Update the VDURs according to the latest information provided by servers list.
460
461         Args:
462             servers    (list):          List of existing VMs comes from single Openstack VIM account
463             ports     (dict):           List of all ports comes from single Openstack VIM account
464             vms_to_monitor  (list):     List of VMs to be monitored and updated.
465         """
466 1         for vm_to_monitor in vms_to_monitor:
467 1             server = next(
468                 filter(lambda server: server.id == vm_to_monitor.vm_id, servers), None
469             )
470 1             if server:
471 1                 self.report_vdur_updates(server, vm_to_monitor, ports)
472             else:
473 1                 self.report_deleted_vdur(vm_to_monitor)
474
475 1     def serialize(self, value: dict) -> Optional[str]:
476         """Serialization of python basic types.
477         In the case value is not serializable a message will be logged.
478
479         Args:
480             value   (dict/str):     Data to serialize
481
482         Returns:
483             serialized_value    (str, yaml)
484         """
485 1         if isinstance(value, str):
486 1             return value
487 1         try:
488 1             return yaml.dump(
489                 value, Dumper=SafeDumper, default_flow_style=True, width=256
490             )
491 1         except RepresenterError:
492 1             self.logger.info(
493                 "The following entity cannot be serialized in YAML:\n\n%s\n\n",
494                 pformat(value),
495                 exc_info=True,
496             )
497 1             return str(value)
498
499 1     def _get_server_info(self, server: object) -> str:
500         """Get the server info, extract some fields and returns info as string.
501
502         Args:
503             server  (object):        VM info object
504
505         Returns:
506             server_info (string)
507         """
508 1         server_info = server.to_dict()
509 1         server_info.pop("OS-EXT-SRV-ATTR:user_data", None)
510 1         server_info.pop("user_data", None)
511 1         return self.serialize(server_info)
512
513 1     def check_vm_status_updates(
514         self,
515         vdur_vim_info_update: dict,
516         vdur_update: dict,
517         server: object,
518         vdur_path: str,
519     ) -> None:
520         """Fills up dictionaries to update VDUR according to server.status.
521
522         Args:
523             vdur_vim_info_update    (dict):         Dictionary which keeps the differences of vdur_vim_info
524             vdur_update             (dict):         Dictionary which keeps the differences of vdur
525             server                  (server):       VM info
526             vdur_path               (str):          Path of VDUR in DB
527         """
528 1         if server.status in openStackvmStatusOk:
529 1             vdur_vim_info_update["vim_status"] = vdur_update[
530                 vdur_path + ".status"
531             ] = server.status
532
533         else:
534 1             vdur_vim_info_update["vim_status"] = vdur_update[
535                 vdur_path + ".status"
536             ] = server.status
537 1             vdur_vim_info_update["vim_message"] = "VIM status reported " + server.status
538
539 1         vdur_vim_info_update["vim_details"] = self._get_server_info(server)
540 1         vdur_vim_info_update["vim_id"] = server.id
541 1         vdur_vim_info_update["vim_name"] = vdur_update[
542             vdur_path + ".name"
543         ] = server.name
544
545 1     @staticmethod
546 1     def get_interface_info(
547         ports: dict, interface: dict, server: object
548     ) -> Optional[dict]:
549         """Get the updated port info regarding with existing interface of server.
550
551         Args:
552             ports       (dict):             List of all ports belong to single VIM account
553             interface   (dict):             Existing interface info which is taken from DB
554             server  (object):               Server info
555
556         Returns:
557             port    (dict):                 The updated port info related to existing interface of server
558         """
559 1         return next(
560             filter(
561                 lambda port: port.get("id") == interface.get("vim_interface_id")
562                 and port.get("device_id") == server.id,
563                 ports["ports"],
564             ),
565             None,
566         )
567
568 1     @staticmethod
569 1     def check_vlan_pci_updates(
570         interface_info: dict, index: int, vdur_vim_info_update: dict
571     ) -> None:
572         """If interface has pci and vlan, update vdur_vim_info dictionary with the refreshed data.
573
574         Args:
575             interface_info          (dict):     Refreshed interface info
576             index                   (int):      Index of interface in VDUR
577             vdur_vim_info_update    (dict):     Dictionary to be updated and used to update VDUR later.
578         """
579 1         if interface_info.get("binding:profile") and interface_info[
580             "binding:profile"
581         ].get("pci_slot"):
582 1             pci = interface_info["binding:profile"]["pci_slot"]
583 1             vdur_vim_info_update["interfaces"][index]["pci"] = pci
584
585 1         if interface_info.get("binding:vif_details"):
586 1             vdur_vim_info_update["interfaces"][index]["vlan"] = interface_info[
587                 "binding:vif_details"
588             ].get("vlan")
589
590 1     @staticmethod
591 1     def check_vdur_interface_updates(
592         vdur_update: dict,
593         vdur_path: str,
594         index: int,
595         interface_info: dict,
596         old_interface: dict,
597         vnfr_update: dict,
598         vnfr_id: str,
599     ) -> None:
600         """Updates the vdur_update dictionary which stores differences between the latest interface data and data in DB.
601
602         Args:
603             vdur_update     (dict):         Dictionary used to store vdur updates
604             vdur_path       (str):          VDUR record path in DB
605             index           (int):          Index of interface in VDUR
606             interface_info  (dict):         Refreshed interface info
607             old_interface       (dict):     The previous interface info comes from DB
608             vnfr_update     (dict):         VDUR record path in DB
609             vnfr_id         (str):          VNFR ID
610         """
611 1         current_ip_address = MonitorVms._get_current_ip_address(interface_info)
612 1         if current_ip_address:
613 1             vdur_update[
614                 vdur_path + ".interfaces." + str(index) + ".ip-address"
615             ] = current_ip_address
616
617 1             if old_interface.get("mgmt_vdu_interface"):
618 1                 vdur_update[vdur_path + ".ip-address"] = current_ip_address
619
620 1             if old_interface.get("mgmt_vnf_interface"):
621 1                 vnfr_update[vnfr_id + ".ip-address"] = current_ip_address
622
623 1         vdur_update[
624             vdur_path + ".interfaces." + str(index) + ".mac-address"
625         ] = interface_info.get("mac_address")
626
627 1     @staticmethod
628 1     def _get_current_ip_address(interface_info: dict) -> Optional[str]:
629 1         if interface_info.get("fixed_ips") and interface_info["fixed_ips"][0]:
630 1             return interface_info["fixed_ips"][0].get("ip_address")
631
632 1     @staticmethod
633 1     def backup_vdu_interfaces(vdur_vim_info_update: dict) -> None:
634         """Backup VDU interfaces as interfaces_backup.
635
636         Args:
637             vdur_vim_info_update    (dict):   Dictionary used to store vdur_vim_info updates
638         """
639 1         if vdur_vim_info_update.get("interfaces") and not vdur_vim_info_update.get(
640             "vim_message"
641         ):
642 1             vdur_vim_info_update["interfaces_backup"] = vdur_vim_info_update[
643                 "interfaces"
644             ]
645
646 1     def update_vdur_vim_info_interfaces(
647         self,
648         vdur_vim_info_update: dict,
649         index: int,
650         interface_info: dict,
651         server: object,
652     ) -> None:
653         """Update the vdur_vim_info dictionary with the latest interface info.
654
655         Args:
656             vdur_vim_info_update    (dict):     The dictionary which is used to store vdur_vim_info updates
657             index       (int):                  Interface index
658             interface_info  (dict):             The latest interface info
659             server  (object):                The latest VM info
660         """
661 1         if not (
662             vdur_vim_info_update.get("interfaces")
663             and vdur_vim_info_update["interfaces"][index]
664         ):
665 1             raise MonitorVmsException("Existing interfaces info could not found.")
666
667 1         vdur_vim_info_update["interfaces"][index].update(
668             {
669                 "mac_address": interface_info["mac_address"],
670                 "ip_address": interface_info["fixed_ips"][0].get("ip_address")
671                 if interface_info.get("fixed_ips")
672                 else None,
673                 "vim_net_id": interface_info["network_id"],
674                 "vim_info": self.serialize(interface_info),
675                 "compute_node": server.to_dict()["OS-EXT-SRV-ATTR:host"]
676                 if server.to_dict().get("OS-EXT-SRV-ATTR:host")
677                 else None,
678             }
679         )
680
681 1     def prepare_interface_updates(
682         self,
683         vdur_vim_info_update: dict,
684         index: int,
685         interface_info: dict,
686         server: object,
687         vdur_path: str,
688         vnfr_update: dict,
689         old_interface: dict,
690         vdur_update: dict,
691         vnfr_id: str,
692     ) -> None:
693         """Updates network related info in vdur_vim_info and vdur by using the latest interface info.
694
695         Args:
696             vdur_vim_info_update    (dict):     Dictionary used to store vdur_vim_info updates
697             index       (int):                  Interface index
698             interface_info  (dict):             The latest interface info
699             server  (object):                   The latest VM info
700             vdur_path       (str):              VDUR record path in DB
701             vnfr_update     (dict):             VDUR record path in DB
702             old_interface  (dict):              The previous interface info comes from DB
703             vdur_update     (dict):             Dictionary used to store vdur updates
704             vnfr_id         (str):              VNFR ID
705         """
706 1         self.update_vdur_vim_info_interfaces(
707             vdur_vim_info_update, index, interface_info, server
708         )
709 1         self.check_vlan_pci_updates(interface_info, index, vdur_vim_info_update)
710 1         self.check_vdur_interface_updates(
711             vdur_update,
712             vdur_path,
713             index,
714             interface_info,
715             old_interface,
716             vnfr_update,
717             vnfr_id,
718         )
719
720 1     def check_vm_interface_updates(
721         self,
722         server: object,
723         existing_vim_info: dict,
724         ports: dict,
725         vdur_vim_info_update: dict,
726         vdur_update: dict,
727         vdur_path: str,
728         vnfr_update: dict,
729         vnfr_id: str,
730     ) -> None:
731         """Gets the refreshed interfaces info of server and updates the VDUR if interfaces exist,
732         otherwise reports that interfaces are deleted.
733
734         Args:
735             server  (object):                   The latest VM info
736             existing_vim_info   (dict):         VM info details comes from DB
737             ports       (dict):                 All ports info belongs to single VIM account
738             vdur_vim_info_update    (dict):     Dictionary used to store vdur_vim_info updates
739             vdur_update     (dict):             Dictionary used to store vdur updates
740             vdur_path       (str):              VDUR record path in DB
741             vnfr_update     (dict):             VDUR record path in DB
742             vnfr_id         (str):              VNFR ID
743         """
744 1         for index, old_interface in enumerate(existing_vim_info["interfaces"]):
745 1             interface_info = self.get_interface_info(ports, old_interface, server)
746 1             if not interface_info:
747 1                 vdur_vim_info_update[
748                     "vim_message"
749                 ] = f"Interface {old_interface['vim_interface_id']} deleted externally."
750
751             else:
752 1                 if interface_info.get("status") in openStacknetStatusOk:
753 1                     self.prepare_interface_updates(
754                         vdur_vim_info_update,
755                         index,
756                         interface_info,
757                         server,
758                         vdur_path,
759                         vnfr_update,
760                         old_interface,
761                         vdur_update,
762                         vnfr_id,
763                     )
764
765                 else:
766 1                     vdur_vim_info_update["vim_message"] = (
767                         f"Interface {old_interface['vim_interface_id']} status: "
768                         + interface_info.get("status")
769                     )
770
771 1     def update_in_database(self, all_updates: list, vnfr_id: str) -> None:
772         """Update differences in VNFR.
773
774         Args:
775             all_updates     (list):     List of dictionaries which includes differences
776             vnfr_id         (str):      VNF record ID
777
778         Raises:
779             MonitorDbException
780         """
781 1         try:
782 1             for updated_dict in all_updates:
783 1                 if updated_dict:
784 1                     self.db.set_list(
785                         "vnfrs",
786                         update_dict=updated_dict,
787                         q_filter={"_id": vnfr_id},
788                     )
789 1         except DbException as e:
790 1             raise MonitorDbException(
791                 f"Error while updating differences in VNFR {str(e)}"
792             )
793
794 1     def report_vdur_updates(
795         self, server: object, vm_to_monitor: object, ports: dict
796     ) -> None:
797         """Report VDU updates by changing the VDUR records in DB.
798
799         Args:
800             server      (object):               Refreshed VM info
801             vm_to_monitor   (object):           VM to be monitored
802             ports       (dict):                 Ports dict includes all ports details regarding with single VIM account
803         """
804 1         vm_data = self._get_vm_data_from_db(vm_to_monitor)
805 1         if not vm_data:
806 1             return
807 1         (
808             vdur_path,
809             vdur_vim_info_update,
810             _,
811             existing_vim_info,
812             vnfr_id,
813             vim_info_path,
814         ) = vm_data
815 1         vdur_update, vnfr_update = {}, {}
816
817 1         self.check_vm_status_updates(
818             vdur_vim_info_update, vdur_update, server, vdur_path
819         )
820
821 1         self.check_vm_interface_updates(
822             server,
823             existing_vim_info,
824             ports,
825             vdur_vim_info_update,
826             vdur_update,
827             vdur_path,
828             vnfr_update,
829             vnfr_id,
830         )
831         # Update vnfr in MongoDB if there are differences
832 1         if existing_vim_info != vdur_vim_info_update:
833 1             self.logger.info(f"Reporting status updates of VM: {vm_to_monitor.vm_id}.")
834 1             self.backup_vdu_interfaces(vdur_vim_info_update)
835 1             all_updates = [
836                 vdur_update,
837                 {vim_info_path: vdur_vim_info_update},
838                 vnfr_update,
839             ]
840 1             self.update_in_database(all_updates, vnfr_id)
841 1             self.logger.info(f"Updated vnfr for vm_id: {server.id}.")
842
843 1     def run(self) -> None:
844         """Perfoms the periodic updates of Openstack VMs by sending only two requests to Openstack APIs
845         for each VIM account (in order to get details of all servers, all ports).
846
847         Raises:
848             MonitorVmsException
849         """
850 1         try:
851             # If there is not any Openstack type VIM account in DB or VM status updates are disabled by config,
852             # Openstack VMs will not be monitored.
853 1             if not self.db_vims or self.refresh_config.active == -1:
854 1                 return
855
856 1             ro_tasks_to_monitor = self.find_ro_tasks_to_monitor()
857 1             db_vims = [vim["_id"] for vim in self.db_vims]
858 1             vims_to_monitor = []
859
860 1             for ro_task in ro_tasks_to_monitor:
861 1                 _, _, target_vim = ro_task["target_id"].partition(":")
862 1                 if target_vim in db_vims:
863 1                     self.prepare_vims_to_monitor(vims_to_monitor, ro_task, target_vim)
864
865 1             for vim in vims_to_monitor:
866 1                 all_servers, all_ports = self.my_vims[vim.vim_id].get_monitoring_data()
867 1                 self.update_vnfrs(all_servers, all_ports, vim.vms)
868 1         except (
869             DbException,
870             MonitorDbException,
871             MonitorVimException,
872             MonitorVmsException,
873             ValueError,
874             KeyError,
875             TypeError,
876             AttributeError,
877             vimconn.VimConnException,
878         ) as e:
879 1             raise MonitorVmsException(
880                 f"Exception while monitoring Openstack VMs: {str(e)}"
881             )
882
883
884 1 def start_monitoring(config: dict):
885     global monitoring_task
886 1     if not (config and config.get("period")):
887 1         raise MonitorVmsException("Wrong configuration format is provided.")
888 1     instance = MonitorVms(config)
889 1     period = instance.refresh_config.active
890 1     instance.run()
891 1     monitoring_task = threading.Timer(period, start_monitoring, args=(config,))
892 1     monitoring_task.start()
893
894
895 1 def stop_monitoring():
896     global monitoring_task
897 1     if monitoring_task:
898 1         monitoring_task.cancel()