Coverage for NG-RO/osm_ng_ro/monitor.py: 97%

317 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-14 12:04 +0000

1####################################################################################### 

2# Copyright ETSI Contributors and Others. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16####################################################################################### 

17from copy import deepcopy 

18from dataclasses import dataclass 

19import logging 

20from os import makedirs, path 

21from pprint import pformat 

22import random 

23import threading 

24from typing import Optional 

25 

26from importlib_metadata import entry_points 

27from osm_common import dbmemory, dbmongo 

28from osm_common.dbbase import DbException 

29from osm_ng_ro.ns_thread import ConfigValidate 

30from osm_ro_plugin import vimconn 

31import yaml 

32from yaml.representer import RepresenterError 

33 

34 

35openStackvmStatusOk = [ 

36 "ACTIVE", 

37 "PAUSED", 

38 "SUSPENDED", 

39 "SHUTOFF", 

40 "BUILD", 

41] 

42 

43openStacknetStatusOk = [ 

44 "ACTIVE", 

45 "PAUSED", 

46 "BUILD", 

47] 

48 

49db_vim_collection = "vim_accounts" 

50vim_type = "openstack" 

51ro_task_collection = "ro_tasks" 

52plugin_name = "rovim_openstack" 

53monitoring_task = None 

54 

55 

56@dataclass 

57class VmToMonitor: 

58 vm_id: str 

59 target_record: str 

60 

61 

62@dataclass 

63class VimToMonitor: 

64 vim_id: str 

65 vms: list 

66 

67 

68class MonitorVmsException(Exception): 

69 def __init__(self, message): 

70 super(Exception, self).__init__(message) 

71 

72 

73class MonitorDbException(Exception): 

74 def __init__(self, message): 

75 super(Exception, self).__init__(message) 

76 

77 

78class MonitorVimException(Exception): 

79 def __init__(self, message): 

80 super(Exception, self).__init__(message) 

81 

82 

83class SafeDumper(yaml.SafeDumper): 

84 def represent_data(self, data): 

85 if isinstance(data, dict) and data.__class__ != dict: 

86 # A solution to convert subclasses of dict to dicts which is not handled by pyyaml. 

87 data = dict(data.items()) 

88 return super(SafeDumper, self).represent_data(data) 

89 

90 

91class MonitorVms: 

92 def __init__(self, config: dict): 

93 self.config = config 

94 self.db = None 

95 self.refresh_config = ConfigValidate(config) 

96 self.my_vims = {} 

97 self.plugins = {} 

98 self.logger = logging.getLogger("ro.monitor") 

99 self.connect_db() 

100 self.db_vims = self.get_db_vims() 

101 self.load_vims() 

102 

103 def load_vims(self) -> None: 

104 for vim in self.db_vims: 

105 if vim["_id"] not in self.my_vims: 

106 self._load_vim(vim["_id"]) 

107 

108 def connect_db(self) -> None: 

109 """Connect to the Database. 

110 

111 Raises: 

112 MonitorDbException 

113 """ 

114 try: 

115 if not self.db: 

116 if self.config["database"]["driver"] == "mongo": 

117 self.db = dbmongo.DbMongo() 

118 self.db.db_connect(self.config["database"]) 

119 elif self.config["database"]["driver"] == "memory": 

120 self.db = dbmemory.DbMemory() 

121 self.db.db_connect(self.config["database"]) 

122 else: 

123 raise MonitorDbException( 

124 "Invalid configuration param '{}' at '[database]':'driver'".format( 

125 self.config["database"]["driver"] 

126 ) 

127 ) 

128 except (DbException, MonitorDbException, ValueError) as e: 

129 raise MonitorDbException(str(e)) 

130 

131 def get_db_vims(self) -> list: 

132 """Get all VIM accounts which types are Openstack.""" 

133 return self.db.get_list(db_vim_collection, {"vim_type": vim_type}) 

134 

135 def find_ro_tasks_to_monitor(self) -> list: 

136 """Get the ro_tasks which belongs to vdu and status DONE.""" 

137 return self.db.get_list( 

138 ro_task_collection, 

139 q_filter={ 

140 "tasks.status": ["DONE"], 

141 "tasks.item": ["vdu"], 

142 }, 

143 ) 

