1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
18 from dataclasses
import dataclass
20 from os
import makedirs
, path
21 from pprint
import pformat
24 from typing
import Optional
26 from importlib_metadata
import entry_points
27 from osm_common
import dbmemory
, dbmongo
28 from osm_common
.dbbase
import DbException
29 from osm_ng_ro
.ns_thread
import ConfigValidate
30 from osm_ro_plugin
import vimconn
32 from yaml
.representer
import RepresenterError
35 openStackvmStatusOk
= [
43 openStacknetStatusOk
= [
49 db_vim_collection
= "vim_accounts"
50 vim_type
= "openstack"
51 ro_task_collection
= "ro_tasks"
52 plugin_name
= "rovim_openstack"
53 monitoring_task
= None
68 class MonitorVmsException(Exception):
69 def __init__(self
, message
):
70 super(Exception, self
).__init
__(message
)
73 class MonitorDbException(Exception):
74 def __init__(self
, message
):
75 super(Exception, self
).__init
__(message
)
78 class MonitorVimException(Exception):
79 def __init__(self
, message
):
80 super(Exception, self
).__init
__(message
)
83 class SafeDumper(yaml
.SafeDumper
):
84 def represent_data(self
, data
):
85 if isinstance(data
, dict) and data
.__class
__ != dict:
86 # A solution to convert subclasses of dict to dicts which is not handled by pyyaml.
87 data
= dict(data
.items())
88 return super(SafeDumper
, self
).represent_data(data
)
92 def __init__(self
, config
: dict):
95 self
.refresh_config
= ConfigValidate(config
)
98 self
.logger
= logging
.getLogger("ro.monitor")
100 self
.db_vims
= self
.get_db_vims()
103 def load_vims(self
) -> None:
104 for vim
in self
.db_vims
:
105 if vim
["_id"] not in self
.my_vims
:
106 self
._load
_vim
(vim
["_id"])
108 def connect_db(self
) -> None:
109 """Connect to the Database.
116 if self
.config
["database"]["driver"] == "mongo":
117 self
.db
= dbmongo
.DbMongo()
118 self
.db
.db_connect(self
.config
["database"])
119 elif self
.config
["database"]["driver"] == "memory":
120 self
.db
= dbmemory
.DbMemory()
121 self
.db
.db_connect(self
.config
["database"])
123 raise MonitorDbException(
124 "Invalid configuration param '{}' at '[database]':'driver'".format(
125 self
.config
["database"]["driver"]
128 except (DbException
, MonitorDbException
, ValueError) as e
:
129 raise MonitorDbException(str(e
))
131 def get_db_vims(self
) -> list:
132 """Get all VIM accounts which types are Openstack."""
133 return self
.db
.get_list(db_vim_collection
, {"vim_type": vim_type
})
135 def find_ro_tasks_to_monitor(self
) -> list:
136 """Get the ro_tasks which belongs to vdu and status DONE."""
137 return self
.db
.get_list(
140 "tasks.status": ["DONE"],
141 "tasks.item": ["vdu"],
146 def _initialize_target_vim(vim_module_conn
, vim
: dict) -> object:
147 """Create the VIM connector object with given vim details.
150 vim_module_conn (class): VIM connector class
151 vim (dict): VIM details to initialize VIM connecter object
154 VIM connector (object): VIM connector object
156 return vim_module_conn(
159 tenant_id
=vim
.get("vim_tenant_id"),
160 tenant_name
=vim
.get("vim_tenant_name"),
163 user
=vim
["vim_user"],
164 passwd
=vim
["vim_password"],
165 config
=vim
.get("config") or {},
169 def _load_vim(self
, target_id
) -> None:
170 """Load or reload a vim_account.
171 Read content from database, load the plugin if not loaded, then it fills my_vims dictionary.
174 target_id (str): ID of vim account
180 vim
= self
.db
.get_one(db_vim_collection
, {"_id": target_id
})
181 schema_version
= vim
.get("schema_version")
182 self
.db
.encrypt_decrypt_fields(
185 fields
=("password", "secret"),
186 schema_version
=schema_version
,
189 self
._process
_vim
_config
(target_id
, vim
)
190 vim_module_conn
= self
._load
_plugin
(plugin_name
)
191 self
.my_vims
[target_id
] = self
._initialize
_target
_vim
(vim_module_conn
, vim
)
193 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
203 raise MonitorVimException(
204 "Cannot load {} plugin={}: {}".format(target_id
, plugin_name
, str(e
))
208 def _process_vim_config(target_id
: str, db_vim
: dict) -> None:
210 Process vim config, creating vim configuration files as ca_cert
212 target_id (str): vim id
213 db_vim (dict): Vim dictionary obtained from database
218 if not db_vim
.get("config"):
221 work_dir
= "/app/osm_ro/certs"
223 if db_vim
["config"].get("ca_cert_content"):
224 file_name
= f
"{work_dir}/{target_id}:{random.randint(0, 99999)}"
226 if not path
.isdir(file_name
):
229 file_name
= file_name
+ "/ca_cert"
231 with
open(file_name
, "w") as f
:
232 f
.write(db_vim
["config"]["ca_cert_content"])
233 del db_vim
["config"]["ca_cert_content"]
234 db_vim
["config"]["ca_cert"] = file_name
236 except (FileNotFoundError
, IOError, OSError) as e
:
237 raise MonitorVimException(
238 "Error writing to file '{}': {}".format(file_name
, e
)
241 def _load_plugin(self
, name
: str = "rovim_openstack", type: str = "vim"):
242 """Finds the proper VIM connector and returns VIM connector class name.
244 name (str): rovim_openstack
248 VIM connector class name (class)
254 if name
in self
.plugins
:
255 return self
.plugins
[name
]
257 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
258 self
.plugins
[name
] = ep
.load()
259 return self
.plugins
[name
]
261 except Exception as e
:
262 raise MonitorVimException("Cannot load plugin osm_{}: {}".format(name
, e
))
265 def create_vm_to_monitor(ro_task
: dict) -> Optional
[object]:
266 """Create VM using dataclass with ro task details.
269 ro_task (dict): Details of ro_task
277 ro_task
["vim_info"]["vim_id"], ro_task
["tasks"][0]["target_record"]
281 def add_vm_to_existing_vim(
282 vims_to_monitor
: list, ro_task
: dict, target_vim
: str
284 """Add VmToMonitor to existing VIM list.
287 vims_to_monitor (list): List of VIMs to monitor
288 ro_task (dict): ro_task details
289 target_vim (str): ID of target VIM
292 Boolean If VM is added to VIM list, it returns True else False.
294 for vim
in vims_to_monitor
:
295 if target_vim
== vim
.vim_id
:
296 vm_to_monitor
= MonitorVms
.create_vm_to_monitor(ro_task
)
297 vim
.vms
.append(vm_to_monitor
)
302 def add_new_vim_for_monitoring(
303 vims_to_monitor
: list, ro_task
: dict, target_vim
: str
305 """Create a new VIM object and add to vims_to_monitor list.
308 vims_to_monitor (list): List of VIMs to monitor
309 ro_task (dict): ro_task details
310 target_vim (str): ID of target VIM
312 vim_to_monitor
= VimToMonitor(target_vim
, [])
313 vm_to_monitor
= MonitorVms
.create_vm_to_monitor(ro_task
)
314 vim_to_monitor
.vms
.append(vm_to_monitor
)
315 vims_to_monitor
.append(vim_to_monitor
)
318 def prepare_vims_to_monitor(
319 vims_to_monitor
: list, ro_task
: dict, target_vim
: str
321 """If the required VIM exists in the vims_to_monitor list, add VM under related VIM,
322 otherwise create a new VIM object and add VM to this new created VIM.
325 vims_to_monitor (list): List of VIMs to monitor
326 ro_task (dict): ro_task details
327 target_vim (str): ID of target VIM
329 if not MonitorVms
.add_vm_to_existing_vim(vims_to_monitor
, ro_task
, target_vim
):
330 MonitorVms
.add_new_vim_for_monitoring(vims_to_monitor
, ro_task
, target_vim
)
332 def _get_db_paths(self
, target_record
: str) -> tuple:
333 """Get the database paths and info of target VDU and VIM.
336 target_record (str): A string which includes vnfr_id, vdur_id, vim_id
339 (vim_info_path: str, vim_id: str, vnfr_id: str, vdur_path:str, vdur_index: int, db_vnfr: dict) tuple
345 [_
, vnfr_id
, vdur_info
, vim_id
] = target_record
.split(":")
346 vim_info_path
= vdur_info
+ ":" + vim_id
347 vdur_path
= vim_info_path
.split(".vim_info.")[0]
348 vdur_index
= int(vdur_path
.split(".")[1])
349 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
}, fail_on_empty
=False)
350 return vim_info_path
, vim_id
, vnfr_id
, vdur_path
, vdur_index
, db_vnfr
351 except (DbException
, ValueError) as e
:
352 raise MonitorVmsException(str(e
))
355 def _check_if_vdur_vim_info_exists(
356 db_vnfr
: dict, vdur_index
: int
358 """Check if VNF record and vdur vim_info record exists.
361 db_vnfr (dict): VNF record
362 vdur_index (int): index of vdur under db_vnfr["vdur"]
365 Boolean True if VNF record and vdur vim_info record exists.
368 if db_vnfr
and db_vnfr
.get("vdur") and isinstance(vdur_index
, int):
369 if db_vnfr
["vdur"][vdur_index
] and db_vnfr
["vdur"][vdur_index
].get(
376 def _get_vm_data_from_db(self
, vm_to_monitor
: object) -> Optional
[tuple]:
377 """Get the required DB path and VIM info data from database.
380 vm_to_monitor (object): Includes vm_id and target record in DB.
383 (vdur_path: str, vdur_vim_info_update: dict, db_vnfr: dict, existing_vim_info: dict, vnfr_id,vim_info_path: str) (Tuple):
384 Required VM info if _check_if_vdur_vim_info_exists else None
393 ) = self
._get
_db
_paths
(vm_to_monitor
.target_record
)
394 if not self
._check
_if
_vdur
_vim
_info
_exists
(db_vnfr
, vdur_index
):
397 existing_vim_info
= db_vnfr
["vdur"][vdur_index
]["vim_info"].get("vim:" + vim_id
)
398 if not existing_vim_info
:
401 vdur_vim_info_update
= deepcopy(existing_vim_info
)
404 vdur_vim_info_update
,
412 def update_vim_info_for_deleted_vm(vdur_vim_info_update
: dict) -> None:
413 """Updates the vdur_vim_info_update to report that VM is deleted.
416 vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later.
418 vdur_vim_info_update
.update(
420 "vim_status": "DELETED",
421 "vim_message": "Deleted externally",
429 def report_deleted_vdur(self
, vm_to_monitor
: object) -> None:
430 """VM does not exist in the Openstack Cloud so update the VNFR to report VM deletion.
433 vm_to_monitor (object): VM needs to be reported as deleted.
435 vm_data
= self
._get
_vm
_data
_from
_db
(vm_to_monitor
)
440 vdur_vim_info_update
,
446 self
.update_vim_info_for_deleted_vm(vdur_vim_info_update
)
448 vdur_path
+ ".status": "DELETED",
451 if existing_vim_info
!= vdur_vim_info_update
:
452 # VNFR record is updated one time upon VM deletion.
453 self
.logger
.info(f
"Reporting deletion of VM: {vm_to_monitor.vm_id}")
454 self
.backup_vdu_interfaces(vdur_vim_info_update
)
455 all_updates
= [vdur_update
, {vim_info_path
: vdur_vim_info_update
}]
456 self
.update_in_database(all_updates
, vnfr_id
)
457 self
.logger
.info(f
"Updated vnfr for vm_id: {vm_to_monitor.vm_id}.")
459 def update_vnfrs(self
, servers
: list, ports
: dict, vms_to_monitor
: list) -> None:
460 """Update the VDURs according to the latest information provided by servers list.
463 servers (list): List of existing VMs comes from single Openstack VIM account
464 ports (dict): List of all ports comes from single Openstack VIM account
465 vms_to_monitor (list): List of VMs to be monitored and updated.
467 for vm_to_monitor
in vms_to_monitor
:
469 filter(lambda server
: server
.id == vm_to_monitor
.vm_id
, servers
), None
472 self
.report_vdur_updates(server
, vm_to_monitor
, ports
)
474 self
.report_deleted_vdur(vm_to_monitor
)
476 def serialize(self
, value
: dict) -> Optional
[str]:
477 """Serialization of python basic types.
478 In the case value is not serializable a message will be logged.
481 value (dict/str): Data to serialize
484 serialized_value (str, yaml)
486 if isinstance(value
, str):
490 value
, Dumper
=SafeDumper
, default_flow_style
=True, width
=256
492 except RepresenterError
:
494 "The following entity cannot be serialized in YAML:\n\n%s\n\n",
500 def _get_server_info(self
, server
: object) -> str:
501 """Get the server info, extract some fields and returns info as string.
504 server (object): VM info object
509 server_info
= server
.to_dict()
510 server_info
.pop("OS-EXT-SRV-ATTR:user_data", None)
511 server_info
.pop("user_data", None)
512 return self
.serialize(server_info
)
514 def check_vm_status_updates(
516 vdur_vim_info_update
: dict,
521 """Fills up dictionaries to update VDUR according to server.status.
524 vdur_vim_info_update (dict): Dictionary which keeps the differences of vdur_vim_info
525 vdur_update (dict): Dictionary which keeps the differences of vdur
526 server (server): VM info
527 vdur_path (str): Path of VDUR in DB
529 if server
.status
in openStackvmStatusOk
:
530 vdur_vim_info_update
["vim_status"] = vdur_update
[
531 vdur_path
+ ".status"
535 vdur_vim_info_update
["vim_status"] = vdur_update
[
536 vdur_path
+ ".status"
538 vdur_vim_info_update
["vim_message"] = "VIM status reported " + server
.status
540 vdur_vim_info_update
["vim_details"] = self
._get
_server
_info
(server
)
541 vdur_vim_info_update
["vim_id"] = server
.id
542 vdur_vim_info_update
["vim_name"] = vdur_update
[
547 def get_interface_info(
548 ports
: dict, interface
: dict, server
: object
550 """Get the updated port info regarding with existing interface of server.
553 ports (dict): List of all ports belong to single VIM account
554 interface (dict): Existing interface info which is taken from DB
555 server (object): Server info
558 port (dict): The updated port info related to existing interface of server
562 lambda port
: port
.get("id") == interface
.get("vim_interface_id")
563 and port
.get("device_id") == server
.id,
570 def check_vlan_pci_updates(
571 interface_info
: dict, index
: int, vdur_vim_info_update
: dict
573 """If interface has pci and vlan, update vdur_vim_info dictionary with the refreshed data.
576 interface_info (dict): Refreshed interface info
577 index (int): Index of interface in VDUR
578 vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later.
580 if interface_info
.get("binding:profile") and interface_info
[
583 pci
= interface_info
["binding:profile"]["pci_slot"]
584 vdur_vim_info_update
["interfaces"][index
]["pci"] = pci
586 if interface_info
.get("binding:vif_details"):
587 vdur_vim_info_update
["interfaces"][index
]["vlan"] = interface_info
[
588 "binding:vif_details"
592 def check_vdur_interface_updates(
596 interface_info
: dict,
601 """Updates the vdur_update dictionary which stores differences between the latest interface data and data in DB.
604 vdur_update (dict): Dictionary used to store vdur updates
605 vdur_path (str): VDUR record path in DB
606 index (int): Index of interface in VDUR
607 interface_info (dict): Refreshed interface info
608 old_interface (dict): The previous interface info comes from DB
609 vnfr_update (dict): VDUR record path in DB
610 vnfr_id (str): VNFR ID
612 current_ip_address
= MonitorVms
._get
_current
_ip
_address
(interface_info
)
613 if current_ip_address
:
615 vdur_path
+ ".interfaces." + str(index
) + ".ip-address"
616 ] = current_ip_address
618 if old_interface
.get("mgmt_vdu_interface"):
619 vdur_update
[vdur_path
+ ".ip-address"] = current_ip_address
621 if old_interface
.get("mgmt_vnf_interface"):
622 vnfr_update
[vnfr_id
+ ".ip-address"] = current_ip_address
625 vdur_path
+ ".interfaces." + str(index
) + ".mac-address"
626 ] = interface_info
.get("mac_address")
629 def _get_current_ip_address(interface_info
: dict) -> Optional
[str]:
630 if interface_info
.get("fixed_ips") and interface_info
["fixed_ips"][0]:
631 return interface_info
["fixed_ips"][0].get("ip_address")
634 def backup_vdu_interfaces(vdur_vim_info_update
: dict) -> None:
635 """Backup VDU interfaces as interfaces_backup.
638 vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
640 if vdur_vim_info_update
.get("interfaces") and not vdur_vim_info_update
.get(
643 vdur_vim_info_update
["interfaces_backup"] = vdur_vim_info_update
[
647 def update_vdur_vim_info_interfaces(
649 vdur_vim_info_update
: dict,
651 interface_info
: dict,
654 """Update the vdur_vim_info dictionary with the latest interface info.
657 vdur_vim_info_update (dict): The dictionary which is used to store vdur_vim_info updates
658 index (int): Interface index
659 interface_info (dict): The latest interface info
660 server (object): The latest VM info
663 vdur_vim_info_update
.get("interfaces")
664 and vdur_vim_info_update
["interfaces"][index
]
666 raise MonitorVmsException("Existing interfaces info could not found.")
668 vdur_vim_info_update
["interfaces"][index
].update(
670 "mac_address": interface_info
["mac_address"],
671 "ip_address": interface_info
["fixed_ips"][0].get("ip_address")
672 if interface_info
.get("fixed_ips")
674 "vim_net_id": interface_info
["network_id"],
675 "vim_info": self
.serialize(interface_info
),
676 "compute_node": server
.to_dict()["OS-EXT-SRV-ATTR:host"]
677 if server
.to_dict().get("OS-EXT-SRV-ATTR:host")
682 def prepare_interface_updates(
684 vdur_vim_info_update
: dict,
686 interface_info
: dict,
694 """Updates network related info in vdur_vim_info and vdur by using the latest interface info.
697 vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
698 index (int): Interface index
699 interface_info (dict): The latest interface info
700 server (object): The latest VM info
701 vdur_path (str): VDUR record path in DB
702 vnfr_update (dict): VDUR record path in DB
703 old_interface (dict): The previous interface info comes from DB
704 vdur_update (dict): Dictionary used to store vdur updates
705 vnfr_id (str): VNFR ID
707 self
.update_vdur_vim_info_interfaces(
708 vdur_vim_info_update
, index
, interface_info
, server
710 self
.check_vlan_pci_updates(interface_info
, index
, vdur_vim_info_update
)
711 self
.check_vdur_interface_updates(
721 def check_vm_interface_updates(
724 existing_vim_info
: dict,
726 vdur_vim_info_update
: dict,
732 """Gets the refreshed interfaces info of server and updates the VDUR if interfaces exist,
733 otherwise reports that interfaces are deleted.
736 server (object): The latest VM info
737 existing_vim_info (dict): VM info details comes from DB
738 ports (dict): All ports info belongs to single VIM account
739 vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates
740 vdur_update (dict): Dictionary used to store vdur updates
741 vdur_path (str): VDUR record path in DB
742 vnfr_update (dict): VDUR record path in DB
743 vnfr_id (str): VNFR ID
745 for index
, old_interface
in enumerate(existing_vim_info
["interfaces"]):
746 interface_info
= self
.get_interface_info(ports
, old_interface
, server
)
747 if not interface_info
:
748 vdur_vim_info_update
[
750 ] = f
"Interface {old_interface['vim_interface_id']} deleted externally."
753 if interface_info
.get("status") in openStacknetStatusOk
:
754 self
.prepare_interface_updates(
755 vdur_vim_info_update
,
767 vdur_vim_info_update
["vim_message"] = (
768 f
"Interface {old_interface['vim_interface_id']} status: "
769 + interface_info
.get("status")
772 def update_in_database(self
, all_updates
: list, vnfr_id
: str) -> None:
773 """Update differences in VNFR.
776 all_updates (list): List of dictionaries which includes differences
777 vnfr_id (str): VNF record ID
783 for updated_dict
in all_updates
:
787 update_dict
=updated_dict
,
788 q_filter
={"_id": vnfr_id
},
790 except DbException
as e
:
791 raise MonitorDbException(
792 f
"Error while updating differences in VNFR {str(e)}"
795 def report_vdur_updates(
796 self
, server
: object, vm_to_monitor
: object, ports
: dict
798 """Report VDU updates by changing the VDUR records in DB.
801 server (object): Refreshed VM info
802 vm_to_monitor (object): VM to be monitored
803 ports (dict): Ports dict includes all ports details regarding with single VIM account
805 vm_data
= self
._get
_vm
_data
_from
_db
(vm_to_monitor
)
810 vdur_vim_info_update
,
816 vdur_update
, vnfr_update
= {}, {}
818 self
.check_vm_status_updates(
819 vdur_vim_info_update
, vdur_update
, server
, vdur_path
822 self
.check_vm_interface_updates(
826 vdur_vim_info_update
,
832 # Update vnfr in MongoDB if there are differences
833 if existing_vim_info
!= vdur_vim_info_update
:
834 self
.logger
.info(f
"Reporting status updates of VM: {vm_to_monitor.vm_id}.")
835 self
.backup_vdu_interfaces(vdur_vim_info_update
)
838 {vim_info_path
: vdur_vim_info_update
},
841 self
.update_in_database(all_updates
, vnfr_id
)
842 self
.logger
.info(f
"Updated vnfr for vm_id: {server.id}.")
844 def run(self
) -> None:
845 """Perfoms the periodic updates of Openstack VMs by sending only two requests to Openstack APIs
846 for each VIM account (in order to get details of all servers, all ports).
852 # If there is not any Openstack type VIM account in DB or VM status updates are disabled by config,
853 # Openstack VMs will not be monitored.
854 if not self
.db_vims
or self
.refresh_config
.active
== -1:
857 ro_tasks_to_monitor
= self
.find_ro_tasks_to_monitor()
858 db_vims
= [vim
["_id"] for vim
in self
.db_vims
]
861 for ro_task
in ro_tasks_to_monitor
:
862 _
, _
, target_vim
= ro_task
["target_id"].partition(":")
863 if target_vim
in db_vims
:
864 self
.prepare_vims_to_monitor(vims_to_monitor
, ro_task
, target_vim
)
866 for vim
in vims_to_monitor
:
867 all_servers
, all_ports
= self
.my_vims
[vim
.vim_id
].get_monitoring_data()
868 self
.update_vnfrs(all_servers
, all_ports
, vim
.vms
)
878 vimconn
.VimConnException
,
880 raise MonitorVmsException(
881 f
"Exception while monitoring Openstack VMs: {str(e)}"
885 def start_monitoring(config
: dict):
886 global monitoring_task
887 if not (config
and config
.get("period")):
888 raise MonitorVmsException("Wrong configuration format is provided.")
889 instance
= MonitorVms(config
)
890 period
= instance
.refresh_config
.active
892 monitoring_task
= threading
.Timer(period
, start_monitoring
, args
=(config
,))
893 monitoring_task
.start()
896 def stop_monitoring():
897 global monitoring_task
899 monitoring_task
.cancel()