Coverage for NG-RO/osm_ng_ro/ns_thread.py: 30%
1457 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-14 12:04 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-14 12:04 +0000
1# -*- coding: utf-8 -*-
3##
4# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17#
18##
20"""
21This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22The tasks are stored in the database, in the table ro_tasks
23A single ro_task refers to a VIM element (flavor, image, network, ...).
24A ro_task can contain several 'tasks', each one with a target, where to store the results
25"""
27from copy import deepcopy
28from http import HTTPStatus
29import logging
30from os import makedirs
31from os import path
32import queue
33import threading
34import time
35import traceback
36from typing import Dict
37from unittest.mock import Mock
39from importlib_metadata import entry_points
40from osm_common.dbbase import DbException
41from osm_ng_ro.vim_admin import LockRenew
42from osm_ro_plugin import sdnconn
43from osm_ro_plugin import vimconn
44from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45from osm_ro_plugin.vim_dummy import VimDummyConnector
46import yaml
# Module authorship metadata
__author__ = "Alfonso Tierno"
__date__ = "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Walk down nested dictionaries following the given keys and return the value reached.
    Example target_dict={a: {b: 5}}; keys (a, b) return 5; keys (a, b, c) and keys (f, h)
    both return the default.
    :param target_dict: dictionary to be read
    :param args: keys to follow, one per nesting level
    :param kwargs: may only carry default=value, returned when a key is not present
    :return: The wanted value if every key exists, the default (None if not given) otherwise
    """
    current = target_dict
    for level_key in args:
        if isinstance(current, dict) and level_key in current:
            current = current[level_key]
        else:
            # either we hit a non-dict before consuming all keys or the key is missing
            return kwargs.get("default")
    return current
class NsWorkerException(Exception):
    """Base error raised by the worker code of this module."""
class FailingConnector:
    """Stand-in connector whose public methods always raise the stored error.

    Every public attribute name of vimconn.VimConnector is bound to a Mock that raises
    vimconn.VimConnException, then every public attribute name of sdnconn.SdnConnectorBase
    is bound to a Mock that raises sdnconn.SdnConnectorError.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exceptions raised by every public method
        """
        self.error_msg = error_msg

        # Order matters: SDN names are assigned last, so they win on name collisions
        failing_sources = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )
        for connector_cls, error_cls in failing_sources:
            for attr_name in dir(connector_cls):
                if attr_name[0] == "_":
                    continue
                # a fresh exception instance per attribute
                setattr(self, attr_name, Mock(side_effect=error_cls(error_msg)))
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException specialisation raised when a looked-up element is not found."""
class VimInteractionBase:
    """Common base of the classes that call the VIM/SDN to create, delete and refresh
    networks, VMs, flavors, ...
    The methods defined here do not contact the VIM; they just report a fixed result."""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handler
        :param my_vims: mapping indexed by target id (subclasses read the connector from it)
        :param db_vims: mapping indexed by target id (subclasses read the VIM "config" from it)
        :param logger: logger used by the interaction methods
        """
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing; report the item as being built with an empty update."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling the VIM to get the status. Assumes ok unless already at VIM_ERROR"""
        vim_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if vim_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling the VIM to delete the item. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing; report the action as done with no updates."""
        return "DONE", None, None
class VimInteractionNet(VimInteractionBase):
    """Handles networks at the VIM: find or create, status refresh and deletion."""

    def new(self, ro_task, task_index, task_depends):
        """Locate an already existing network at the VIM or create a new one.

        :param ro_task: ro_task entry holding the tasks and the target id
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: ("BUILD", vim item update) on success, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt_net = False
        mgmt_set_at_vim = False

        try:
            find_params = task.get("find_params")
            if find_params:
                # FIND
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration may tell which one to use
                    is_mgmt_net = True
                    db_vim = self.db_vims[ro_task["target_id"]]
                    cfg_net_id = deep_get(db_vim, "config", "management_network_id")
                    cfg_net_name = deep_get(
                        db_vim, "config", "management_network_name"
                    )

                    if cfg_net_id:
                        mgmt_set_at_vim = True
                        vim_filter = {"id": cfg_net_id}
                    elif cfg_net_name:
                        mgmt_set_at_vim = True
                        vim_filter = {"name": cfg_net_name}
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    if not is_mgmt_net or mgmt_set_at_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                find_params
                            )
                        )

                    # A mgmt-network of the descriptor that is mapped neither in the
                    # descriptor nor in the VIM configuration gets created
                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """Call VIM to get the network status and compute what changed.

        :param ro_task: ro_task entry; its "vim_info" holds the last known values
        :return: (task status, dict with only the vim_info fields that changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            if vim_info["status"] in ("ACTIVE", "BUILD"):
                task_status = "DONE" if vim_info["status"] == "ACTIVE" else "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        previous = ro_task["vim_info"]
        vim_status = vim_info["status"]
        ro_vim_item_update = {}

        if previous["vim_status"] != vim_status:
            ro_vim_item_update["vim_status"] = vim_status

        if previous["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_status in ("ERROR", "VIM_ERROR"):
            if previous["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_status == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        elif previous["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            # the message is only logged when the new status is not ACTIVE
            shown_message = (
                "" if new_status == "ACTIVE" else ro_vim_item_update.get("vim_message")
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    new_status,
                    shown_message,
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network, and its created items, at the VIM.

        :param ro_task: ro_task entry whose "vim_info" holds vim_id and created_items
        :param task_index: position inside ro_task["tasks"] of the delete task
        :return: ("DONE", update) also when the VIM reports it as not found,
            ("FAILED", update) on any other VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionClassification(VimInteractionBase):
    """Creates and deletes classifications at the VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Create a classification at the VIM.

        When params["logical_source_port"] starts with "TASK-", it is replaced by the
        vim_interface_id of the interface number logical_source_port_index of the VM
        that task_depends maps to that reference.

        :param ro_task: ro_task entry holding the tasks and the target id
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: mapping of "TASK-..." references to VIM identifiers
        :return: ("DONE", vim item update) on success, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Flagged before calling the VIM, so a failure is also reported as created
            created = True
            params_copy = deepcopy(task["params"])

            name = params_copy.pop("name")
            logical_source_port_index = int(
                params_copy.pop("logical_source_port_index")
            )
            logical_source_port = params_copy["logical_source_port"]

            if logical_source_port.startswith("TASK-"):
                vm_id = task_depends[logical_source_port]
                vm_status = target_vim.refresh_vms_status([vm_id])[vm_id]
                params_copy["logical_source_port"] = vm_status["interfaces"][
                    logical_source_port_index
                ]["vim_interface_id"]

            vim_classification_id = target_vim.new_classification(
                name, "legacy_flow_classifier", params_copy
            )

            ro_vim_item_update = {
                "vim_id": vim_classification_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # The label used to be "new-vm", a copy/paste leftover
            self.logger.error(
                "task={} {} new-classification: {}".format(
                    task_id, ro_task["target_id"], e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the classification at the VIM.

        :param ro_task: ro_task entry whose "vim_info" holds the vim_id to delete
        :param task_index: position inside ro_task["tasks"] of the delete task
        :return: ("DONE", update) also when the VIM reports it as not found,
            ("FAILED", update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        classification_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if classification_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_classification(classification_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-classification={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], classification_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-classification={} {}".format(
                task_id,
                ro_task["target_id"],
                classification_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionSfi(VimInteractionBase):
    """Creates and deletes service function instances (SFI) at the VIM."""

    @staticmethod
    def _vim_interface_id(target_vim, vm_id, iface_index):
        """Return the vim_interface_id of interface number iface_index of the VM vm_id."""
        vm_status = target_vim.refresh_vms_status([vm_id])[vm_id]
        return vm_status["interfaces"][iface_index]["vim_interface_id"]

    def new(self, ro_task, task_index, task_depends):
        """Create a service function instance at the VIM.

        A port given as a "TASK-..." reference is resolved to the vim_interface_id of
        the interface number <port>_index of the VM that task_depends maps to it;
        any other port value is passed to the VIM unchanged.

        :param ro_task: ro_task entry holding the tasks and the target id
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: mapping of "TASK-..." references to VIM identifiers
        :return: ("DONE", vim item update) on success, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Flagged before calling the VIM, so a failure is also reported as created
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            ingress_port = params_copy["ingress_port"]
            egress_port = params_copy["egress_port"]
            ingress_port_index = params_copy["ingress_port_index"]
            egress_port_index = params_copy["egress_port_index"]

            ingress_port_id = ingress_port
            egress_port_id = egress_port
            vm_id = None

            # Only "TASK-..." references are looked up in task_depends. Doing the lookup
            # unconditionally raised an uncaught KeyError for literal port ids.
            if ingress_port.startswith("TASK-"):
                vm_id = task_depends[ingress_port]
                ingress_port_id = self._vim_interface_id(
                    target_vim, vm_id, ingress_port_index
                )

            if ingress_port == egress_port:
                egress_port_id = ingress_port_id
            elif egress_port.startswith("TASK-"):
                # Keep resolving against the ingress VM when there is one, as before;
                # fall back to the egress reference only when ingress was a literal id.
                # NOTE(review): confirm whether egress should always use its own VM.
                if vm_id is None:
                    vm_id = task_depends[egress_port]
                egress_port_id = self._vim_interface_id(
                    target_vim, vm_id, egress_port_index
                )

            vim_sfi_id = target_vim.new_sfi(
                name, [ingress_port_id], [egress_port_id], sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfi_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # The label used to be "new-vm", a copy/paste leftover
            self.logger.error(
                "task={} {} new-sfi: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the service function instance at the VIM.

        :param ro_task: ro_task entry whose "vim_info" holds the vim_id to delete
        :param task_index: position inside ro_task["tasks"] of the delete task
        :return: ("DONE", update) also when the VIM reports it as not found,
            ("FAILED", update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfi_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfi_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfi(sfi_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfi={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfi_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfi={} {}".format(
                task_id,
                ro_task["target_id"],
                sfi_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionSf(VimInteractionBase):
    """Creates and deletes service functions (SF) at the VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Create a service function at the VIM from a list of SFIs.

        Each entry of params["sfis"] that starts with "TASK-" is replaced by the value
        task_depends maps it to; any other entry is passed unchanged.

        :param ro_task: ro_task entry holding the tasks and the target id
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: mapping of "TASK-..." references to VIM identifiers
        :return: ("DONE", vim item update) on success, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Flagged before calling the VIM, so a failure is also reported as created
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            sfi_id_list = [
                task_depends[sfi] if sfi.startswith("TASK-") else sfi
                for sfi in params_copy["sfis"]
            ]

            vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False)

            ro_vim_item_update = {
                "vim_id": vim_sf_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # The label used to be "new-vm", a copy/paste leftover
            self.logger.error(
                "task={} {} new-sf: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the service function at the VIM.

        :param ro_task: ro_task entry whose "vim_info" holds the vim_id to delete
        :param task_index: position inside ro_task["tasks"] of the delete task
        :return: ("DONE", update) also when the VIM reports it as not found,
            ("FAILED", update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sf_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sf_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sf(sf_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sf={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sf_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sf={} {}".format(
                task_id,
                ro_task["target_id"],
                sf_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionSfp(VimInteractionBase):
    """Creates and deletes service function paths (SFP) at the VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Create a service function path at the VIM.

        Each entry of params["classifications"] and params["sfs"] that starts with
        "TASK-" is replaced by the value task_depends maps it to; any other entry is
        passed unchanged.

        :param ro_task: ro_task entry holding the tasks and the target id
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: mapping of "TASK-..." references to VIM identifiers
        :return: ("DONE", vim item update) on success, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Flagged before calling the VIM, so a failure is also reported as created
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            sf_list = params_copy["sfs"]
            classification_list = params_copy["classifications"]

            classification_id_list = [
                (
                    task_depends[classification]
                    if classification.startswith("TASK-")
                    else classification
                )
                for classification in classification_list
            ]
            sf_id_list = [
                task_depends[sf] if sf.startswith("TASK-") else sf for sf in sf_list
            ]

            vim_sfp_id = target_vim.new_sfp(
                name, classification_id_list, sf_id_list, sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfp_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # The label used to be "new-vm", a copy/paste leftover
            self.logger.error(
                "task={} {} new-sfp: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the service function path at the VIM.

        :param ro_task: ro_task entry whose "vim_info" holds the vim_id to delete
        :param task_index: position inside ro_task["tasks"] of the delete task
        :return: ("DONE", update) also when the VIM reports it as not found,
            ("FAILED", update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfp_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfp_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfp(sfp_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfp={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfp_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfp={} {}".format(
                task_id,
                ro_task["target_id"],
                sfp_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionVdu(VimInteractionBase):
    """Creates, deletes and refreshes VMs at the VIM, and injects ssh keys into them."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM.

        "TASK-..." references found in the net_list entries ("net_id"), in "image_id",
        in "flavor_id" and in the affinity_group_list entries ("affinity_group_id")
        are replaced by the values task_depends maps them to.

        :param ro_task: ro_task entry holding the tasks and the target id
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: mapping of "TASK-..." references to VIM identifiers
        :return: ("BUILD", vim item update) on success, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # Flagged before calling the VIM, so a failure is also reported as created
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # The message used to be the truncated "found for {}"
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not "
                            "created or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # presumably new_vminstance stores "vim_id" in each net_list entry,
            # since nothing here sets it; verify against the connector
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM, and its created items, at the VIM.

        :param ro_task: ro_task entry whose "vim_info" holds vim_id, created_items and
            optionally volumes_to_hold
        :param task_index: position inside ro_task["tasks"] of the delete task
        :return: ("DONE", update) also when the VIM reports it as not found,
            ("FAILED", update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status and compute what changed.

        :param ro_task: ro_task entry; its "vim_info" holds the last known values
        :return: (None, None) when there is no vim_id, otherwise
            (task status, dict with only the vim_info fields that changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # best effort: a parsing problem is logged and the refresh goes on
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # None is appended when the VIM does not report that interface
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # NOTE(review): next() has no default here, so StopIteration is raised when
        # no unfinished CREATE task exists - confirm callers guarantee there is one
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        # NOTE(review): the two flag assignments below fail with TypeError if the
        # selected entry of vim_interfaces is None (interface not reported)
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    (
                        ro_vim_item_update.get("vim_message")
                        if ro_vim_item_update.get("vim_status") != "ACTIVE"
                        else ""
                    ),
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an ssh key into the VM through the VIM connector.

        The private key is decrypted with self.db.decrypt and passed as "ro_key";
        "ip_address" is passed as "ip_addr".

        :param ro_task: ro_task entry holding the tasks and the target id
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: ("DONE", None, {"retries": 0}) on success;
            ("BUILD", None, {"retries": n, "next_retry": seconds}) while the number of
            failures is below max_retries_inject_ssh_key;
            ("FAILED", {"vim_message": error}, {"retries": 0}) afterwards
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionImage(VimInteractionBase):
    """Looks up images at the VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Find the single image matching the task find_params at the VIM.

        Without find_params no lookup is done and an empty vim_id is reported.

        :param ro_task: ro_task entry holding the tasks and the target id
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: ("DONE", vim item update) on success, ("FAILED", vim item update)
            when no image or more than one image matches, or on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_image_id = ""
            find_params = task.get("find_params")
            if find_params:
                vim_images = target_vim.get_image_list(
                    find_params.get("filter_dict", {})
                )

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(find_params)
                    )
                if len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            find_params
                        )
                    )
                vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
class VimInteractionSharedVolume(VimInteractionBase):
    """Creates and deletes shared volumes at the VIM selected by ro_task["target_id"]."""

    def delete(self, ro_task, task_index):
        """
        Delete the shared volume stored at ro_task["vim_info"]["vim_id"].
        A volume whose created_items entry has a truthy "keep" is left at the VIM.
        :param ro_task: ro_task dictionary. Reads "tasks", "vim_info", "target_id" and "_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: tuple (status, vim_info update), where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        # created_items may have no entry for this volume id; treat it as "not kept"
        # instead of failing with AttributeError on None.get()
        volume_info = (created_items or {}).get(shared_volume_vim_id) or {}
        if volume_info.get("keep"):
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok

        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            # nothing to remove at the VIM: still reported as DONE
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Create a shared volume from task["params"], when params are provided.
        :param ro_task: ro_task dictionary. Reads "tasks" and "target_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update), where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # remember name and "keep" flag; delete() reads "keep" from this entry
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionFlavor(VimInteractionBase):
    """Finds, creates and deletes flavors at the VIM selected by ro_task["target_id"]."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor stored at ro_task["vim_info"]["vim_id"], if any.
        :param ro_task: ro_task dictionary. Reads "tasks", "vim_info", "target_id" and "_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: tuple (status, vim_info update), where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            # nothing to remove at the VIM: still reported as DONE
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def _sync_nsr_flavors(self, task, target_vim, osm_key, by_name=False):
        """
        Copy ram, disk and vcpus of the VIM flavors referenced by the vdurs into the
        matching entries of the nsr "flavor" list, and store the nsr at database.
        :param task: task dictionary. Reads "nsr_id"
        :param target_vim: VIM connector used to obtain the flavor details
        :param osm_key: key inside vdur["additionalParams"]["OSM"] holding the VIM flavor
            reference: "vim_flavor_id" or "vim_flavor_name"
        :param by_name: True to look the flavor up by name, False to look it up by id
        :return: "id" of the last flavor obtained by name, None otherwise
        """
        db_nsr = self.db.get_one("nsrs", {"_id": task["nsr_id"]})
        found_flavor_id = None

        # "or ()": a nsr without constituent vnfrs has nothing to update
        for vnfr_id in db_nsr.get("constituent-vnfr-ref") or ():
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
            for each_flavor in db_nsr["flavor"]:
                nsd_flavor_id = each_flavor["id"]
                for vdur in db_vnfr["vdur"]:
                    ns_flavor_id = vdur.get("ns-flavor-id")
                    if ns_flavor_id != nsd_flavor_id:
                        continue
                    # the lookup by id additionally requires a non-empty reference
                    if not by_name and not ns_flavor_id:
                        continue

                    flavor_ref = vdur["additionalParams"]["OSM"].get(osm_key)
                    if not flavor_ref:
                        continue

                    if by_name:
                        flavor_details = target_vim.get_flavor(flavor_name=flavor_ref)
                        found_flavor_id = flavor_details.get("id")
                    else:
                        flavor_details = target_vim.get_flavor(flavor_ref)

                    each_flavor.update(
                        {
                            "memory-mb": flavor_details["ram"],
                            "storage-gb": flavor_details["disk"],
                            "vcpu-count": flavor_details["vcpus"],
                        }
                    )

        self.db.set_one("nsrs", {"_id": task["nsr_id"]}, db_nsr)

        return found_flavor_id

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain a VIM flavor for the task: look for an existing one using
        task["find_params"] and, when none is found, create it from task["params"].
        :param ro_task: ro_task dictionary. Reads "tasks" and "target_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update), where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            vim_flavor_id = None
            # tolerate a task with "find_params" missing or set to None
            find_params = task.get("find_params") or {}

            if find_params.get("vim_flavor_id"):
                vim_flavor_id = find_params["vim_flavor_id"]
                self._sync_nsr_flavors(task, target_vim, "vim_flavor_id")
            elif find_params.get("vim_flavor_name"):
                vim_flavor_id = self._sync_nsr_flavors(
                    task, target_vim, "vim_flavor_name", by_name=True
                )
            elif find_params.get("flavor_data"):
                try:
                    flavor_data = find_params["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # not an error: the flavor is created below when params are given
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionAffinityGroup(VimInteractionBase):
    """Finds, creates and deletes affinity or anti-affinity groups at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group stored at ro_task["vim_info"]["vim_id"], if any.
        :param ro_task: ro_task dictionary. Reads "tasks", "vim_info", "target_id" and "_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: tuple (status, vim_info update), where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            # nothing to remove at the VIM: still reported as DONE
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain an affinity group: reuse the one identified by "vim-affinity-group-id"
        when it exists at the VIM, otherwise create it from "affinity_group_data".
        :param ro_task: ro_task dictionary. Reads "tasks" and "target_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update), where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # the provided group does not exist: fall through to creation below
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
                        " could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionUpdateVdu(VimInteractionBase):
    """Runs an action over an existing VM of the target VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Call action_vminstance at the VIM with the "vim_vm_id" and "action" taken
        from task["params"]. Nothing is called at the VIM when params are empty.
        :param ro_task: ro_task dictionary. Reads "tasks" and "target_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update, task update), status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                action = task["params"].get("action")
                # the action name is used both as key and value
                # presumably the connector selects the operation by key - TODO confirm
                context = {action: action}
                target_vim.action_vminstance(vim_vm_id, context)
                # created = True
            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
            }
            self.logger.debug(
                "task={} {} update-vdu done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Update VDU:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionSdnNet(VimInteractionBase):
    """Creates, updates and deletes connectivity services at a SDN/WIM connector."""

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping.
        The mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                break

            # literal text of the mapping placed before the bracket must be equal
            length = bracket_start - mapping_index
            if (
                length
                and port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # a pci that ends before the bracket position cannot match;
            # checking it here avoids an IndexError on truncated pci texts
            choice_index = pci_index + length
            if choice_index >= len(port_pci):
                return False

            if port_pci[choice_index] not in mapping[bracket_start + 1 : bracket_end]:
                return False

            pci_index += length + 1
            mapping_index = bracket_end + 1

        # remaining literal text after the last bracket must be equal
        if port_pci[pci_index:] != mapping[mapping_index:]:
            return False

        return True

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Get from the vnfrs at database the SR-IOV and PCI-PASSTHROUGH interfaces
        attached to the provided vlds.
        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: only vnfrs with this "vim-account-id" are considered
        :return: list of interface dictionaries (copies), each one with an added "id" and
            with "status" set to "ERROR" when its vdur is on ERROR status
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV or PT
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Run new() again over the first CREATE task that is not FINISHED.
        Raises StopIteration when ro_task contains no such task.
        :param ro_task: ro_task dictionary
        :return: same as new()
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the connectivity service with the ports currently known, or
        get its status when the set of connected ports has not changed.
        :param ro_task: ro_task dictionary. Reads "tasks", "vim_info" and "target_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update). On success status is the resulting
            sdn status; on any exception it is "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
            # print(ports)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                # a port without location is counted as pending or error and skipped
                if not port.get("compute_node") or not port.get("pci"):
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": (
                        "dot1q" if port["type"] == "SR-IOV" else None
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing is created at the connector with fewer than 2 ports
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # an error message, when present, replaces the sdn_info text
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # not ACTIVE yet while there are ports waiting for their location
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # traceback is logged only for exceptions not raised by the connectors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the connectivity service stored at ro_task["vim_info"]["vim_id"], if any.
        A SdnConnectorError with http_code NOT_FOUND is handled as already deleted.
        :param ro_task: ro_task dictionary. Reads "tasks", "vim_info", "target_id" and "_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: tuple (status, vim_info update), where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                # traceback is logged only for exceptions not raised by the connectors
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionMigration(VimInteractionBase):
    """Migrates a VM of the target VIM to another host."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM task["params"]["vim_vm_id"] to task["params"]["migrate_host"]
        and, when the VIM reports the new compute node, refresh the VM information.
        :param ro_task: ro_task dictionary. Reads "tasks" and "target_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update, task update), status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                migrate_host = task["params"].get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
                        ro_task["target_id"]
                    )

                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        # the old vim_info of this target, or its interfaces, can be
                        # missing; the migration is already done, so do not fail here
                        old_interfaces = (vdu_old_vim_info or {}).get("interfaces") or ()

                        for old_iface in old_interfaces:
                            # refreshed interface with the same vim_interface_id,
                            # None when the VIM does not report it any more
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionResize(VimInteractionBase):
    """Resizes a VM of the target VIM to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM task["params"]["vim_vm_id"] to the flavor referenced by
        task["params"]["flavor_id"], and refresh the VM information afterwards.
        :param ro_task: ro_task dictionary. Reads "tasks" and "target_id"
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary indexed by task["params"]["flavor_id"] that
            provides the flavor identifier at the VIM
        :return: tuple (status, vim_info update, task update), status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_flavor_uuid = None
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task["params"]
            target_flavor_uuid = task_depends[params["flavor_id"]]
            # the VM identifier comes in the task params, as in the migrate and update
            # tasks; it was previously left empty, so the resize was requested for ""
            vim_vm_id = params.get("vim_vm_id") or ""
            self.logger.info("vim_vm_id %s", vim_vm_id)

            if target_flavor_uuid is not None:
                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class ConfigValidate:
    """Gives access to the values of the "period" section of the configuration."""

    def __init__(self, config: Dict):
        self.conf = config

    def _period(self, key):
        """Return the value stored under key in the "period" section."""
        return self.conf["period"][key]

    @property
    def active(self):
        """
        Value of "refresh_active". Only values >= 60 or -1 are accepted, any other
        value is replaced by 60 (1 min). -1 disables periodic checks
        """
        refresh_active = self._period("refresh_active")
        accepted = refresh_active >= 60 or refresh_active == -1

        return refresh_active if accepted else 60

    @property
    def build(self):
        """Value of "refresh_build", returned as configured."""
        return self._period("refresh_build")

    @property
    def image(self):
        """Value of "refresh_image", returned as configured."""
        return self._period("refresh_image")

    @property
    def error(self):
        """Value of "refresh_error", returned as configured."""
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value of "queue_size", returned as configured."""
        return self._period("queue_size")
2047class NsWorker(threading.Thread):
2048 def __init__(self, worker_index, config, plugins, db):
2049 """
2050 :param worker_index: thread index
2051 :param config: general configuration of RO, among others the process_id with the docker id where it runs
2052 :param plugins: global shared dict with the loaded plugins
2053 :param db: database class instance to use
2054 """
2055 threading.Thread.__init__(self)
2056 self.config = config
2057 self.plugins = plugins
2058 self.plugin_name = "unknown"
2059 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
2060 self.worker_index = worker_index
2061 # refresh periods for created items
2062 self.refresh_config = ConfigValidate(config)
2063 self.task_queue = queue.Queue(self.refresh_config.queue_size)
2064 # targetvim: vimplugin class
2065 self.my_vims = {}
2066 # targetvim: vim information from database
2067 self.db_vims = {}
2068 # targetvim list
2069 self.vim_targets = []
2070 self.my_id = config["process_id"] + ":" + str(worker_index)
2071 self.db = db
2072 self.item2class = {
2073 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
2074 "shared-volumes": VimInteractionSharedVolume(
2075 self.db, self.my_vims, self.db_vims, self.logger
2076 ),
2077 "classification": VimInteractionClassification(
2078 self.db, self.my_vims, self.db_vims, self.logger
2079 ),
2080 "sfi": VimInteractionSfi(self.db, self.my_vims, self.db_vims, self.logger),
2081 "sf": VimInteractionSf(self.db, self.my_vims, self.db_vims, self.logger),
2082 "sfp": VimInteractionSfp(self.db, self.my_vims, self.db_vims, self.logger),
2083 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
2084 "image": VimInteractionImage(
2085 self.db, self.my_vims, self.db_vims, self.logger
2086 ),
2087 "flavor": VimInteractionFlavor(
2088 self.db, self.my_vims, self.db_vims, self.logger
2089 ),
2090 "sdn_net": VimInteractionSdnNet(
2091 self.db, self.my_vims, self.db_vims, self.logger
2092 ),
2093 "update": VimInteractionUpdateVdu(
2094 self.db, self.my_vims, self.db_vims, self.logger
2095 ),
2096 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
2097 self.db, self.my_vims, self.db_vims, self.logger
2098 ),
2099 "migrate": VimInteractionMigration(
2100 self.db, self.my_vims, self.db_vims, self.logger
2101 ),
2102 "verticalscale": VimInteractionResize(
2103 self.db, self.my_vims, self.db_vims, self.logger
2104 ),
2105 }
2106 self.time_last_task_processed = None
2107 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
2108 self.tasks_to_delete = []
2109 # it is idle when there are not vim_targets associated
2110 self.idle = True
2111 self.task_locked_time = config["global"]["task_locked_time"]
2113 def insert_task(self, task):
2114 try:
2115 self.task_queue.put(task, False)
2116 return None
2117 except queue.Full:
2118 raise NsWorkerException("timeout inserting a task")
2120 def terminate(self):
2121 self.insert_task("exit")
2123 def del_task(self, task):
2124 with self.task_lock:
2125 if task["status"] == "SCHEDULED":
2126 task["status"] = "SUPERSEDED"
2127 return True
2128 else: # task["status"] == "processing"
2129 self.task_lock.release()
2130 return False
2132 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
2133 """
2134 Process vim config, creating vim configuration files as ca_cert
2135 :param target_id: vim/sdn/wim + id
2136 :param db_vim: Vim dictionary obtained from database
2137 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
2138 """
2139 if not db_vim.get("config"):
2140 return
2142 file_name = ""
2143 work_dir = "/app/osm_ro/certs"
2145 try:
2146 if db_vim["config"].get("ca_cert_content"):
2147 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
2149 if not path.isdir(file_name):
2150 makedirs(file_name)
2152 file_name = file_name + "/ca_cert"
2154 with open(file_name, "w") as f:
2155 f.write(db_vim["config"]["ca_cert_content"])
2156 del db_vim["config"]["ca_cert_content"]
2157 db_vim["config"]["ca_cert"] = file_name
2158 except Exception as e:
2159 raise NsWorkerException(
2160 "Error writing to file '{}': {}".format(file_name, e)
2161 )
def _load_plugin(self, name, type="vim"):
    """
    Return the connector class registered under 'name', loading it from entry points when needed.
    Loaded classes are cached at self.plugins. The dummy vim and sdn connectors are always registered.
    :param name: plugin name, used both as cache key and as entry point name
    :param type: can be vim or sdn. Selects the entry point group "osm_ro<type>.plugins"
    :return: the plugin class
    :raises NsWorkerException: if the entry point fails to load or the plugin is not installed
    """
    if "rovim_dummy" not in self.plugins:
        self.plugins["rovim_dummy"] = VimDummyConnector

    if "rosdn_dummy" not in self.plugins:
        self.plugins["rosdn_dummy"] = SdnDummyConnector

    # already cached
    try:
        return self.plugins[name]
    except KeyError:
        pass

    try:
        group = "osm_ro{}.plugins".format(type)
        for ep in entry_points(group=group, name=name):
            self.plugins[name] = ep.load()
    except Exception as e:
        raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

    not_installed = name and name not in self.plugins
    if not_installed:
        raise NsWorkerException(
            "Plugin 'osm_{n}' has not been installed".format(n=name)
        )

    return self.plugins[name]
def _unload_vim(self, target_id):
    """
    Forget a vim_account: drops it from self.db_vims, self.my_vims and self.vim_targets.
    Any error is logged, never raised.
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None.
    """
    try:
        self.db_vims.pop(target_id, None)
        self.my_vims.pop(target_id, None)

        try:
            self.vim_targets.remove(target_id)
        except ValueError:
            # it was not in the list of targets; nothing to remove
            pass

        self.logger.info("Unloaded {}".format(target_id))
    except Exception as e:
        self.logger.error("Cannot unload {}: {}".format(target_id, e))
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR.

    It looks for the first operation in state PROCESSING at "_admin.operations" that is not locked
    by another worker (locked_at older than self.task_locked_time), locks it, loads the connector
    and checks connectivity. The result is written back to the same database record.
    If the target was not loaded before this call, it is unloaded again at the end.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts" if target == "wim" else "sdns"
    )
    error_text = ""

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock. The filter includes the previous locked_at, so it only succeeds if nobody
            # else has locked this operation meanwhile
            op_text = "_admin.operations.{}.".format(op_index)

            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    # same "_admin." prefix as the rest of fields, and same key that is unset below
                    "_admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            # from here the lock is owned: release it at the 'finally' clause
            unset_dict[op_text + "locked_at"] = None
            unset_dict["_admin.current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # if error_text is set, these values are overwritten at the 'finally' clause
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only one operation is processed per call
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
def _reload_vim(self, target_id):
    """
    Refresh the information of a target after it has been modified.
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None
    """
    if target_id not in self.vim_targets:
        # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
        # just remove it to force load again next time it is needed
        self.db_vims.pop(target_id, None)
        return

    self._load_vim(target_id)
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts" if target == "wim" else "sdns"
    )
    plugin_name = ""
    vim = None
    # 'step' describes what is being done, to be included in the error text
    step = "Getting {}={} from db".format(target, _id)

    try:
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # work over a copy, because the record stored at self.db_vims must not be altered
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            # provide both "url" and "wim_url", whichever of them is present at database
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        # keep the database record (if read) at db_vims and put a failing connector at my_vims.
        # Storing the connector at db_vims would overwrite the record just stored and leave
        # my_vims without an entry for this target
        self.db_vims[target_id] = vim or {}
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered both on success and on failure
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
def _get_db_task(self):
    """
    Lock one pending ro_task of the targets managed by this worker and read it from database.
    A ro_task is a candidate when its lock is older than self.task_locked_time and its
    to_check_at is between -1 and self.time_last_task_processed (both excluded).
    :return: the locked ro_task, or None if there is nothing to process or on database error
    """
    now = time.time()

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            # (disabled) debug trace of task_locked_time, time_last_task_processed and now
            # by means of self._log_ro_task(..., "TASK_WF", ...)
            lock_filter = {
                "target_id": self.vim_targets,
                "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                "locked_at.lt": now - self.task_locked_time,
                "to_check_at.lt": self.time_last_task_processed,
                "to_check_at.gt": -1,
            }
            lock_values = {"locked_by": self.my_id, "locked_at": now}
            locked = self.db.set_one(
                "ro_tasks",
                q_filter=lock_filter,
                update_dict=lock_values,
                fail_on_empty=False,
            )

            if locked:
                # read and return the ro_task just locked; it is identified by locked_at == now
                read_filter = {
                    "target_id": self.vim_targets,
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at": now,
                }
                return self.db.get_one("ro_tasks", q_filter=read_filter)

            if self.time_last_task_processed == now:
                # already tried with the widest time window: nothing pending
                self.time_last_task_processed = None
                return None

            # retry once more extending the window up to now
            self.time_last_task_processed = now
            # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this delete task needs to be done at the VIM or it is superseded.
    CREATE tasks over the same target_record are marked as FINISHED. If another CREATE task,
    over a different target_record, is neither FINISHED nor SUPERSEDED, the element is not deleted.
    :param ro_task: ro_task that contains the task
    :param task_index: index of the delete task at ro_task["tasks"]
    :param task_depends: not used by this method
    :param db_update: dictionary where the changes of status of other tasks are annotated
    :return: tuple (new status, vim_info update). (None, None) if the task is already FAILED
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]
    vim_info = ro_task["vim_info"]
    needed_delete = vim_info["created"] or vim_info.get("created_items", False)

    self.logger.debug("Needed delete: {}".format(needed_delete))
    if my_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for index, other in enumerate(ro_task["tasks"]):
            if index == task_index or not other:
                continue  # own task

            same_target = my_task["target_record"] == other["target_record"]
            if other["action"] != "CREATE":
                continue

            if same_target:
                # set to finished
                db_update["tasks.{}.status".format(index)] = "FINISHED"
                other["status"] = "FINISHED"
            elif other["status"] not in ("FINISHED", "SUPERSEDED"):
                # somebody else still needs this element
                needed_delete = False

        if not needed_delete:
            return "SUPERSEDED", None

        self.logger.debug(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        return self.item2class[my_task["item"]].delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(
                    task_id, e
                ),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task needs to create something at VIM.
    Only SCHEDULED tasks are processed. If another CREATE task of the same ro_task is in a status
    different than SCHEDULED, FINISHED or SUPERSEDED, its status is returned with "COPY_VIM_INFO".
    :param ro_task: ro_task that contains the task
    :param task_index: index of the task at ro_task["tasks"]
    :param task_depends: passed to the 'new' method of the class that handles the item
    :param db_update: not used by this method
    :return: tuple (new status, vim_info update). (None, None) if the task is not SCHEDULED
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]

    if my_task["status"] != "SCHEDULED":
        # FAILED (TODO need to be retry??) or any other status: nothing to do
        return None, None

    # check if already created by another task
    idle_status = ("SCHEDULED", "FINISHED", "SUPERSEDED")
    for index, other in enumerate(ro_task["tasks"]):
        if index == task_index or not other:
            continue  # own task

        if other["action"] == "CREATE" and other["status"] not in idle_status:
            return other["status"], "COPY_VIM_INFO"

    try:
        handler = self.item2class[my_task["item"]]
        task_status, ro_vim_item_update = handler.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: optional ro_task where to look for first when task_id is a task.task_id
    :param target_id: target used for the database search when task_id does not include it (form 2)
    :return: database ro_task plus index of task
    :raises NsWorkerException: if the task is not found
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        # form 1: split the target from the target_record_id
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of deleted tasks are None; skip them
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of deleted tasks are None; skip them
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index
    raise NsWorkerException("Cannot get depending task {}".format(task_id))
def update_vm_refresh(self, ro_task):
    """Enable again the periodic check of tasks whose check was disabled.

    When self._get_next_refresh does not return -1 for this ro_task, all the ro_tasks
    at database with a task in status DONE and a negative to_check_at are updated with
    a new "vim_info.refresh_at" and "to_check_at". Errors are logged, never raised.

    Args:
        ro_task (dict): ro_task details, used to compute the next refresh time
    """
    try:
        self.logger.debug("Checking if VM status update config")
        next_refresh = self._get_next_refresh(ro_task, time.time())

        if next_refresh == -1:
            # refresh is disabled; nothing to update
            return

        # check at next refresh, but not later than one day from now
        one_day_later = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_later, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for stored_ro_task in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": stored_ro_task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
def _get_next_refresh(self, ro_task: dict, next_refresh: float):
    """Decide the next_refresh according to vim type and refresh config period.

    Args:
        ro_task (dict): ro_task details
        next_refresh (float): next refresh time as epoch format

    Returns:
        next_refresh (float)    -1 if vm updates are disabled or vim type is openstack.
        Otherwise the received next_refresh plus self.refresh_config.active
    """
    target_vim = ro_task["target_id"]
    # sdn and wim records do not contain "vim_type" (they use "type"/"wim_type"),
    # so it is read with get() to avoid a KeyError for those targets
    vim_type = self.db_vims[target_vim].get("vim_type")
    if self.refresh_config.active == -1 or vim_type == "openstack":
        next_refresh = -1
    else:
        next_refresh += self.refresh_config.active
    return next_refresh
def _process_pending_tasks(self, ro_task):
    """
    Process all the pending tasks of a ro_task already locked by this worker and write the
    result to database, releasing the lock.
    Tasks are processed by action in the order DELETE, CREATE, EXEC. For each processed task the
    new status and the vim information are annotated at db_ro_task_update and the target record
    is updated by means of self._update_target.
    Exceptions are logged, never raised.
    :param ro_task: ro_task obtained from database. Its "vim_info" is modified in place
    :return: None
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    # changes to be written to the ro_task at database at the end
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh for the task being processed ('task' of the enclosing loop) and
        # annotate it both at db_ro_task_update and at ro_task. next_check_at is lowered if needed
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            # can return -1, meaning that refresh is disabled
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        # first CREATE task (if any) that is already in BUILD or DONE
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of other actions, DELETE/EXEC tasks not in SCHEDULED or BUILD,
                # and CREATE tasks already FINISHED or SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None
                    dependency_ro_task = {}

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is provided under two keys:
                            # the dependency id itself and "TASK-<dependency id>"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends["TASK-{}".format(dependency_task_id)] = (
                                dependency_ro_task["vim_info"]["vim_id"]
                            )

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )
                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # note this overwrites next_check_at instead of taking the minimum
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # element already created by other CREATE task: copy its information
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        # NOTE(review): this message mentions _delete_task, but it is logged for
                        # an unexpected exception of any action (DELETE, CREATE or EXEC)
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                # annotate the result of this task and propagate it to the target record
                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        # 2: write the changes to the ro_task at database and unlock it
        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # to_check_at has changed meanwhile: write everything but to_check_at
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
def _update_target(self, task, ro_vim_item_update):
    """
    Write the vim information of a task to its target record (e.g. a vnfrs or nsrs entry).
    :param task: task dictionary. Its "target_record" has the form "<table>:<_id>:<path_vim_status>"
    :param ro_vim_item_update: dictionary with the vim information to store. If empty or None,
        the item is marked as DELETED and its vim information is removed from the record
    :return: None
    """
    table, _, remainder = task["target_record"].partition(":")
    _id, _, path_vim_status = remainder.partition(":")
    # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
    # path_item: dot separated list targeting record information, e.g. "vdur.10"
    path_item = path_vim_status[: path_vim_status.rfind(".")]
    path_item = path_item[: path_item.rfind(".")]

    if not ro_vim_item_update:
        self.db.set_one(
            table,
            q_filter={"_id": _id},
            update_dict={path_item + ".status": "DELETED"},
            unset={path_vim_status: None},
        )
        return

    # only these keys are copied under path_vim_status
    copied_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
        "interfaces_backup",
    )
    update_dict = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            update_dict[path_vim_status + "." + key] = value

    interfaces = ro_vim_item_update.get("interfaces")

    if path_vim_status.startswith("vdur."):
        # for backward compatibility, add vdur.name apart from vdur.vim_name,
        # vdur.vim-id apart from vdur.vim_id; and update general status
        for source_key, item_suffix in (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        ):
            if ro_vim_item_update.get(source_key):
                update_dict[path_item + item_suffix] = ro_vim_item_update[source_key]

        if interfaces:
            path_interfaces = path_item + ".interfaces"

            for i, iface in enumerate(interfaces):
                if not iface:
                    continue

                iface_prefix = path_interfaces + ".{}.".format(i)
                for key, value in iface.items():
                    if key in ("vlan", "compute_node", "pci"):
                        update_dict[iface_prefix + key] = value

                # put ip_address and mac_address with ip-address and mac-address
                ip_address = iface.get("ip_address")
                if ip_address:
                    update_dict[iface_prefix + "ip-address"] = ip_address

                mac_address = iface.get("mac_address")
                if mac_address:
                    update_dict[iface_prefix + "mac-address"] = mac_address

                # management addresses: only the first one of a ';' separated list
                if iface.get("mgmt_vnf_interface") and ip_address:
                    update_dict["ip-address"] = ip_address.split(";")[0]

                if iface.get("mgmt_vdu_interface") and ip_address:
                    update_dict[path_item + ".ip-address"] = ip_address.split(";")[0]

    self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)

    # If interfaces exists, it backups VDU interfaces in the DB for healing operations
    if interfaces:
        search_key = path_vim_status + ".interfaces"
        if update_dict.get(search_key):
            self.db.set_one(
                table,
                q_filter={"_id": _id},
                update_dict={
                    path_vim_status + ".interfaces_backup": update_dict[search_key]
                },
            )
def _process_delete_db_tasks(self):
    """
    Remove from database the tasks annotated at self.tasks_to_delete, because their vnfrs or nsrs
    or both have been deleted. Errors deleting a task are logged and the task is discarded anyway.
    :return: None. Uses and modify self.tasks_to_delete
    """
    while self.tasks_to_delete:
        task = self.tasks_to_delete[0]
        nsr_id = task["nsr_id"]
        target_record = task["target_record"]

        # if the target is a vnfrs and the nsrs is still present, only the tasks of this vnfr
        # are deleted. Otherwise all the tasks of the nsr
        vnfrs_deleted = None
        if target_record.startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": nsr_id}, fail_on_empty=False
        ):
            vnfrs_deleted = target_record.split(":")[1]

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            self.logger.error(
                "Error deleting task={}: {}".format(task["task_id"], e)
            )

        self.tasks_to_delete.pop(0)
3151 @staticmethod
3152 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
3153 """
3154 Static method because it is called from osm_ng_ro.ns
3155 :param db: instance of database to use
3156 :param nsr_id: affected nsrs id
3157 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
3158 :return: None, exception is fails
3159 """
3160 retries = 5
3161 for retry in range(retries):
3162 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
3163 now = time.time()
3164 conflict = False
3166 for ro_task in ro_tasks:
3167 db_update = {}
3168 to_delete_ro_task = True
3170 for index, task in enumerate(ro_task["tasks"]):
3171 if not task:
3172 pass
3173 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
3174 vnfrs_deleted
3175 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
3176 ):
3177 db_update["tasks.{}".format(index)] = None
3178 else:
3179 # used by other nsr, ro_task cannot be deleted
3180 to_delete_ro_task = False
3182 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
3183 if to_delete_ro_task:
3184 if not db.del_one(
3185 "ro_tasks",
3186 q_filter={
3187 "_id": ro_task["_id"],
3188 "modified_at": ro_task["modified_at"],
3189 },
3190 fail_on_empty=False,
3191 ):
3192 conflict = True
3193 elif db_update:
3194 db_update["modified_at"] = now
3195 if not db.set_one(
3196 "ro_tasks",
3197 q_filter={
3198 "_id": ro_task["_id"],
3199 "modified_at": ro_task["modified_at"],
3200 },
3201 update_dict=db_update,
3202 fail_on_empty=False,
3203 ):
3204 conflict = True
3205 if not conflict:
3206 return
3207 else:
3208 raise NsWorkerException("Exceeded {} retries".format(retries))
def run(self):
    """
    Main loop of the worker. Each iteration:
      1. gets one order from self.task_queue (blocking only if there are no vim targets loaded) and
         executes it. Orders are tuples (command, target_id), with command one of "load_vim",
         "unload_vim", "reload_vim", "check_vim"; or ("terminate",) to finish the loop.
         The bare string "exit", as queued by terminate(), also finishes the loop.
      2. if there was no order, processes tasks marked to be deleted and one pending ro_task
         from database, sleeping 5 seconds when there is nothing to process.
    Exceptions are logged, never raised.
    :return: None
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                # nothing to poll at database: wait until an order arrives
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            # "exit" is compared as a whole: indexing it would give "e" and never match
            if task == "exit" or task[0] == "terminate":
                break
            elif task[0] == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif task[0] == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif task[0] == "reload_vim":
                self._reload_vim(task[1])
            elif task[0] == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            # an order has been processed: look for more orders before going to database
            continue
        except queue.Empty:
            # no orders pending; go to step 2
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()
            busy = False
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                _ = self._get_db_all_tasks()
            """
            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
                busy = True
            if not busy:
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")