144 

145 @staticmethod 

146 def _initialize_target_vim(vim_module_conn, vim: dict) -> object: 

147 """Create the VIM connector object with given vim details. 

148 

149 Args: 

150 vim_module_conn (class): VIM connector class 

151 vim (dict): VIM details to initialize VIM connecter object 

152 

153 Returns: 

154 VIM connector (object): VIM connector object 

155 """ 

156 return vim_module_conn( 

157 uuid=vim["_id"], 

158 name=vim["name"], 

159 tenant_id=vim.get("vim_tenant_id"), 

160 tenant_name=vim.get("vim_tenant_name"), 

161 url=vim["vim_url"], 

162 url_admin=None, 

163 user=vim["vim_user"], 

164 passwd=vim["vim_password"], 

165 config=vim.get("config") or {}, 

166 persistent_info={}, 

167 ) 

168 

169 def _load_vim(self, target_id) -> None: 

170 """Load or reload a vim_account. 

171 Read content from database, load the plugin if not loaded, then it fills my_vims dictionary. 

172 

173 Args: 

174 target_id (str): ID of vim account 

175 

176 Raises: 

177 MonitorVimException 

178 """ 

179 try: 

180 vim = self.db.get_one(db_vim_collection, {"_id": target_id}) 

181 schema_version = vim.get("schema_version") 

182 self.db.encrypt_decrypt_fields( 

183 vim, 

184 "decrypt", 

185 fields=("password", "secret"), 

186 schema_version=schema_version, 

187 salt=target_id, 

188 ) 

189 self._process_vim_config(target_id, vim) 

190 vim_module_conn = self._load_plugin(plugin_name) 

191 self.my_vims[target_id] = self._initialize_target_vim(vim_module_conn, vim) 

192 self.logger.debug( 

193 "Connector loaded for {}, plugin={}".format(target_id, plugin_name) 

194 ) 

195 except ( 

196 DbException, 

197 IOError, 

198 AttributeError, 

199 MonitorDbException, 

200 MonitorVimException, 

201 TypeError, 

202 ) as e: 

203 raise MonitorVimException( 

204 "Cannot load {} plugin={}: {}".format(target_id, plugin_name, str(e)) 

205 ) 

206 

207 @staticmethod 

208 def _process_vim_config(target_id: str, db_vim: dict) -> None: 

209 """ 

210 Process vim config, creating vim configuration files as ca_cert 

211 Args: 

212 target_id (str): vim id 

213 db_vim (dict): Vim dictionary obtained from database 

214 

215 Raises: 

216 MonitorVimException 

217 """ 

218 if not db_vim.get("config"): 

219 return 

220 file_name = "" 

221 work_dir = "/app/osm_ro/certs" 

222 try: 

223 if db_vim["config"].get("ca_cert_content"): 

224 file_name = f"{work_dir}/{target_id}:{random.randint(0, 99999)}" 

225 

226 if not path.isdir(file_name): 

227 makedirs(file_name) 

228 

229 file_name = file_name + "/ca_cert" 

230 

231 with open(file_name, "w") as f: 

232 f.write(db_vim["config"]["ca_cert_content"]) 

233 del db_vim["config"]["ca_cert_content"] 

234 db_vim["config"]["ca_cert"] = file_name 

235 

236 except (FileNotFoundError, IOError, OSError) as e: 

237 raise MonitorVimException( 

238 "Error writing to file '{}': {}".format(file_name, e) 

239 ) 

240 

241 def _load_plugin(self, name: str = "rovim_openstack", type: str = "vim"): 

242 """Finds the proper VIM connector and returns VIM connector class name. 

243 Args: 

244 name (str): rovim_openstack 

245 type (str): vim 

246 

247 Returns: 

248 VIM connector class name (class) 

249 

250 Raises: 

251 MonitorVimException 

252 """ 

253 try: 

254 if name in self.plugins: 

255 return self.plugins[name] 

256 

257 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name): 

258 self.plugins[name] = ep.load() 

259 return self.plugins[name] 

260 

261 except Exception as e: 

262 raise MonitorVimException("Cannot load plugin osm_{}: {}".format(name, e)) 

263 

264 @staticmethod 

265 def create_vm_to_monitor(ro_task: dict) -> Optional[object]: 

266 """Create VM using dataclass with ro task details. 

267 

268 Args: 

269 ro_task (dict): Details of ro_task 

270 

271 Returns: 

272 VmToMonitor (object) 

273 """ 

274 if not ro_task: 

275 return 

276 return VmToMonitor( 

277 ro_task["vim_info"]["vim_id"], ro_task["tasks"][0]["target_record"] 

278 ) 

279 

280 @staticmethod 

281 def add_vm_to_existing_vim( 

282 vims_to_monitor: list, ro_task: dict, target_vim: str 

283 ) -> bool: 

284 """Add VmToMonitor to existing VIM list. 

285 

286 Args: 

287 vims_to_monitor (list): List of VIMs to monitor 

288 ro_task (dict): ro_task details 

289 target_vim (str): ID of target VIM 

290 

291 Returns: 

292 Boolean If VM is added to VIM list, it returns True else False. 

293 """ 

294 for vim in vims_to_monitor: 

295 if target_vim == vim.vim_id: 

296 vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task) 

297 vim.vms.append(vm_to_monitor) 

298 return True 

299 return False 

300 

301 @staticmethod 

302 def add_new_vim_for_monitoring( 

303 vims_to_monitor: list, ro_task: dict, target_vim: str 

304 ) -> None: 

305 """Create a new VIM object and add to vims_to_monitor list. 

306 

307 Args: 

308 vims_to_monitor (list): List of VIMs to monitor 

309 ro_task (dict): ro_task details 

310 target_vim (str): ID of target VIM 

311 """ 

312 vim_to_monitor = VimToMonitor(target_vim, []) 

313 vm_to_monitor = MonitorVms.create_vm_to_monitor(ro_task) 

314 vim_to_monitor.vms.append(vm_to_monitor) 

315 vims_to_monitor.append(vim_to_monitor) 

316 

317 @staticmethod 

318 def prepare_vims_to_monitor( 

319 vims_to_monitor: list, ro_task: dict, target_vim: str 

320 ) -> None: 

321 """If the required VIM exists in the vims_to_monitor list, add VM under related VIM, 

322 otherwise create a new VIM object and add VM to this new created VIM. 

323 

324 Args: 

325 vims_to_monitor (list): List of VIMs to monitor 

326 ro_task (dict): ro_task details 

327 target_vim (str): ID of target VIM 

328 """ 

329 if not MonitorVms.add_vm_to_existing_vim(vims_to_monitor, ro_task, target_vim): 

330 MonitorVms.add_new_vim_for_monitoring(vims_to_monitor, ro_task, target_vim) 

331 

332 def _get_db_paths(self, target_record: str) -> tuple: 

333 """Get the database paths and info of target VDU and VIM. 

334 

335 Args: 

336 target_record (str): A string which includes vnfr_id, vdur_id, vim_id 

337 

338 Returns: 

339 (vim_info_path: str, vim_id: str, vnfr_id: str, vdur_path:str, vdur_index: int, db_vnfr: dict) tuple 

340 

341 Raises: 

342 MonitorVmsException 

343 """ 

344 try: 

345 [_, vnfr_id, vdur_info, vim_id] = target_record.split(":") 

346 vim_info_path = vdur_info + ":" + vim_id 

347 vdur_path = vim_info_path.split(".vim_info.")[0] 

348 vdur_index = int(vdur_path.split(".")[1]) 

349 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}, fail_on_empty=False) 

350 return vim_info_path, vim_id, vnfr_id, vdur_path, vdur_index, db_vnfr 

351 except (DbException, ValueError) as e: 

352 raise MonitorVmsException(str(e)) 

353 

354 @staticmethod 

355 def _check_if_vdur_vim_info_exists( 

356 db_vnfr: dict, vdur_index: int 

357 ) -> Optional[bool]: 

358 """Check if VNF record and vdur vim_info record exists. 

359 

360 Args: 

361 db_vnfr (dict): VNF record 

362 vdur_index (int): index of vdur under db_vnfr["vdur"] 

363 

364 Returns: 

365 Boolean True if VNF record and vdur vim_info record exists. 

366 """ 

367 try: 

368 if db_vnfr and db_vnfr.get("vdur") and isinstance(vdur_index, int): 

369 if db_vnfr["vdur"][vdur_index] and db_vnfr["vdur"][vdur_index].get( 

370 "vim_info" 

371 ): 

372 return True 

373 except IndexError: 

374 return 

375 

376 def _get_vm_data_from_db(self, vm_to_monitor: object) -> Optional[tuple]: 

377 """Get the required DB path and VIM info data from database. 

378 

379 Args: 

380 vm_to_monitor (object): Includes vm_id and target record in DB. 

381 

382 Returns: 

383 (vdur_path: str, vdur_vim_info_update: dict, db_vnfr: dict, existing_vim_info: dict, vnfr_id,vim_info_path: str) (Tuple): 

384 Required VM info if _check_if_vdur_vim_info_exists else None 

385 """ 

386 ( 

387 vim_info_path, 

388 vim_id, 

389 vnfr_id, 

390 vdur_path, 

391 vdur_index, 

392 db_vnfr, 

393 ) = self._get_db_paths(vm_to_monitor.target_record) 

394 if not self._check_if_vdur_vim_info_exists(db_vnfr, vdur_index): 

395 return 

396 

397 existing_vim_info = db_vnfr["vdur"][vdur_index]["vim_info"].get("vim:" + vim_id) 

398 if not existing_vim_info: 

399 return 

400 

401 vdur_vim_info_update = deepcopy(existing_vim_info) 

402 return ( 

403 vdur_path, 

404 vdur_vim_info_update, 

405 db_vnfr, 

406 existing_vim_info, 

407 vnfr_id, 

408 vim_info_path, 

409 ) 

410 

411 @staticmethod 

412 def update_vim_info_for_deleted_vm(vdur_vim_info_update: dict) -> None: 

413 """Updates the vdur_vim_info_update to report that VM is deleted. 

414 

415 Args: 

416 vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later. 

417 """ 

418 vdur_vim_info_update.update( 

419 { 

420 "vim_status": "DELETED", 

421 "vim_message": "Deleted externally", 

422 "vim_id": None, 

423 "vim_name": None, 

424 "interfaces": None, 

425 } 

426 ) 

427 

428 def report_deleted_vdur(self, vm_to_monitor: object) -> None: 

429 """VM does not exist in the Openstack Cloud so update the VNFR to report VM deletion. 

430 

431 Args: 

432 vm_to_monitor (object): VM needs to be reported as deleted. 

433 """ 

434 vm_data = self._get_vm_data_from_db(vm_to_monitor) 

435 if not vm_data: 

436 return 

437 ( 

438 vdur_path, 

439 vdur_vim_info_update, 

440 _, 

441 existing_vim_info, 

442 vnfr_id, 

443 vim_info_path, 

444 ) = vm_data 

445 self.update_vim_info_for_deleted_vm(vdur_vim_info_update) 

446 vdur_update = { 

447 vdur_path + ".status": "DELETED", 

448 } 

449 

450 if existing_vim_info != vdur_vim_info_update: 

451 # VNFR record is updated one time upon VM deletion. 

452 self.logger.info(f"Reporting deletion of VM: {vm_to_monitor.vm_id}") 

453 self.backup_vdu_interfaces(vdur_vim_info_update) 

454 all_updates = [vdur_update, {vim_info_path: vdur_vim_info_update}] 

455 self.update_in_database(all_updates, vnfr_id) 

456 self.logger.info(f"Updated vnfr for vm_id: {vm_to_monitor.vm_id}.") 

457 

458 def update_vnfrs(self, servers: list, ports: dict, vms_to_monitor: list) -> None: 

459 """Update the VDURs according to the latest information provided by servers list. 

460 

461 Args: 

462 servers (list): List of existing VMs comes from single Openstack VIM account 

463 ports (dict): List of all ports comes from single Openstack VIM account 

464 vms_to_monitor (list): List of VMs to be monitored and updated. 

465 """ 

466 for vm_to_monitor in vms_to_monitor: 

467 server = next( 

468 filter(lambda server: server.id == vm_to_monitor.vm_id, servers), None 

469 ) 

470 if server: 

471 self.report_vdur_updates(server, vm_to_monitor, ports) 

472 else: 

473 self.report_deleted_vdur(vm_to_monitor) 

474 

475 def serialize(self, value: dict) -> Optional[str]: 

476 """Serialization of python basic types. 

477 In the case value is not serializable a message will be logged. 

478 

479 Args: 

480 value (dict/str): Data to serialize 

481 

482 Returns: 

483 serialized_value (str, yaml) 

484 """ 

485 if isinstance(value, str): 

486 return value 

487 try: 

488 return yaml.dump( 

489 value, Dumper=SafeDumper, default_flow_style=True, width=256 

490 ) 

491 except RepresenterError: 

492 self.logger.info( 

493 "The following entity cannot be serialized in YAML:\n\n%s\n\n", 

494 pformat(value), 

495 exc_info=True, 

496 ) 

497 return str(value) 

498 

499 def _get_server_info(self, server: object) -> str: 

500 """Get the server info, extract some fields and returns info as string. 

501 

502 Args: 

503 server (object): VM info object 

504 

505 Returns: 

506 server_info (string) 

507 """ 

508 server_info = server.to_dict() 

509 server_info.pop("OS-EXT-SRV-ATTR:user_data", None) 

510 server_info.pop("user_data", None) 

511 return self.serialize(server_info) 

512 

513 def check_vm_status_updates( 

514 self, 

515 vdur_vim_info_update: dict, 

516 vdur_update: dict, 

517 server: object, 

518 vdur_path: str, 

519 ) -> None: 

520 """Fills up dictionaries to update VDUR according to server.status. 

521 

522 Args: 

523 vdur_vim_info_update (dict): Dictionary which keeps the differences of vdur_vim_info 

524 vdur_update (dict): Dictionary which keeps the differences of vdur 

525 server (server): VM info 

526 vdur_path (str): Path of VDUR in DB 

527 """ 

528 if server.status in openStackvmStatusOk: 

529 vdur_vim_info_update["vim_status"] = vdur_update[vdur_path + ".status"] = ( 

530 server.status 

531 ) 

532 

533 else: 

534 vdur_vim_info_update["vim_status"] = vdur_update[vdur_path + ".status"] = ( 

535 server.status 

536 ) 

537 vdur_vim_info_update["vim_message"] = "VIM status reported " + server.status 

538 

539 vdur_vim_info_update["vim_details"] = self._get_server_info(server) 

540 vdur_vim_info_update["vim_id"] = server.id 

541 vdur_vim_info_update["vim_name"] = vdur_update[vdur_path + ".name"] = ( 

542 server.name 

543 ) 

544 

545 @staticmethod 

546 def get_interface_info( 

547 ports: dict, interface: dict, server: object 

548 ) -> Optional[dict]: 

549 """Get the updated port info regarding with existing interface of server. 

550 

551 Args: 

552 ports (dict): List of all ports belong to single VIM account 

553 interface (dict): Existing interface info which is taken from DB 

554 server (object): Server info 

555 

556 Returns: 

557 port (dict): The updated port info related to existing interface of server 

558 """ 

559 return next( 

560 filter( 

561 lambda port: port.get("id") == interface.get("vim_interface_id") 

562 and port.get("device_id") == server.id, 

563 ports["ports"], 

564 ), 

565 None, 

566 ) 

567 

568 @staticmethod 

569 def check_vlan_pci_updates( 

570 interface_info: dict, index: int, vdur_vim_info_update: dict 

571 ) -> None: 

572 """If interface has pci and vlan, update vdur_vim_info dictionary with the refreshed data. 

573 

574 Args: 

575 interface_info (dict): Refreshed interface info 

576 index (int): Index of interface in VDUR 

577 vdur_vim_info_update (dict): Dictionary to be updated and used to update VDUR later. 

578 """ 

579 if interface_info.get("binding:profile") and interface_info[ 

580 "binding:profile" 

581 ].get("pci_slot"): 

582 pci = interface_info["binding:profile"]["pci_slot"] 

583 vdur_vim_info_update["interfaces"][index]["pci"] = pci 

584 

585 if interface_info.get("binding:vif_details"): 

586 vdur_vim_info_update["interfaces"][index]["vlan"] = interface_info[ 

587 "binding:vif_details" 

588 ].get("vlan") 

589 

590 @staticmethod 

591 def check_vdur_interface_updates( 

592 vdur_update: dict, 

593 vdur_path: str, 

594 index: int, 

595 interface_info: dict, 

596 old_interface: dict, 

597 vnfr_update: dict, 

598 vnfr_id: str, 

599 ) -> None: 

600 """Updates the vdur_update dictionary which stores differences between the latest interface data and data in DB. 

601 

602 Args: 

603 vdur_update (dict): Dictionary used to store vdur updates 

604 vdur_path (str): VDUR record path in DB 

605 index (int): Index of interface in VDUR 

606 interface_info (dict): Refreshed interface info 

607 old_interface (dict): The previous interface info comes from DB 

608 vnfr_update (dict): VDUR record path in DB 

609 vnfr_id (str): VNFR ID 

610 """ 

611 current_ip_address = MonitorVms._get_current_ip_address(interface_info) 

612 if current_ip_address: 

613 vdur_update[vdur_path + ".interfaces." + str(index) + ".ip-address"] = ( 

614 current_ip_address 

615 ) 

616 

617 if old_interface.get("mgmt_vdu_interface"): 

618 vdur_update[vdur_path + ".ip-address"] = current_ip_address 

619 

620 if old_interface.get("mgmt_vnf_interface"): 

621 vnfr_update[vnfr_id + ".ip-address"] = current_ip_address 

622 

623 vdur_update[vdur_path + ".interfaces." + str(index) + ".mac-address"] = ( 

624 interface_info.get("mac_address") 

625 ) 

626 

627 @staticmethod 

628 def _get_dual_ip(data=None): 

629 if data: 

630 ip_addresses = [item["ip_address"] for item in data] 

631 return ";".join(ip_addresses) if len(ip_addresses) > 1 else ip_addresses[0] 

632 else: 

633 return None 

634 

635 @staticmethod 

636 def _get_current_ip_address(interface_info: dict) -> Optional[str]: 

637 if interface_info.get("fixed_ips") and interface_info["fixed_ips"][0]: 

638 return MonitorVms._get_dual_ip(interface_info.get("fixed_ips")) 

639 

640 @staticmethod 

641 def backup_vdu_interfaces(vdur_vim_info_update: dict) -> None: 

642 """Backup VDU interfaces as interfaces_backup. 

643 

644 Args: 

645 vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates 

646 """ 

647 if vdur_vim_info_update.get("interfaces") and not vdur_vim_info_update.get( 

648 "vim_message" 

649 ): 

650 vdur_vim_info_update["interfaces_backup"] = vdur_vim_info_update[ 

651 "interfaces" 

652 ] 

653 

654 def update_vdur_vim_info_interfaces( 

655 self, 

656 vdur_vim_info_update: dict, 

657 index: int, 

658 interface_info: dict, 

659 server: object, 

660 ) -> None: 

661 """Update the vdur_vim_info dictionary with the latest interface info. 

662 

663 Args: 

664 vdur_vim_info_update (dict): The dictionary which is used to store vdur_vim_info updates 

665 index (int): Interface index 

666 interface_info (dict): The latest interface info 

667 server (object): The latest VM info 

668 """ 

669 if not ( 

670 vdur_vim_info_update.get("interfaces") 

671 and vdur_vim_info_update["interfaces"][index] 

672 ): 

673 raise MonitorVmsException("Existing interfaces info could not found.") 

674 

675 vdur_vim_info_update["interfaces"][index].update( 

676 { 

677 "mac_address": interface_info["mac_address"], 

678 "ip_address": ( 

679 interface_info["fixed_ips"][0].get("ip_address") 

680 if interface_info.get("fixed_ips") 

681 else None 

682 ), 

683 "vim_net_id": interface_info["network_id"], 

684 "vim_info": self.serialize(interface_info), 

685 "compute_node": ( 

686 server.to_dict()["OS-EXT-SRV-ATTR:host"] 

687 if server.to_dict().get("OS-EXT-SRV-ATTR:host") 

688 else None 

689 ), 

690 } 

691 ) 

692 

693 def prepare_interface_updates( 

694 self, 

695 vdur_vim_info_update: dict, 

696 index: int, 

697 interface_info: dict, 

698 server: object, 

699 vdur_path: str, 

700 vnfr_update: dict, 

701 old_interface: dict, 

702 vdur_update: dict, 

703 vnfr_id: str, 

704 ) -> None: 

705 """Updates network related info in vdur_vim_info and vdur by using the latest interface info. 

706 

707 Args: 

708 vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates 

709 index (int): Interface index 

710 interface_info (dict): The latest interface info 

711 server (object): The latest VM info 

712 vdur_path (str): VDUR record path in DB 

713 vnfr_update (dict): VDUR record path in DB 

714 old_interface (dict): The previous interface info comes from DB 

715 vdur_update (dict): Dictionary used to store vdur updates 

716 vnfr_id (str): VNFR ID 

717 """ 

718 self.update_vdur_vim_info_interfaces( 

719 vdur_vim_info_update, index, interface_info, server 

720 ) 

721 self.check_vlan_pci_updates(interface_info, index, vdur_vim_info_update) 

722 self.check_vdur_interface_updates( 

723 vdur_update, 

724 vdur_path, 

725 index, 

726 interface_info, 

727 old_interface, 

728 vnfr_update, 

729 vnfr_id, 

730 ) 

731 

732 def check_vm_interface_updates( 

733 self, 

734 server: object, 

735 existing_vim_info: dict, 

736 ports: dict, 

737 vdur_vim_info_update: dict, 

738 vdur_update: dict, 

739 vdur_path: str, 

740 vnfr_update: dict, 

741 vnfr_id: str, 

742 ) -> None: 

743 """Gets the refreshed interfaces info of server and updates the VDUR if interfaces exist, 

744 otherwise reports that interfaces are deleted. 

745 

746 Args: 

747 server (object): The latest VM info 

748 existing_vim_info (dict): VM info details comes from DB 

749 ports (dict): All ports info belongs to single VIM account 

750 vdur_vim_info_update (dict): Dictionary used to store vdur_vim_info updates 

751 vdur_update (dict): Dictionary used to store vdur updates 

752 vdur_path (str): VDUR record path in DB 

753 vnfr_update (dict): VDUR record path in DB 

754 vnfr_id (str): VNFR ID 

755 """ 

756 for index, old_interface in enumerate(existing_vim_info["interfaces"]): 

757 interface_info = self.get_interface_info(ports, old_interface, server) 

758 if not interface_info: 

759 vdur_vim_info_update["vim_message"] = ( 

760 f"Interface {old_interface['vim_interface_id']} deleted externally." 

761 ) 

762 

763 else: 

764 if interface_info.get("status") in openStacknetStatusOk: 

765 self.prepare_interface_updates( 

766 vdur_vim_info_update, 

767 index, 

768 interface_info, 

769 server, 

770 vdur_path, 

771 vnfr_update, 

772 old_interface, 

773 vdur_update, 

774 vnfr_id, 

775 ) 

776 

777 else: 

778 vdur_vim_info_update["vim_message"] = ( 

779 f"Interface {old_interface['vim_interface_id']} status: " 

780 + interface_info.get("status") 

781 ) 

782 

783 def update_in_database(self, all_updates: list, vnfr_id: str) -> None: 

784 """Update differences in VNFR. 

785 

786 Args: 

787 all_updates (list): List of dictionaries which includes differences 

788 vnfr_id (str): VNF record ID 

789 

790 Raises: 

791 MonitorDbException 

792 """ 

793 try: 

794 for updated_dict in all_updates: 

795 if updated_dict: 

796 self.db.set_list( 

797 "vnfrs", 

798 update_dict=updated_dict, 

799 q_filter={"_id": vnfr_id}, 

800 ) 

801 except DbException as e: 

802 raise MonitorDbException( 

803 f"Error while updating differences in VNFR {str(e)}" 

804 ) 

805 

806 def report_vdur_updates( 

807 self, server: object, vm_to_monitor: object, ports: dict 

808 ) -> None: 

809 """Report VDU updates by changing the VDUR records in DB. 

810 

811 Args: 

812 server (object): Refreshed VM info 

813 vm_to_monitor (object): VM to be monitored 

814 ports (dict): Ports dict includes all ports details regarding with single VIM account 

815 """ 

816 vm_data = self._get_vm_data_from_db(vm_to_monitor) 

817 if not vm_data: 

818 return 

819 ( 

820 vdur_path, 

821 vdur_vim_info_update, 

822 _, 

823 existing_vim_info, 

824 vnfr_id, 

825 vim_info_path, 

826 ) = vm_data 

827 vdur_update, vnfr_update = {}, {} 

828 

829 self.check_vm_status_updates( 

830 vdur_vim_info_update, vdur_update, server, vdur_path 

831 ) 

832 

833 self.check_vm_interface_updates( 

834 server, 

835 existing_vim_info, 

836 ports, 

837 vdur_vim_info_update, 

838 vdur_update, 

839 vdur_path, 

840 vnfr_update, 

841 vnfr_id, 

842 ) 

843 # Update vnfr in MongoDB if there are differences 

844 if existing_vim_info != vdur_vim_info_update: 

845 self.logger.info(f"Reporting status updates of VM: {vm_to_monitor.vm_id}.") 

846 self.backup_vdu_interfaces(vdur_vim_info_update) 

847 all_updates = [ 

848 vdur_update, 

849 {vim_info_path: vdur_vim_info_update}, 

850 vnfr_update, 

851 ] 

852 self.update_in_database(all_updates, vnfr_id) 

853 self.logger.info(f"Updated vnfr for vm_id: {server.id}.") 

854 

855 def run(self) -> None: 

856 """Perfoms the periodic updates of Openstack VMs by sending only two requests to Openstack APIs 

857 for each VIM account (in order to get details of all servers, all ports). 

858 

859 Raises: 

860 MonitorVmsException 

861 """ 

862 try: 

863 # If there is not any Openstack type VIM account in DB or VM status updates are disabled by config, 

864 # Openstack VMs will not be monitored. 

865 if not self.db_vims or self.refresh_config.active == -1: 

866 return 

867 

868 ro_tasks_to_monitor = self.find_ro_tasks_to_monitor() 

869 db_vims = [vim["_id"] for vim in self.db_vims] 

870 vims_to_monitor = [] 

871 

872 for ro_task in ro_tasks_to_monitor: 

873 _, _, target_vim = ro_task["target_id"].partition(":") 

874 if target_vim in db_vims: 

875 self.prepare_vims_to_monitor(vims_to_monitor, ro_task, target_vim) 

876 

877 for vim in vims_to_monitor: 

878 try: 

879 all_servers, all_ports = self.my_vims[ 

880 vim.vim_id 

881 ].get_monitoring_data() 

882 self.update_vnfrs(all_servers, all_ports, vim.vms) 

883 except (DbException, MonitorDbException) as e: 

884 raise MonitorVmsException(str(e)) 

885 except Exception as e: 

886 self.logger.info("Exception in vim monitoring {}".format(e)) 

887 continue 

888 except ( 

889 DbException, 

890 MonitorDbException, 

891 MonitorVimException, 

892 MonitorVmsException, 

893 ValueError, 

894 KeyError, 

895 TypeError, 

896 AttributeError, 

897 vimconn.VimConnException, 

898 ) as e: 

899 raise MonitorVmsException( 

900 f"Exception while monitoring Openstack VMs: {str(e)}" 

901 ) 

902 except Exception as e: 

903 self.logger.info("Exception in monitoring {}".format(e)) 

904 

905 

906def start_monitoring(config: dict): 

907 global monitoring_task 

908 if not (config and config.get("period")): 

909 raise MonitorVmsException("Wrong configuration format is provided.") 

910 instance = MonitorVms(config) 

911 period = instance.refresh_config.active 

912 instance.run() 

913 if period == -1: 

914 period = 10 * 24 * 60 * 60 # 10 days (big enough) 

915 monitoring_task = threading.Timer(period, start_monitoring, args=(config,)) 

916 monitoring_task.start() 

917 

918 

919def stop_monitoring(): 

920 global monitoring_task 

921 if monitoring_task: 

922 monitoring_task.cancel()