Coverage for NG-RO/osm_ng_ro/ns_thread.py: 35%
1252 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-06-28 09:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-06-28 09:51 +0000
1# -*- coding: utf-8 -*-
3##
4# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17#
18##
20"""
21This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22The tasks are stored at database in table ro_tasks
23A single ro_task refers to a VIM element (flavor, image, network, ...).
24A ro_task can contain several 'tasks', each one with a target, where to store the results
25"""
27from copy import deepcopy
28from http import HTTPStatus
29import logging
30from os import makedirs
31from os import path
32import queue
33import threading
34import time
35import traceback
36from typing import Dict
37from unittest.mock import Mock
39from importlib_metadata import entry_points
40from osm_common.dbbase import DbException
41from osm_ng_ro.vim_admin import LockRenew
42from osm_ro_plugin import sdnconn
43from osm_ro_plugin import vimconn
44from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45from osm_ro_plugin.vim_dummy import VimDummyConnector
46import yaml
48__author__ = "Alfonso Tierno"
49__date__ = "$28-Sep-2017 12:07:15$"
52def deep_get(target_dict, *args, **kwargs):
53 """
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exists, None or default otherwise
60 """
61 for key in args:
62 if not isinstance(target_dict, dict) or key not in target_dict:
63 return kwargs.get("default")
64 target_dict = target_dict[key]
65 return target_dict
68class NsWorkerException(Exception):
69 pass
72class FailingConnector:
73 def __init__(self, error_msg):
74 self.error_msg = error_msg
76 for method in dir(vimconn.VimConnector):
77 if method[0] != "_":
78 setattr(
79 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80 )
82 for method in dir(sdnconn.SdnConnectorBase):
83 if method[0] != "_":
84 setattr(
85 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86 )
89class NsWorkerExceptionNotFound(NsWorkerException):
90 pass
93class VimInteractionBase:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self, db, my_vims, db_vims, logger):
98 self.db = db
99 self.logger = logger
100 self.my_vims = my_vims
101 self.db_vims = db_vims
103 def new(self, ro_task, task_index, task_depends):
104 return "BUILD", {}
106 def refresh(self, ro_task):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 return "FAILED", {}
111 return "DONE", {}
113 def delete(self, ro_task, task_index):
114 """skip calling VIM to delete image. Assumes ok"""
115 return "DONE", {}
117 def exec(self, ro_task, task_index, task_depends):
118 return "DONE", None, None
121class VimInteractionNet(VimInteractionBase):
122 def new(self, ro_task, task_index, task_depends):
123 vim_net_id = None
124 task = ro_task["tasks"][task_index]
125 task_id = task["task_id"]
126 created = False
127 created_items = {}
128 target_vim = self.my_vims[ro_task["target_id"]]
129 mgmtnet = False
130 mgmtnet_defined_in_vim = False
132 try:
133 # FIND
134 if task.get("find_params"):
135 # if management, get configuration of VIM
136 if task["find_params"].get("filter_dict"):
137 vim_filter = task["find_params"]["filter_dict"]
138 # management network
139 elif task["find_params"].get("mgmt"):
140 mgmtnet = True
141 if deep_get(
142 self.db_vims[ro_task["target_id"]],
143 "config",
144 "management_network_id",
145 ):
146 mgmtnet_defined_in_vim = True
147 vim_filter = {
148 "id": self.db_vims[ro_task["target_id"]]["config"][
149 "management_network_id"
150 ]
151 }
152 elif deep_get(
153 self.db_vims[ro_task["target_id"]],
154 "config",
155 "management_network_name",
156 ):
157 mgmtnet_defined_in_vim = True
158 vim_filter = {
159 "name": self.db_vims[ro_task["target_id"]]["config"][
160 "management_network_name"
161 ]
162 }
163 else:
164 vim_filter = {"name": task["find_params"]["name"]}
165 else:
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task["find_params"])
168 )
170 vim_nets = target_vim.get_network_list(vim_filter)
171 if not vim_nets and not task.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet and not mgmtnet_defined_in_vim:
177 net_name = (
178 vim_filter.get("name")
179 if vim_filter.get("name")
180 else vim_filter.get("id")[:16]
181 )
182 vim_net_id, created_items = target_vim.new_network(
183 net_name, None
184 )
185 self.logger.debug(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id)
187 )
188 created = True
189 else:
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task.get("find_params")
193 )
194 )
195 elif len(vim_nets) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
198 task["find_params"]
199 )
200 )
202 if vim_nets:
203 vim_net_id = vim_nets[0]["id"]
204 else:
205 # CREATE
206 params = task["params"]
207 vim_net_id, created_items = target_vim.new_network(**params)
208 created = True
210 ro_vim_item_update = {
211 "vim_id": vim_net_id,
212 "vim_status": "BUILD",
213 "created": created,
214 "created_items": created_items,
215 "vim_details": None,
216 "vim_message": None,
217 }
218 self.logger.debug(
219 "task={} {} new-net={} created={}".format(
220 task_id, ro_task["target_id"], vim_net_id, created
221 )
222 )
224 return "BUILD", ro_vim_item_update
225 except (vimconn.VimConnException, NsWorkerException) as e:
226 self.logger.error(
227 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
228 )
229 ro_vim_item_update = {
230 "vim_status": "VIM_ERROR",
231 "created": created,
232 "vim_message": str(e),
233 }
235 return "FAILED", ro_vim_item_update
237 def refresh(self, ro_task):
238 """Call VIM to get network status"""
239 ro_task_id = ro_task["_id"]
240 target_vim = self.my_vims[ro_task["target_id"]]
241 vim_id = ro_task["vim_info"]["vim_id"]
242 net_to_refresh_list = [vim_id]
244 try:
245 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
246 vim_info = vim_dict[vim_id]
248 if vim_info["status"] == "ACTIVE":
249 task_status = "DONE"
250 elif vim_info["status"] == "BUILD":
251 task_status = "BUILD"
252 else:
253 task_status = "FAILED"
254 except vimconn.VimConnException as e:
255 # Mark all tasks at VIM_ERROR status
256 self.logger.error(
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id, ro_task["target_id"], vim_id, e
259 )
260 )
261 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
262 task_status = "FAILED"
264 ro_vim_item_update = {}
265 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
266 ro_vim_item_update["vim_status"] = vim_info["status"]
268 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
269 ro_vim_item_update["vim_name"] = vim_info.get("name")
271 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
273 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
274 elif vim_info["status"] == "DELETED":
275 ro_vim_item_update["vim_id"] = None
276 ro_vim_item_update["vim_message"] = "Deleted externally"
277 else:
278 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
279 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
281 if ro_vim_item_update:
282 self.logger.debug(
283 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task_id,
285 ro_task["target_id"],
286 vim_id,
287 ro_vim_item_update.get("vim_status"),
288 ro_vim_item_update.get("vim_message")
289 if ro_vim_item_update.get("vim_status") != "ACTIVE"
290 else "",
291 )
292 )
294 return task_status, ro_vim_item_update
296 def delete(self, ro_task, task_index):
297 task = ro_task["tasks"][task_index]
298 task_id = task["task_id"]
299 net_vim_id = ro_task["vim_info"]["vim_id"]
300 ro_vim_item_update_ok = {
301 "vim_status": "DELETED",
302 "created": False,
303 "vim_message": "DELETED",
304 "vim_id": None,
305 }
307 try:
308 if net_vim_id or ro_task["vim_info"]["created_items"]:
309 target_vim = self.my_vims[ro_task["target_id"]]
310 target_vim.delete_network(
311 net_vim_id, ro_task["vim_info"]["created_items"]
312 )
313 except vimconn.VimConnNotFoundException:
314 ro_vim_item_update_ok["vim_message"] = "already deleted"
315 except vimconn.VimConnException as e:
316 self.logger.error(
317 "ro_task={} vim={} del-net={}: {}".format(
318 ro_task["_id"], ro_task["target_id"], net_vim_id, e
319 )
320 )
321 ro_vim_item_update = {
322 "vim_status": "VIM_ERROR",
323 "vim_message": "Error while deleting: {}".format(e),
324 }
326 return "FAILED", ro_vim_item_update
328 self.logger.debug(
329 "task={} {} del-net={} {}".format(
330 task_id,
331 ro_task["target_id"],
332 net_vim_id,
333 ro_vim_item_update_ok.get("vim_message", ""),
334 )
335 )
337 return "DONE", ro_vim_item_update_ok
340class VimInteractionVdu(VimInteractionBase):
341 max_retries_inject_ssh_key = 20 # 20 times
342 time_retries_inject_ssh_key = 30 # wevery 30 seconds
344 def new(self, ro_task, task_index, task_depends):
345 task = ro_task["tasks"][task_index]
346 task_id = task["task_id"]
347 created = False
348 target_vim = self.my_vims[ro_task["target_id"]]
349 try:
350 created = True
351 params = task["params"]
352 params_copy = deepcopy(params)
353 net_list = params_copy["net_list"]
355 for net in net_list:
356 # change task_id into network_id
357 if "net_id" in net and net["net_id"].startswith("TASK-"):
358 network_id = task_depends[net["net_id"]]
360 if not network_id:
361 raise NsWorkerException(
362 "Cannot create VM because depends on a network not created or found "
363 "for {}".format(net["net_id"])
364 )
366 net["net_id"] = network_id
368 if params_copy["image_id"].startswith("TASK-"):
369 params_copy["image_id"] = task_depends[params_copy["image_id"]]
371 if params_copy["flavor_id"].startswith("TASK-"):
372 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
374 affinity_group_list = params_copy["affinity_group_list"]
375 for affinity_group in affinity_group_list:
376 # change task_id into affinity_group_id
377 if "affinity_group_id" in affinity_group and affinity_group[
378 "affinity_group_id"
379 ].startswith("TASK-"):
380 affinity_group_id = task_depends[
381 affinity_group["affinity_group_id"]
382 ]
384 if not affinity_group_id:
385 raise NsWorkerException(
386 "found for {}".format(affinity_group["affinity_group_id"])
387 )
389 affinity_group["affinity_group_id"] = affinity_group_id
390 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
391 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
393 # add to created items previous_created_volumes (healing)
394 if task.get("previous_created_volumes"):
395 for k, v in task["previous_created_volumes"].items():
396 created_items[k] = v
398 ro_vim_item_update = {
399 "vim_id": vim_vm_id,
400 "vim_status": "BUILD",
401 "created": created,
402 "created_items": created_items,
403 "vim_details": None,
404 "vim_message": None,
405 "interfaces_vim_ids": interfaces,
406 "interfaces": [],
407 "interfaces_backup": [],
408 }
409 self.logger.debug(
410 "task={} {} new-vm={} created={}".format(
411 task_id, ro_task["target_id"], vim_vm_id, created
412 )
413 )
415 return "BUILD", ro_vim_item_update
416 except (vimconn.VimConnException, NsWorkerException) as e:
417 self.logger.debug(traceback.format_exc())
418 self.logger.error(
419 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
420 )
421 ro_vim_item_update = {
422 "vim_status": "VIM_ERROR",
423 "created": created,
424 "vim_message": str(e),
425 }
427 return "FAILED", ro_vim_item_update
429 def delete(self, ro_task, task_index):
430 task = ro_task["tasks"][task_index]
431 task_id = task["task_id"]
432 vm_vim_id = ro_task["vim_info"]["vim_id"]
433 ro_vim_item_update_ok = {
434 "vim_status": "DELETED",
435 "created": False,
436 "vim_message": "DELETED",
437 "vim_id": None,
438 }
440 try:
441 self.logger.debug(
442 "delete_vminstance: vm_vim_id={} created_items={}".format(
443 vm_vim_id, ro_task["vim_info"]["created_items"]
444 )
445 )
446 if vm_vim_id or ro_task["vim_info"]["created_items"]:
447 target_vim = self.my_vims[ro_task["target_id"]]
448 target_vim.delete_vminstance(
449 vm_vim_id,
450 ro_task["vim_info"]["created_items"],
451 ro_task["vim_info"].get("volumes_to_hold", []),
452 )
453 except vimconn.VimConnNotFoundException:
454 ro_vim_item_update_ok["vim_message"] = "already deleted"
455 except vimconn.VimConnException as e:
456 self.logger.error(
457 "ro_task={} vim={} del-vm={}: {}".format(
458 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
459 )
460 )
461 ro_vim_item_update = {
462 "vim_status": "VIM_ERROR",
463 "vim_message": "Error while deleting: {}".format(e),
464 }
466 return "FAILED", ro_vim_item_update
468 self.logger.debug(
469 "task={} {} del-vm={} {}".format(
470 task_id,
471 ro_task["target_id"],
472 vm_vim_id,
473 ro_vim_item_update_ok.get("vim_message", ""),
474 )
475 )
477 return "DONE", ro_vim_item_update_ok
479 def refresh(self, ro_task):
480 """Call VIM to get vm status"""
481 ro_task_id = ro_task["_id"]
482 target_vim = self.my_vims[ro_task["target_id"]]
483 vim_id = ro_task["vim_info"]["vim_id"]
485 if not vim_id:
486 return None, None
488 vm_to_refresh_list = [vim_id]
489 try:
490 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
491 vim_info = vim_dict[vim_id]
493 if vim_info["status"] == "ACTIVE":
494 task_status = "DONE"
495 elif vim_info["status"] == "BUILD":
496 task_status = "BUILD"
497 else:
498 task_status = "FAILED"
500 # try to load and parse vim_information
501 try:
502 vim_info_info = yaml.safe_load(vim_info["vim_info"])
503 if vim_info_info.get("name"):
504 vim_info["name"] = vim_info_info["name"]
505 except Exception as vim_info_error:
506 self.logger.exception(
507 f"{vim_info_error} occured while getting the vim_info from yaml"
508 )
509 except vimconn.VimConnException as e:
510 # Mark all tasks at VIM_ERROR status
511 self.logger.error(
512 "ro_task={} vim={} get-vm={}: {}".format(
513 ro_task_id, ro_task["target_id"], vim_id, e
514 )
515 )
516 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
517 task_status = "FAILED"
519 ro_vim_item_update = {}
521 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
522 vim_interfaces = []
523 if vim_info.get("interfaces"):
524 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
525 iface = next(
526 (
527 iface
528 for iface in vim_info["interfaces"]
529 if vim_iface_id == iface["vim_interface_id"]
530 ),
531 None,
532 )
533 # if iface:
534 # iface.pop("vim_info", None)
535 vim_interfaces.append(iface)
537 task_create = next(
538 t
539 for t in ro_task["tasks"]
540 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
541 )
542 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
543 vim_interfaces[task_create["mgmt_vnf_interface"]][
544 "mgmt_vnf_interface"
545 ] = True
547 mgmt_vdu_iface = task_create.get(
548 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
549 )
550 if vim_interfaces:
551 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
553 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
554 ro_vim_item_update["interfaces"] = vim_interfaces
556 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
557 ro_vim_item_update["vim_status"] = vim_info["status"]
559 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
560 ro_vim_item_update["vim_name"] = vim_info.get("name")
562 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
563 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
564 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
565 elif vim_info["status"] == "DELETED":
566 ro_vim_item_update["vim_id"] = None
567 ro_vim_item_update["vim_message"] = "Deleted externally"
568 else:
569 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
570 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
572 if ro_vim_item_update:
573 self.logger.debug(
574 "ro_task={} {} get-vm={}: status={} {}".format(
575 ro_task_id,
576 ro_task["target_id"],
577 vim_id,
578 ro_vim_item_update.get("vim_status"),
579 ro_vim_item_update.get("vim_message")
580 if ro_vim_item_update.get("vim_status") != "ACTIVE"
581 else "",
582 )
583 )
585 return task_status, ro_vim_item_update
587 def exec(self, ro_task, task_index, task_depends):
588 task = ro_task["tasks"][task_index]
589 task_id = task["task_id"]
590 target_vim = self.my_vims[ro_task["target_id"]]
591 db_task_update = {"retries": 0}
592 retries = task.get("retries", 0)
594 try:
595 params = task["params"]
596 params_copy = deepcopy(params)
597 params_copy["ro_key"] = self.db.decrypt(
598 params_copy.pop("private_key"),
599 params_copy.pop("schema_version"),
600 params_copy.pop("salt"),
601 )
602 params_copy["ip_addr"] = params_copy.pop("ip_address")
603 target_vim.inject_user_key(**params_copy)
604 self.logger.debug(
605 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
606 )
608 return (
609 "DONE",
610 None,
611 db_task_update,
612 ) # params_copy["key"]
613 except (vimconn.VimConnException, NsWorkerException) as e:
614 retries += 1
616 self.logger.debug(traceback.format_exc())
617 if retries < self.max_retries_inject_ssh_key:
618 return (
619 "BUILD",
620 None,
621 {
622 "retries": retries,
623 "next_retry": self.time_retries_inject_ssh_key,
624 },
625 )
627 self.logger.error(
628 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
629 )
630 ro_vim_item_update = {"vim_message": str(e)}
632 return "FAILED", ro_vim_item_update, db_task_update
635class VimInteractionImage(VimInteractionBase):
636 def new(self, ro_task, task_index, task_depends):
637 task = ro_task["tasks"][task_index]
638 task_id = task["task_id"]
639 created = False
640 created_items = {}
641 target_vim = self.my_vims[ro_task["target_id"]]
643 try:
644 # FIND
645 vim_image_id = ""
646 if task.get("find_params"):
647 vim_images = target_vim.get_image_list(**task["find_params"])
649 if not vim_images:
650 raise NsWorkerExceptionNotFound(
651 "Image not found with this criteria: '{}'".format(
652 task["find_params"]
653 )
654 )
655 elif len(vim_images) > 1:
656 raise NsWorkerException(
657 "More than one image found with this criteria: '{}'".format(
658 task["find_params"]
659 )
660 )
661 else:
662 vim_image_id = vim_images[0]["id"]
664 ro_vim_item_update = {
665 "vim_id": vim_image_id,
666 "vim_status": "ACTIVE",
667 "created": created,
668 "created_items": created_items,
669 "vim_details": None,
670 "vim_message": None,
671 }
672 self.logger.debug(
673 "task={} {} new-image={} created={}".format(
674 task_id, ro_task["target_id"], vim_image_id, created
675 )
676 )
678 return "DONE", ro_vim_item_update
679 except (NsWorkerException, vimconn.VimConnException) as e:
680 self.logger.error(
681 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
682 )
683 ro_vim_item_update = {
684 "vim_status": "VIM_ERROR",
685 "created": created,
686 "vim_message": str(e),
687 }
689 return "FAILED", ro_vim_item_update
692class VimInteractionSharedVolume(VimInteractionBase):
693 def delete(self, ro_task, task_index):
694 task = ro_task["tasks"][task_index]
695 task_id = task["task_id"]
696 shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
697 created_items = ro_task["vim_info"]["created_items"]
698 ro_vim_item_update_ok = {
699 "vim_status": "DELETED",
700 "created": False,
701 "vim_message": "DELETED",
702 "vim_id": None,
703 }
704 if created_items and created_items.get(shared_volume_vim_id).get("keep"):
705 ro_vim_item_update_ok = {
706 "vim_status": "ACTIVE",
707 "created": False,
708 "vim_message": None,
709 }
710 return "DONE", ro_vim_item_update_ok
711 try:
712 if shared_volume_vim_id:
713 target_vim = self.my_vims[ro_task["target_id"]]
714 target_vim.delete_shared_volumes(shared_volume_vim_id)
715 except vimconn.VimConnNotFoundException:
716 ro_vim_item_update_ok["vim_message"] = "already deleted"
717 except vimconn.VimConnException as e:
718 self.logger.error(
719 "ro_task={} vim={} del-shared-volume={}: {}".format(
720 ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
721 )
722 )
723 ro_vim_item_update = {
724 "vim_status": "VIM_ERROR",
725 "vim_message": "Error while deleting: {}".format(e),
726 }
728 return "FAILED", ro_vim_item_update
730 self.logger.debug(
731 "task={} {} del-shared-volume={} {}".format(
732 task_id,
733 ro_task["target_id"],
734 shared_volume_vim_id,
735 ro_vim_item_update_ok.get("vim_message", ""),
736 )
737 )
739 return "DONE", ro_vim_item_update_ok
741 def new(self, ro_task, task_index, task_depends):
742 task = ro_task["tasks"][task_index]
743 task_id = task["task_id"]
744 created = False
745 created_items = {}
746 target_vim = self.my_vims[ro_task["target_id"]]
748 try:
749 shared_volume_vim_id = None
750 shared_volume_data = None
752 if task.get("params"):
753 shared_volume_data = task["params"]
755 if shared_volume_data:
756 self.logger.info(
757 f"Creating the new shared_volume for {shared_volume_data}\n"
758 )
759 (
760 shared_volume_name,
761 shared_volume_vim_id,
762 ) = target_vim.new_shared_volumes(shared_volume_data)
763 created = True
764 created_items[shared_volume_vim_id] = {
765 "name": shared_volume_name,
766 "keep": shared_volume_data.get("keep"),
767 }
769 ro_vim_item_update = {
770 "vim_id": shared_volume_vim_id,
771 "vim_status": "ACTIVE",
772 "created": created,
773 "created_items": created_items,
774 "vim_details": None,
775 "vim_message": None,
776 }
777 self.logger.debug(
778 "task={} {} new-shared-volume={} created={}".format(
779 task_id, ro_task["target_id"], shared_volume_vim_id, created
780 )
781 )
783 return "DONE", ro_vim_item_update
784 except (vimconn.VimConnException, NsWorkerException) as e:
785 self.logger.error(
786 "task={} vim={} new-shared-volume:"
787 " {}".format(task_id, ro_task["target_id"], e)
788 )
789 ro_vim_item_update = {
790 "vim_status": "VIM_ERROR",
791 "created": created,
792 "vim_message": str(e),
793 }
795 return "FAILED", ro_vim_item_update
798class VimInteractionFlavor(VimInteractionBase):
799 def delete(self, ro_task, task_index):
800 task = ro_task["tasks"][task_index]
801 task_id = task["task_id"]
802 flavor_vim_id = ro_task["vim_info"]["vim_id"]
803 ro_vim_item_update_ok = {
804 "vim_status": "DELETED",
805 "created": False,
806 "vim_message": "DELETED",
807 "vim_id": None,
808 }
810 try:
811 if flavor_vim_id:
812 target_vim = self.my_vims[ro_task["target_id"]]
813 target_vim.delete_flavor(flavor_vim_id)
814 except vimconn.VimConnNotFoundException:
815 ro_vim_item_update_ok["vim_message"] = "already deleted"
816 except vimconn.VimConnException as e:
817 self.logger.error(
818 "ro_task={} vim={} del-flavor={}: {}".format(
819 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
820 )
821 )
822 ro_vim_item_update = {
823 "vim_status": "VIM_ERROR",
824 "vim_message": "Error while deleting: {}".format(e),
825 }
827 return "FAILED", ro_vim_item_update
829 self.logger.debug(
830 "task={} {} del-flavor={} {}".format(
831 task_id,
832 ro_task["target_id"],
833 flavor_vim_id,
834 ro_vim_item_update_ok.get("vim_message", ""),
835 )
836 )
838 return "DONE", ro_vim_item_update_ok
840 def new(self, ro_task, task_index, task_depends):
841 task = ro_task["tasks"][task_index]
842 task_id = task["task_id"]
843 created = False
844 created_items = {}
845 target_vim = self.my_vims[ro_task["target_id"]]
846 try:
847 # FIND
848 vim_flavor_id = None
850 if task.get("find_params", {}).get("vim_flavor_id"):
851 vim_flavor_id = task["find_params"]["vim_flavor_id"]
852 elif task.get("find_params", {}).get("flavor_data"):
853 try:
854 flavor_data = task["find_params"]["flavor_data"]
855 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
856 except vimconn.VimConnNotFoundException as flavor_not_found_msg:
857 self.logger.warning(
858 f"VimConnNotFoundException occured: {flavor_not_found_msg}"
859 )
861 if not vim_flavor_id and task.get("params"):
862 # CREATE
863 flavor_data = task["params"]["flavor_data"]
864 vim_flavor_id = target_vim.new_flavor(flavor_data)
865 created = True
867 ro_vim_item_update = {
868 "vim_id": vim_flavor_id,
869 "vim_status": "ACTIVE",
870 "created": created,
871 "created_items": created_items,
872 "vim_details": None,
873 "vim_message": None,
874 }
875 self.logger.debug(
876 "task={} {} new-flavor={} created={}".format(
877 task_id, ro_task["target_id"], vim_flavor_id, created
878 )
879 )
881 return "DONE", ro_vim_item_update
882 except (vimconn.VimConnException, NsWorkerException) as e:
883 self.logger.error(
884 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
885 )
886 ro_vim_item_update = {
887 "vim_status": "VIM_ERROR",
888 "created": created,
889 "vim_message": str(e),
890 }
892 return "FAILED", ro_vim_item_update
895class VimInteractionAffinityGroup(VimInteractionBase):
896 def delete(self, ro_task, task_index):
897 task = ro_task["tasks"][task_index]
898 task_id = task["task_id"]
899 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
900 ro_vim_item_update_ok = {
901 "vim_status": "DELETED",
902 "created": False,
903 "vim_message": "DELETED",
904 "vim_id": None,
905 }
907 try:
908 if affinity_group_vim_id:
909 target_vim = self.my_vims[ro_task["target_id"]]
910 target_vim.delete_affinity_group(affinity_group_vim_id)
911 except vimconn.VimConnNotFoundException:
912 ro_vim_item_update_ok["vim_message"] = "already deleted"
913 except vimconn.VimConnException as e:
914 self.logger.error(
915 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
916 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
917 )
918 )
919 ro_vim_item_update = {
920 "vim_status": "VIM_ERROR",
921 "vim_message": "Error while deleting: {}".format(e),
922 }
924 return "FAILED", ro_vim_item_update
926 self.logger.debug(
927 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
928 task_id,
929 ro_task["target_id"],
930 affinity_group_vim_id,
931 ro_vim_item_update_ok.get("vim_message", ""),
932 )
933 )
935 return "DONE", ro_vim_item_update_ok
937 def new(self, ro_task, task_index, task_depends):
938 task = ro_task["tasks"][task_index]
939 task_id = task["task_id"]
940 created = False
941 created_items = {}
942 target_vim = self.my_vims[ro_task["target_id"]]
944 try:
945 affinity_group_vim_id = None
946 affinity_group_data = None
947 param_affinity_group_id = ""
949 if task.get("params"):
950 affinity_group_data = task["params"].get("affinity_group_data")
952 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
953 try:
954 param_affinity_group_id = task["params"]["affinity_group_data"].get(
955 "vim-affinity-group-id"
956 )
957 affinity_group_vim_id = target_vim.get_affinity_group(
958 param_affinity_group_id
959 ).get("id")
960 except vimconn.VimConnNotFoundException:
961 self.logger.error(
962 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
963 "could not be found at VIM. Creating a new one.".format(
964 task_id, ro_task["target_id"], param_affinity_group_id
965 )
966 )
968 if not affinity_group_vim_id and affinity_group_data:
969 affinity_group_vim_id = target_vim.new_affinity_group(
970 affinity_group_data
971 )
972 created = True
974 ro_vim_item_update = {
975 "vim_id": affinity_group_vim_id,
976 "vim_status": "ACTIVE",
977 "created": created,
978 "created_items": created_items,
979 "vim_details": None,
980 "vim_message": None,
981 }
982 self.logger.debug(
983 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
984 task_id, ro_task["target_id"], affinity_group_vim_id, created
985 )
986 )
988 return "DONE", ro_vim_item_update
989 except (vimconn.VimConnException, NsWorkerException) as e:
990 self.logger.error(
991 "task={} vim={} new-affinity-or-anti-affinity-group:"
992 " {}".format(task_id, ro_task["target_id"], e)
993 )
994 ro_vim_item_update = {
995 "vim_status": "VIM_ERROR",
996 "created": created,
997 "vim_message": str(e),
998 }
1000 return "FAILED", ro_vim_item_update
1003class VimInteractionUpdateVdu(VimInteractionBase):
1004 def exec(self, ro_task, task_index, task_depends):
1005 task = ro_task["tasks"][task_index]
1006 task_id = task["task_id"]
1007 db_task_update = {"retries": 0}
1008 target_vim = self.my_vims[ro_task["target_id"]]
1010 try:
1011 vim_vm_id = ""
1012 if task.get("params"):
1013 vim_vm_id = task["params"].get("vim_vm_id")
1014 action = task["params"].get("action")
1015 context = {action: action}
1016 target_vim.action_vminstance(vim_vm_id, context)
1017 # created = True
1018 ro_vim_item_update = {
1019 "vim_id": vim_vm_id,
1020 "vim_status": "ACTIVE",
1021 }
1022 self.logger.debug(
1023 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1024 )
1025 return "DONE", ro_vim_item_update, db_task_update
1026 except (vimconn.VimConnException, NsWorkerException) as e:
1027 self.logger.error(
1028 "task={} vim={} VM Migration:"
1029 " {}".format(task_id, ro_task["target_id"], e)
1030 )
1031 ro_vim_item_update = {
1032 "vim_status": "VIM_ERROR",
1033 "vim_message": str(e),
1034 }
1036 return "FAILED", ro_vim_item_update, db_task_update
1039class VimInteractionSdnNet(VimInteractionBase):
1040 @staticmethod
1041 def _match_pci(port_pci, mapping):
1042 """
1043 Check if port_pci matches with mapping.
1044 The mapping can have brackets to indicate that several chars are accepted. e.g
1045 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1046 :param port_pci: text
1047 :param mapping: text, can contain brackets to indicate several chars are available
1048 :return: True if matches, False otherwise
1049 """
1050 if not port_pci or not mapping:
1051 return False
1052 if port_pci == mapping:
1053 return True
1055 mapping_index = 0
1056 pci_index = 0
1057 while True:
1058 bracket_start = mapping.find("[", mapping_index)
1060 if bracket_start == -1:
1061 break
1063 bracket_end = mapping.find("]", bracket_start)
1064 if bracket_end == -1:
1065 break
1067 length = bracket_start - mapping_index
1068 if (
1069 length
1070 and port_pci[pci_index : pci_index + length]
1071 != mapping[mapping_index:bracket_start]
1072 ):
1073 return False
1075 if (
1076 port_pci[pci_index + length]
1077 not in mapping[bracket_start + 1 : bracket_end]
1078 ):
1079 return False
1081 pci_index += length + 1
1082 mapping_index = bracket_end + 1
1084 if port_pci[pci_index:] != mapping[mapping_index:]:
1085 return False
1087 return True
1089 def _get_interfaces(self, vlds_to_connect, vim_account_id):
1090 """
1091 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1092 :param vim_account_id:
1093 :return:
1094 """
1095 interfaces = []
1097 for vld in vlds_to_connect:
1098 table, _, db_id = vld.partition(":")
1099 db_id, _, vld = db_id.partition(":")
1100 _, _, vld_id = vld.partition(".")
1102 if table == "vnfrs":
1103 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1104 iface_key = "vnf-vld-id"
1105 else: # table == "nsrs"
1106 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1107 iface_key = "ns-vld-id"
1109 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1111 for db_vnfr in db_vnfrs:
1112 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1113 for iface_index, interface in enumerate(vdur["interfaces"]):
1114 if interface.get(iface_key) == vld_id and interface.get(
1115 "type"
1116 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1117 # only SR-IOV o PT
1118 interface_ = interface.copy()
1119 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1120 db_vnfr["_id"], vdu_index, iface_index
1121 )
1123 if vdur.get("status") == "ERROR":
1124 interface_["status"] = "ERROR"
1126 interfaces.append(interface_)
1128 return interfaces
1130 def refresh(self, ro_task):
1131 # look for task create
1132 task_create_index, _ = next(
1133 i_t
1134 for i_t in enumerate(ro_task["tasks"])
1135 if i_t[1]
1136 and i_t[1]["action"] == "CREATE"
1137 and i_t[1]["status"] != "FINISHED"
1138 )
1140 return self.new(ro_task, task_create_index, None)
1142 def new(self, ro_task, task_index, task_depends):
1143 task = ro_task["tasks"][task_index]
1144 task_id = task["task_id"]
1145 target_vim = self.my_vims[ro_task["target_id"]]
1147 sdn_net_id = ro_task["vim_info"]["vim_id"]
1149 created_items = ro_task["vim_info"].get("created_items")
1150 connected_ports = ro_task["vim_info"].get("connected_ports", [])
1151 new_connected_ports = []
1152 last_update = ro_task["vim_info"].get("last_update", 0)
1153 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1154 error_list = []
1155 created = ro_task["vim_info"].get("created", False)
1157 try:
1158 # CREATE
1159 db_vim = {}
1160 params = task["params"]
1161 vlds_to_connect = params.get("vlds", [])
1162 associated_vim = params.get("target_vim")
1163 # external additional ports
1164 additional_ports = params.get("sdn-ports") or ()
1165 _, _, vim_account_id = (
1166 (None, None, None)
1167 if associated_vim is None
1168 else associated_vim.partition(":")
1169 )
1171 if associated_vim:
1172 # get associated VIM
1173 if associated_vim not in self.db_vims:
1174 self.db_vims[associated_vim] = self.db.get_one(
1175 "vim_accounts", {"_id": vim_account_id}
1176 )
1178 db_vim = self.db_vims[associated_vim]
1180 # look for ports to connect
1181 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1182 # print(ports)
1184 sdn_ports = []
1185 pending_ports = error_ports = 0
1186 vlan_used = None
1187 sdn_need_update = False
1189 for port in ports:
1190 vlan_used = port.get("vlan") or vlan_used
1192 # TODO. Do not connect if already done
1193 if not port.get("compute_node") or not port.get("pci"):
1194 if port.get("status") == "ERROR":
1195 error_ports += 1
1196 else:
1197 pending_ports += 1
1198 continue
1200 pmap = None
1201 compute_node_mappings = next(
1202 (
1203 c
1204 for c in db_vim["config"].get("sdn-port-mapping", ())
1205 if c and c["compute_node"] == port["compute_node"]
1206 ),
1207 None,
1208 )
1210 if compute_node_mappings:
1211 # process port_mapping pci of type 0000:af:1[01].[1357]
1212 pmap = next(
1213 (
1214 p
1215 for p in compute_node_mappings["ports"]
1216 if self._match_pci(port["pci"], p.get("pci"))
1217 ),
1218 None,
1219 )
1221 if not pmap:
1222 if not db_vim["config"].get("mapping_not_needed"):
1223 error_list.append(
1224 "Port mapping not found for compute_node={} pci={}".format(
1225 port["compute_node"], port["pci"]
1226 )
1227 )
1228 continue
1230 pmap = {}
1232 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1233 new_port = {
1234 "service_endpoint_id": pmap.get("service_endpoint_id")
1235 or service_endpoint_id,
1236 "service_endpoint_encapsulation_type": "dot1q"
1237 if port["type"] == "SR-IOV"
1238 else None,
1239 "service_endpoint_encapsulation_info": {
1240 "vlan": port.get("vlan"),
1241 "mac": port.get("mac-address"),
1242 "device_id": pmap.get("device_id") or port["compute_node"],
1243 "device_interface_id": pmap.get("device_interface_id")
1244 or port["pci"],
1245 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1246 "switch_port": pmap.get("switch_port"),
1247 "service_mapping_info": pmap.get("service_mapping_info"),
1248 },
1249 }
1251 # TODO
1252 # if port["modified_at"] > last_update:
1253 # sdn_need_update = True
1254 new_connected_ports.append(port["id"]) # TODO
1255 sdn_ports.append(new_port)
1257 if error_ports:
1258 error_list.append(
1259 "{} interfaces have not been created as VDU is on ERROR status".format(
1260 error_ports
1261 )
1262 )
1264 # connect external ports
1265 for index, additional_port in enumerate(additional_ports):
1266 additional_port_id = additional_port.get(
1267 "service_endpoint_id"
1268 ) or "external-{}".format(index)
1269 sdn_ports.append(
1270 {
1271 "service_endpoint_id": additional_port_id,
1272 "service_endpoint_encapsulation_type": additional_port.get(
1273 "service_endpoint_encapsulation_type", "dot1q"
1274 ),
1275 "service_endpoint_encapsulation_info": {
1276 "vlan": additional_port.get("vlan") or vlan_used,
1277 "mac": additional_port.get("mac_address"),
1278 "device_id": additional_port.get("device_id"),
1279 "device_interface_id": additional_port.get(
1280 "device_interface_id"
1281 ),
1282 "switch_dpid": additional_port.get("switch_dpid")
1283 or additional_port.get("switch_id"),
1284 "switch_port": additional_port.get("switch_port"),
1285 "service_mapping_info": additional_port.get(
1286 "service_mapping_info"
1287 ),
1288 },
1289 }
1290 )
1291 new_connected_ports.append(additional_port_id)
1292 sdn_info = ""
1294 # if there are more ports to connect or they have been modified, call create/update
1295 if error_list:
1296 sdn_status = "ERROR"
1297 sdn_info = "; ".join(error_list)
1298 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1299 last_update = time.time()
1301 if not sdn_net_id:
1302 if len(sdn_ports) < 2:
1303 sdn_status = "ACTIVE"
1305 if not pending_ports:
1306 self.logger.debug(
1307 "task={} {} new-sdn-net done, less than 2 ports".format(
1308 task_id, ro_task["target_id"]
1309 )
1310 )
1311 else:
1312 net_type = params.get("type") or "ELAN"
1313 (
1314 sdn_net_id,
1315 created_items,
1316 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1317 created = True
1318 self.logger.debug(
1319 "task={} {} new-sdn-net={} created={}".format(
1320 task_id, ro_task["target_id"], sdn_net_id, created
1321 )
1322 )
1323 else:
1324 created_items = target_vim.edit_connectivity_service(
1325 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1326 )
1327 created = True
1328 self.logger.debug(
1329 "task={} {} update-sdn-net={} created={}".format(
1330 task_id, ro_task["target_id"], sdn_net_id, created
1331 )
1332 )
1334 connected_ports = new_connected_ports
1335 elif sdn_net_id:
1336 wim_status_dict = target_vim.get_connectivity_service_status(
1337 sdn_net_id, conn_info=created_items
1338 )
1339 sdn_status = wim_status_dict["sdn_status"]
1341 if wim_status_dict.get("sdn_info"):
1342 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1344 if wim_status_dict.get("error_msg"):
1345 sdn_info = wim_status_dict.get("error_msg") or ""
1347 if pending_ports:
1348 if sdn_status != "ERROR":
1349 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1350 len(ports) - pending_ports, len(ports)
1351 )
1353 if sdn_status == "ACTIVE":
1354 sdn_status = "BUILD"
1356 ro_vim_item_update = {
1357 "vim_id": sdn_net_id,
1358 "vim_status": sdn_status,
1359 "created": created,
1360 "created_items": created_items,
1361 "connected_ports": connected_ports,
1362 "vim_details": sdn_info,
1363 "vim_message": None,
1364 "last_update": last_update,
1365 }
1367 return sdn_status, ro_vim_item_update
1368 except Exception as e:
1369 self.logger.error(
1370 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1371 exc_info=not isinstance(
1372 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1373 ),
1374 )
1375 ro_vim_item_update = {
1376 "vim_status": "VIM_ERROR",
1377 "created": created,
1378 "vim_message": str(e),
1379 }
1381 return "FAILED", ro_vim_item_update
1383 def delete(self, ro_task, task_index):
1384 task = ro_task["tasks"][task_index]
1385 task_id = task["task_id"]
1386 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1387 ro_vim_item_update_ok = {
1388 "vim_status": "DELETED",
1389 "created": False,
1390 "vim_message": "DELETED",
1391 "vim_id": None,
1392 }
1394 try:
1395 if sdn_vim_id:
1396 target_vim = self.my_vims[ro_task["target_id"]]
1397 target_vim.delete_connectivity_service(
1398 sdn_vim_id, ro_task["vim_info"].get("created_items")
1399 )
1401 except Exception as e:
1402 if (
1403 isinstance(e, sdnconn.SdnConnectorError)
1404 and e.http_code == HTTPStatus.NOT_FOUND.value
1405 ):
1406 ro_vim_item_update_ok["vim_message"] = "already deleted"
1407 else:
1408 self.logger.error(
1409 "ro_task={} vim={} del-sdn-net={}: {}".format(
1410 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1411 ),
1412 exc_info=not isinstance(
1413 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1414 ),
1415 )
1416 ro_vim_item_update = {
1417 "vim_status": "VIM_ERROR",
1418 "vim_message": "Error while deleting: {}".format(e),
1419 }
1421 return "FAILED", ro_vim_item_update
1423 self.logger.debug(
1424 "task={} {} del-sdn-net={} {}".format(
1425 task_id,
1426 ro_task["target_id"],
1427 sdn_vim_id,
1428 ro_vim_item_update_ok.get("vim_message", ""),
1429 )
1430 )
1432 return "DONE", ro_vim_item_update_ok
1435class VimInteractionMigration(VimInteractionBase):
1436 def exec(self, ro_task, task_index, task_depends):
1437 task = ro_task["tasks"][task_index]
1438 task_id = task["task_id"]
1439 db_task_update = {"retries": 0}
1440 target_vim = self.my_vims[ro_task["target_id"]]
1441 vim_interfaces = []
1442 refreshed_vim_info = {}
1444 try:
1445 vim_vm_id = ""
1446 if task.get("params"):
1447 vim_vm_id = task["params"].get("vim_vm_id")
1448 migrate_host = task["params"].get("migrate_host")
1449 _, migrated_compute_node = target_vim.migrate_instance(
1450 vim_vm_id, migrate_host
1451 )
1453 if migrated_compute_node:
1454 # When VM is migrated, vdu["vim_info"] needs to be updated
1455 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1456 ro_task["target_id"]
1457 )
1459 # Refresh VM to get new vim_info
1460 vm_to_refresh_list = [vim_vm_id]
1461 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1462 refreshed_vim_info = vim_dict[vim_vm_id]
1464 if refreshed_vim_info.get("interfaces"):
1465 for old_iface in vdu_old_vim_info.get("interfaces"):
1466 iface = next(
1467 (
1468 iface
1469 for iface in refreshed_vim_info["interfaces"]
1470 if old_iface["vim_interface_id"]
1471 == iface["vim_interface_id"]
1472 ),
1473 None,
1474 )
1475 vim_interfaces.append(iface)
1477 ro_vim_item_update = {
1478 "vim_id": vim_vm_id,
1479 "vim_status": "ACTIVE",
1480 "vim_details": None,
1481 "vim_message": None,
1482 }
1484 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1485 "ERROR",
1486 "VIM_ERROR",
1487 ):
1488 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1490 if vim_interfaces:
1491 ro_vim_item_update["interfaces"] = vim_interfaces
1493 self.logger.debug(
1494 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1495 )
1497 return "DONE", ro_vim_item_update, db_task_update
1499 except (vimconn.VimConnException, NsWorkerException) as e:
1500 self.logger.error(
1501 "task={} vim={} VM Migration:"
1502 " {}".format(task_id, ro_task["target_id"], e)
1503 )
1504 ro_vim_item_update = {
1505 "vim_status": "VIM_ERROR",
1506 "vim_message": str(e),
1507 }
1509 return "FAILED", ro_vim_item_update, db_task_update
1512class VimInteractionResize(VimInteractionBase):
1513 def exec(self, ro_task, task_index, task_depends):
1514 task = ro_task["tasks"][task_index]
1515 task_id = task["task_id"]
1516 db_task_update = {"retries": 0}
1517 target_flavor_uuid = None
1518 refreshed_vim_info = {}
1519 target_vim = self.my_vims[ro_task["target_id"]]
1521 try:
1522 vim_vm_id = ""
1523 if task.get("params"):
1524 vim_vm_id = task["params"].get("vim_vm_id")
1525 flavor_dict = task["params"].get("flavor_dict")
1526 self.logger.info("flavor_dict %s", flavor_dict)
1528 try:
1529 target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
1530 except Exception as e:
1531 self.logger.info("Cannot find any flavor matching %s.", str(e))
1532 try:
1533 target_flavor_uuid = target_vim.new_flavor(flavor_dict)
1534 except Exception as e:
1535 self.logger.error("Error creating flavor at VIM %s.", str(e))
1537 if target_flavor_uuid is not None:
1538 resized_status = target_vim.resize_instance(
1539 vim_vm_id, target_flavor_uuid
1540 )
1542 if resized_status:
1543 # Refresh VM to get new vim_info
1544 vm_to_refresh_list = [vim_vm_id]
1545 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1546 refreshed_vim_info = vim_dict[vim_vm_id]
1548 ro_vim_item_update = {
1549 "vim_id": vim_vm_id,
1550 "vim_status": "ACTIVE",
1551 "vim_details": None,
1552 "vim_message": None,
1553 }
1555 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1556 "ERROR",
1557 "VIM_ERROR",
1558 ):
1559 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1561 self.logger.debug(
1562 "task={} {} resize done".format(task_id, ro_task["target_id"])
1563 )
1564 return "DONE", ro_vim_item_update, db_task_update
1565 except (vimconn.VimConnException, NsWorkerException) as e:
1566 self.logger.error(
1567 "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
1568 )
1569 ro_vim_item_update = {
1570 "vim_status": "VIM_ERROR",
1571 "vim_message": str(e),
1572 }
1574 return "FAILED", ro_vim_item_update, db_task_update
1577class ConfigValidate:
1578 def __init__(self, config: Dict):
1579 self.conf = config
1581 @property
1582 def active(self):
1583 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1584 if (
1585 self.conf["period"]["refresh_active"] >= 60
1586 or self.conf["period"]["refresh_active"] == -1
1587 ):
1588 return self.conf["period"]["refresh_active"]
1590 return 60
1592 @property
1593 def build(self):
1594 return self.conf["period"]["refresh_build"]
1596 @property
1597 def image(self):
1598 return self.conf["period"]["refresh_image"]
1600 @property
1601 def error(self):
1602 return self.conf["period"]["refresh_error"]
1604 @property
1605 def queue_size(self):
1606 return self.conf["period"]["queue_size"]
1609class NsWorker(threading.Thread):
1610 def __init__(self, worker_index, config, plugins, db):
1611 """
1612 :param worker_index: thread index
1613 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1614 :param plugins: global shared dict with the loaded plugins
1615 :param db: database class instance to use
1616 """
1617 threading.Thread.__init__(self)
1618 self.config = config
1619 self.plugins = plugins
1620 self.plugin_name = "unknown"
1621 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1622 self.worker_index = worker_index
1623 # refresh periods for created items
1624 self.refresh_config = ConfigValidate(config)
1625 self.task_queue = queue.Queue(self.refresh_config.queue_size)
1626 # targetvim: vimplugin class
1627 self.my_vims = {}
1628 # targetvim: vim information from database
1629 self.db_vims = {}
1630 # targetvim list
1631 self.vim_targets = []
1632 self.my_id = config["process_id"] + ":" + str(worker_index)
1633 self.db = db
1634 self.item2class = {
1635 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1636 "shared-volumes": VimInteractionSharedVolume(
1637 self.db, self.my_vims, self.db_vims, self.logger
1638 ),
1639 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1640 "image": VimInteractionImage(
1641 self.db, self.my_vims, self.db_vims, self.logger
1642 ),
1643 "flavor": VimInteractionFlavor(
1644 self.db, self.my_vims, self.db_vims, self.logger
1645 ),
1646 "sdn_net": VimInteractionSdnNet(
1647 self.db, self.my_vims, self.db_vims, self.logger
1648 ),
1649 "update": VimInteractionUpdateVdu(
1650 self.db, self.my_vims, self.db_vims, self.logger
1651 ),
1652 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1653 self.db, self.my_vims, self.db_vims, self.logger
1654 ),
1655 "migrate": VimInteractionMigration(
1656 self.db, self.my_vims, self.db_vims, self.logger
1657 ),
1658 "verticalscale": VimInteractionResize(
1659 self.db, self.my_vims, self.db_vims, self.logger
1660 ),
1661 }
1662 self.time_last_task_processed = None
1663 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1664 self.tasks_to_delete = []
1665 # it is idle when there are not vim_targets associated
1666 self.idle = True
1667 self.task_locked_time = config["global"]["task_locked_time"]
1669 def insert_task(self, task):
1670 try:
1671 self.task_queue.put(task, False)
1672 return None
1673 except queue.Full:
1674 raise NsWorkerException("timeout inserting a task")
1676 def terminate(self):
1677 self.insert_task("exit")
1679 def del_task(self, task):
1680 with self.task_lock:
1681 if task["status"] == "SCHEDULED":
1682 task["status"] = "SUPERSEDED"
1683 return True
1684 else: # task["status"] == "processing"
1685 self.task_lock.release()
1686 return False
1688 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1689 """
1690 Process vim config, creating vim configuration files as ca_cert
1691 :param target_id: vim/sdn/wim + id
1692 :param db_vim: Vim dictionary obtained from database
1693 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1694 """
1695 if not db_vim.get("config"):
1696 return
1698 file_name = ""
1699 work_dir = "/app/osm_ro/certs"
1701 try:
1702 if db_vim["config"].get("ca_cert_content"):
1703 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1705 if not path.isdir(file_name):
1706 makedirs(file_name)
1708 file_name = file_name + "/ca_cert"
1710 with open(file_name, "w") as f:
1711 f.write(db_vim["config"]["ca_cert_content"])
1712 del db_vim["config"]["ca_cert_content"]
1713 db_vim["config"]["ca_cert"] = file_name
1714 except Exception as e:
1715 raise NsWorkerException(
1716 "Error writing to file '{}': {}".format(file_name, e)
1717 )
1719 def _load_plugin(self, name, type="vim"):
1720 # type can be vim or sdn
1721 if "rovim_dummy" not in self.plugins:
1722 self.plugins["rovim_dummy"] = VimDummyConnector
1724 if "rosdn_dummy" not in self.plugins:
1725 self.plugins["rosdn_dummy"] = SdnDummyConnector
1727 if name in self.plugins:
1728 return self.plugins[name]
1730 try:
1731 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1732 self.plugins[name] = ep.load()
1733 except Exception as e:
1734 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1736 if name and name not in self.plugins:
1737 raise NsWorkerException(
1738 "Plugin 'osm_{n}' has not been installed".format(n=name)
1739 )
1741 return self.plugins[name]
1743 def _unload_vim(self, target_id):
1744 """
1745 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1746 :param target_id: Contains type:_id; where type can be 'vim', ...
1747 :return: None.
1748 """
1749 try:
1750 self.db_vims.pop(target_id, None)
1751 self.my_vims.pop(target_id, None)
1753 if target_id in self.vim_targets:
1754 self.vim_targets.remove(target_id)
1756 self.logger.info("Unloaded {}".format(target_id))
1757 except Exception as e:
1758 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1760 def _check_vim(self, target_id):
1761 """
1762 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1763 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1764 :return: None.
1765 """
1766 target, _, _id = target_id.partition(":")
1767 now = time.time()
1768 update_dict = {}
1769 unset_dict = {}
1770 op_text = ""
1771 step = ""
1772 loaded = target_id in self.vim_targets
1773 target_database = (
1774 "vim_accounts"
1775 if target == "vim"
1776 else "wim_accounts"
1777 if target == "wim"
1778 else "sdns"
1779 )
1780 error_text = ""
1782 try:
1783 step = "Getting {} from db".format(target_id)
1784 db_vim = self.db.get_one(target_database, {"_id": _id})
1786 for op_index, operation in enumerate(
1787 db_vim["_admin"].get("operations", ())
1788 ):
1789 if operation["operationState"] != "PROCESSING":
1790 continue
1792 locked_at = operation.get("locked_at")
1794 if locked_at is not None and locked_at >= now - self.task_locked_time:
1795 # some other thread is doing this operation
1796 return
1798 # lock
1799 op_text = "_admin.operations.{}.".format(op_index)
1801 if not self.db.set_one(
1802 target_database,
1803 q_filter={
1804 "_id": _id,
1805 op_text + "operationState": "PROCESSING",
1806 op_text + "locked_at": locked_at,
1807 },
1808 update_dict={
1809 op_text + "locked_at": now,
1810 "admin.current_operation": op_index,
1811 },
1812 fail_on_empty=False,
1813 ):
1814 return
1816 unset_dict[op_text + "locked_at"] = None
1817 unset_dict["current_operation"] = None
1818 step = "Loading " + target_id
1819 error_text = self._load_vim(target_id)
1821 if not error_text:
1822 step = "Checking connectivity"
1824 if target == "vim":
1825 self.my_vims[target_id].check_vim_connectivity()
1826 else:
1827 self.my_vims[target_id].check_credentials()
1829 update_dict["_admin.operationalState"] = "ENABLED"
1830 update_dict["_admin.detailed-status"] = ""
1831 unset_dict[op_text + "detailed-status"] = None
1832 update_dict[op_text + "operationState"] = "COMPLETED"
1834 return
1836 except Exception as e:
1837 error_text = "{}: {}".format(step, e)
1838 self.logger.error("{} for {}: {}".format(step, target_id, e))
1840 finally:
1841 if update_dict or unset_dict:
1842 if error_text:
1843 update_dict[op_text + "operationState"] = "FAILED"
1844 update_dict[op_text + "detailed-status"] = error_text
1845 unset_dict.pop(op_text + "detailed-status", None)
1846 update_dict["_admin.operationalState"] = "ERROR"
1847 update_dict["_admin.detailed-status"] = error_text
1849 if op_text:
1850 update_dict[op_text + "statusEnteredTime"] = now
1852 self.db.set_one(
1853 target_database,
1854 q_filter={"_id": _id},
1855 update_dict=update_dict,
1856 unset=unset_dict,
1857 fail_on_empty=False,
1858 )
1860 if not loaded:
1861 self._unload_vim(target_id)
1863 def _reload_vim(self, target_id):
1864 if target_id in self.vim_targets:
1865 self._load_vim(target_id)
1866 else:
1867 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1868 # just remove it to force load again next time it is needed
1869 self.db_vims.pop(target_id, None)
1871 def _load_vim(self, target_id):
1872 """
1873 Load or reload a vim_account, sdn_controller or wim_account.
1874 Read content from database, load the plugin if not loaded.
1875 In case of error loading the plugin, it loads a failing VIM_connector
1876 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1877 :param target_id: Contains type:_id; where type can be 'vim', ...
1878 :return: None if ok, descriptive text if error
1879 """
1880 target, _, _id = target_id.partition(":")
1881 target_database = (
1882 "vim_accounts"
1883 if target == "vim"
1884 else "wim_accounts"
1885 if target == "wim"
1886 else "sdns"
1887 )
1888 plugin_name = ""
1889 vim = None
1890 step = "Getting {}={} from db".format(target, _id)
1892 try:
1893 # TODO process for wim, sdnc, ...
1894 vim = self.db.get_one(target_database, {"_id": _id})
1896 # if deep_get(vim, "config", "sdn-controller"):
1897 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1898 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1900 step = "Decrypting password"
1901 schema_version = vim.get("schema_version")
1902 self.db.encrypt_decrypt_fields(
1903 vim,
1904 "decrypt",
1905 fields=("password", "secret"),
1906 schema_version=schema_version,
1907 salt=_id,
1908 )
1909 self._process_vim_config(target_id, vim)
1911 if target == "vim":
1912 plugin_name = "rovim_" + vim["vim_type"]
1913 step = "Loading plugin '{}'".format(plugin_name)
1914 vim_module_conn = self._load_plugin(plugin_name)
1915 step = "Loading {}'".format(target_id)
1916 self.my_vims[target_id] = vim_module_conn(
1917 uuid=vim["_id"],
1918 name=vim["name"],
1919 tenant_id=vim.get("vim_tenant_id"),
1920 tenant_name=vim.get("vim_tenant_name"),
1921 url=vim["vim_url"],
1922 url_admin=None,
1923 user=vim["vim_user"],
1924 passwd=vim["vim_password"],
1925 config=vim.get("config") or {},
1926 persistent_info={},
1927 )
1928 else: # sdn
1929 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1930 step = "Loading plugin '{}'".format(plugin_name)
1931 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1932 step = "Loading {}'".format(target_id)
1933 wim = deepcopy(vim)
1934 wim_config = wim.pop("config", {}) or {}
1935 wim["uuid"] = wim["_id"]
1936 if "url" in wim and "wim_url" not in wim:
1937 wim["wim_url"] = wim["url"]
1938 elif "url" not in wim and "wim_url" in wim:
1939 wim["url"] = wim["wim_url"]
1941 if wim.get("dpid"):
1942 wim_config["dpid"] = wim.pop("dpid")
1944 if wim.get("switch_id"):
1945 wim_config["switch_id"] = wim.pop("switch_id")
1947 # wim, wim_account, config
1948 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1949 self.db_vims[target_id] = vim
1950 self.error_status = None
1952 self.logger.info(
1953 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1954 )
1955 except Exception as e:
1956 self.logger.error(
1957 "Cannot load {} plugin={}: {} {}".format(
1958 target_id, plugin_name, step, e
1959 )
1960 )
1962 self.db_vims[target_id] = vim or {}
1963 self.db_vims[target_id] = FailingConnector(str(e))
1964 error_status = "{} Error: {}".format(step, e)
1966 return error_status
1967 finally:
1968 if target_id not in self.vim_targets:
1969 self.vim_targets.append(target_id)
1971 def _get_db_task(self):
1972 """
1973 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1974 :return: None
1975 """
1976 now = time.time()
1978 if not self.time_last_task_processed:
1979 self.time_last_task_processed = now
1981 try:
1982 while True:
1983 """
1984 # Log RO tasks only when loglevel is DEBUG
1985 if self.logger.getEffectiveLevel() == logging.DEBUG:
1986 self._log_ro_task(
1987 None,
1988 None,
1989 None,
1990 "TASK_WF",
1991 "task_locked_time="
1992 + str(self.task_locked_time)
1993 + " "
1994 + "time_last_task_processed="
1995 + str(self.time_last_task_processed)
1996 + " "
1997 + "now="
1998 + str(now),
1999 )
2000 """
2001 locked = self.db.set_one(
2002 "ro_tasks",
2003 q_filter={
2004 "target_id": self.vim_targets,
2005 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2006 "locked_at.lt": now - self.task_locked_time,
2007 "to_check_at.lt": self.time_last_task_processed,
2008 "to_check_at.gt": -1,
2009 },
2010 update_dict={"locked_by": self.my_id, "locked_at": now},
2011 fail_on_empty=False,
2012 )
2014 if locked:
2015 # read and return
2016 ro_task = self.db.get_one(
2017 "ro_tasks",
2018 q_filter={
2019 "target_id": self.vim_targets,
2020 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2021 "locked_at": now,
2022 },
2023 )
2024 return ro_task
2026 if self.time_last_task_processed == now:
2027 self.time_last_task_processed = None
2028 return None
2029 else:
2030 self.time_last_task_processed = now
2031 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2033 except DbException as e:
2034 self.logger.error("Database exception at _get_db_task: {}".format(e))
2035 except Exception as e:
2036 self.logger.critical(
2037 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
2038 )
2040 return None
2042 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2043 """
2044 Determine if this task need to be done or superseded
2045 :return: None
2046 """
2047 my_task = ro_task["tasks"][task_index]
2048 task_id = my_task["task_id"]
2049 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2050 "created_items", False
2051 )
2053 self.logger.debug("Needed delete: {}".format(needed_delete))
2054 if my_task["status"] == "FAILED":
2055 return None, None # TODO need to be retry??
2057 try:
2058 for index, task in enumerate(ro_task["tasks"]):
2059 if index == task_index or not task:
2060 continue # own task
2062 if (
2063 my_task["target_record"] == task["target_record"]
2064 and task["action"] == "CREATE"
2065 ):
2066 # set to finished
2067 db_update["tasks.{}.status".format(index)] = task[
2068 "status"
2069 ] = "FINISHED"
2070 elif task["action"] == "CREATE" and task["status"] not in (
2071 "FINISHED",
2072 "SUPERSEDED",
2073 ):
2074 needed_delete = False
2076 if needed_delete:
2077 self.logger.debug(
2078 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2079 )
2080 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2081 else:
2082 return "SUPERSEDED", None
2083 except Exception as e:
2084 if not isinstance(e, NsWorkerException):
2085 self.logger.critical(
2086 "Unexpected exception at _delete_task task={}: {}".format(
2087 task_id, e
2088 ),
2089 exc_info=True,
2090 )
2092 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2094 def _create_task(self, ro_task, task_index, task_depends, db_update):
2095 """
2096 Determine if this task need to create something at VIM
2097 :return: None
2098 """
2099 my_task = ro_task["tasks"][task_index]
2100 task_id = my_task["task_id"]
2102 if my_task["status"] == "FAILED":
2103 return None, None # TODO need to be retry??
2104 elif my_task["status"] == "SCHEDULED":
2105 # check if already created by another task
2106 for index, task in enumerate(ro_task["tasks"]):
2107 if index == task_index or not task:
2108 continue # own task
2110 if task["action"] == "CREATE" and task["status"] not in (
2111 "SCHEDULED",
2112 "FINISHED",
2113 "SUPERSEDED",
2114 ):
2115 return task["status"], "COPY_VIM_INFO"
2117 try:
2118 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2119 ro_task, task_index, task_depends
2120 )
2121 # TODO update other CREATE tasks
2122 except Exception as e:
2123 if not isinstance(e, NsWorkerException):
2124 self.logger.error(
2125 "Error executing task={}: {}".format(task_id, e), exc_info=True
2126 )
2128 task_status = "FAILED"
2129 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2130 # TODO update ro_vim_item_update
2132 return task_status, ro_vim_item_update
2133 else:
2134 return None, None
2136 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2137 """
2138 Look for dependency task
2139 :param task_id: Can be one of
2140 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2141 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2142 3. task.task_id: "<action_id>:number"
2143 :param ro_task:
2144 :param target_id:
2145 :return: database ro_task plus index of task
2146 """
2147 if (
2148 task_id.startswith("vim:")
2149 or task_id.startswith("sdn:")
2150 or task_id.startswith("wim:")
2151 ):
2152 target_id, _, task_id = task_id.partition(" ")
2154 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2155 ro_task_dependency = self.db.get_one(
2156 "ro_tasks",
2157 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2158 fail_on_empty=False,
2159 )
2161 if ro_task_dependency:
2162 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2163 if task["target_record_id"] == task_id:
2164 return ro_task_dependency, task_index
2166 else:
2167 if ro_task:
2168 for task_index, task in enumerate(ro_task["tasks"]):
2169 if task and task["task_id"] == task_id:
2170 return ro_task, task_index
2172 ro_task_dependency = self.db.get_one(
2173 "ro_tasks",
2174 q_filter={
2175 "tasks.ANYINDEX.task_id": task_id,
2176 "tasks.ANYINDEX.target_record.ne": None,
2177 },
2178 fail_on_empty=False,
2179 )
2181 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2182 if ro_task_dependency:
2183 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2184 if task["task_id"] == task_id:
2185 return ro_task_dependency, task_index
2186 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2188 def update_vm_refresh(self, ro_task):
2189 """Enables the VM status updates if self.refresh_config.active parameter
2190 is not -1 and then updates the DB accordingly
2192 """
2193 try:
2194 self.logger.debug("Checking if VM status update config")
2195 next_refresh = time.time()
2196 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2198 if next_refresh != -1:
2199 db_ro_task_update = {}
2200 now = time.time()
2201 next_check_at = now + (24 * 60 * 60)
2202 next_check_at = min(next_check_at, next_refresh)
2203 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2204 db_ro_task_update["to_check_at"] = next_check_at
2206 self.logger.debug(
2207 "Finding tasks which to be updated to enable VM status updates"
2208 )
2209 refresh_tasks = self.db.get_list(
2210 "ro_tasks",
2211 q_filter={
2212 "tasks.status": "DONE",
2213 "to_check_at.lt": 0,
2214 },
2215 )
2216 self.logger.debug("Updating tasks to change the to_check_at status")
2217 for task in refresh_tasks:
2218 q_filter = {
2219 "_id": task["_id"],
2220 }
2221 self.db.set_one(
2222 "ro_tasks",
2223 q_filter=q_filter,
2224 update_dict=db_ro_task_update,
2225 fail_on_empty=True,
2226 )
2228 except Exception as e:
2229 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2231 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2232 """Decide the next_refresh according to vim type and refresh config period.
2233 Args:
2234 ro_task (dict): ro_task details
2235 next_refresh (float): next refresh time as epoch format
2237 Returns:
2238 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2239 """
2240 target_vim = ro_task["target_id"]
2241 vim_type = self.db_vims[target_vim]["vim_type"]
2242 if self.refresh_config.active == -1 or vim_type == "openstack":
2243 next_refresh = -1
2244 else:
2245 next_refresh += self.refresh_config.active
2246 return next_refresh
2248 def _process_pending_tasks(self, ro_task):
2249 ro_task_id = ro_task["_id"]
2250 now = time.time()
2251 # one day
2252 next_check_at = now + (24 * 60 * 60)
2253 db_ro_task_update = {}
2255 def _update_refresh(new_status):
2256 # compute next_refresh
2257 nonlocal task
2258 nonlocal next_check_at
2259 nonlocal db_ro_task_update
2260 nonlocal ro_task
2262 next_refresh = time.time()
2264 if task["item"] in ("image", "flavor"):
2265 next_refresh += self.refresh_config.image
2266 elif new_status == "BUILD":
2267 next_refresh += self.refresh_config.build
2268 elif new_status == "DONE":
2269 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2270 else:
2271 next_refresh += self.refresh_config.error
2273 next_check_at = min(next_check_at, next_refresh)
2274 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2275 ro_task["vim_info"]["refresh_at"] = next_refresh
2277 try:
2278 """
2279 # Log RO tasks only when loglevel is DEBUG
2280 if self.logger.getEffectiveLevel() == logging.DEBUG:
2281 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2282 """
2283 # Check if vim status refresh is enabled again
2284 self.update_vm_refresh(ro_task)
2285 # 0: get task_status_create
2286 lock_object = None
2287 task_status_create = None
2288 task_create = next(
2289 (
2290 t
2291 for t in ro_task["tasks"]
2292 if t
2293 and t["action"] == "CREATE"
2294 and t["status"] in ("BUILD", "DONE")
2295 ),
2296 None,
2297 )
2299 if task_create:
2300 task_status_create = task_create["status"]
2302 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2303 for task_action in ("DELETE", "CREATE", "EXEC"):
2304 db_vim_update = None
2305 new_status = None
2307 for task_index, task in enumerate(ro_task["tasks"]):
2308 if not task:
2309 continue # task deleted
2311 task_depends = {}
2312 target_update = None
2314 if (
2315 (
2316 task_action in ("DELETE", "EXEC")
2317 and task["status"] not in ("SCHEDULED", "BUILD")
2318 )
2319 or task["action"] != task_action
2320 or (
2321 task_action == "CREATE"
2322 and task["status"] in ("FINISHED", "SUPERSEDED")
2323 )
2324 ):
2325 continue
2327 task_path = "tasks.{}.status".format(task_index)
2328 try:
2329 db_vim_info_update = None
2330 dependency_ro_task = {}
2332 if task["status"] == "SCHEDULED":
2333 # check if tasks that this depends on have been completed
2334 dependency_not_completed = False
2336 for dependency_task_id in task.get("depends_on") or ():
2337 (
2338 dependency_ro_task,
2339 dependency_task_index,
2340 ) = self._get_dependency(
2341 dependency_task_id, target_id=ro_task["target_id"]
2342 )
2343 dependency_task = dependency_ro_task["tasks"][
2344 dependency_task_index
2345 ]
2346 self.logger.debug(
2347 "dependency_ro_task={} dependency_task_index={}".format(
2348 dependency_ro_task, dependency_task_index
2349 )
2350 )
2352 if dependency_task["status"] == "SCHEDULED":
2353 dependency_not_completed = True
2354 next_check_at = min(
2355 next_check_at, dependency_ro_task["to_check_at"]
2356 )
2357 # must allow dependent task to be processed first
2358 # to do this set time after last_task_processed
2359 next_check_at = max(
2360 self.time_last_task_processed, next_check_at
2361 )
2362 break
2363 elif dependency_task["status"] == "FAILED":
2364 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2365 task["action"],
2366 task["item"],
2367 dependency_task["action"],
2368 dependency_task["item"],
2369 dependency_task_id,
2370 dependency_ro_task["vim_info"].get(
2371 "vim_message"
2372 ),
2373 )
2374 self.logger.error(
2375 "task={} {}".format(task["task_id"], error_text)
2376 )
2377 raise NsWorkerException(error_text)
2379 task_depends[dependency_task_id] = dependency_ro_task[
2380 "vim_info"
2381 ]["vim_id"]
2382 task_depends[
2383 "TASK-{}".format(dependency_task_id)
2384 ] = dependency_ro_task["vim_info"]["vim_id"]
2386 if dependency_not_completed:
2387 self.logger.warning(
2388 "DEPENDENCY NOT COMPLETED {}".format(
2389 dependency_ro_task["vim_info"]["vim_id"]
2390 )
2391 )
2392 # TODO set at vim_info.vim_details that it is waiting
2393 continue
2395 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2396 # the task of renew this locking. It will update database locket_at periodically
2397 if not lock_object:
2398 lock_object = LockRenew.add_lock_object(
2399 "ro_tasks", ro_task, self
2400 )
2401 if task["action"] == "DELETE":
2402 (
2403 new_status,
2404 db_vim_info_update,
2405 ) = self._delete_task(
2406 ro_task, task_index, task_depends, db_ro_task_update
2407 )
2408 new_status = (
2409 "FINISHED" if new_status == "DONE" else new_status
2410 )
2411 # ^with FINISHED instead of DONE it will not be refreshing
2413 if new_status in ("FINISHED", "SUPERSEDED"):
2414 target_update = "DELETE"
2415 elif task["action"] == "EXEC":
2416 (
2417 new_status,
2418 db_vim_info_update,
2419 db_task_update,
2420 ) = self.item2class[task["item"]].exec(
2421 ro_task, task_index, task_depends
2422 )
2423 new_status = (
2424 "FINISHED" if new_status == "DONE" else new_status
2425 )
2426 # ^with FINISHED instead of DONE it will not be refreshing
2428 if db_task_update:
2429 # load into database the modified db_task_update "retries" and "next_retry"
2430 if db_task_update.get("retries"):
2431 db_ro_task_update[
2432 "tasks.{}.retries".format(task_index)
2433 ] = db_task_update["retries"]
2435 next_check_at = time.time() + db_task_update.get(
2436 "next_retry", 60
2437 )
2438 target_update = None
2439 elif task["action"] == "CREATE":
2440 if task["status"] == "SCHEDULED":
2441 if task_status_create:
2442 new_status = task_status_create
2443 target_update = "COPY_VIM_INFO"
2444 else:
2445 new_status, db_vim_info_update = self.item2class[
2446 task["item"]
2447 ].new(ro_task, task_index, task_depends)
2448 _update_refresh(new_status)
2449 else:
2450 refresh_at = ro_task["vim_info"]["refresh_at"]
2451 if refresh_at and refresh_at != -1 and now > refresh_at:
2452 (
2453 new_status,
2454 db_vim_info_update,
2455 ) = self.item2class[
2456 task["item"]
2457 ].refresh(ro_task)
2458 _update_refresh(new_status)
2459 else:
2460 # The refresh is updated to avoid set the value of "refresh_at" to
2461 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2462 # because it can happen that in this case the task is never processed
2463 _update_refresh(task["status"])
2465 except Exception as e:
2466 new_status = "FAILED"
2467 db_vim_info_update = {
2468 "vim_status": "VIM_ERROR",
2469 "vim_message": str(e),
2470 }
2472 if not isinstance(
2473 e, (NsWorkerException, vimconn.VimConnException)
2474 ):
2475 self.logger.error(
2476 "Unexpected exception at _delete_task task={}: {}".format(
2477 task["task_id"], e
2478 ),
2479 exc_info=True,
2480 )
2482 try:
2483 if db_vim_info_update:
2484 db_vim_update = db_vim_info_update.copy()
2485 db_ro_task_update.update(
2486 {
2487 "vim_info." + k: v
2488 for k, v in db_vim_info_update.items()
2489 }
2490 )
2491 ro_task["vim_info"].update(db_vim_info_update)
2493 if new_status:
2494 if task_action == "CREATE":
2495 task_status_create = new_status
2496 db_ro_task_update[task_path] = new_status
2498 if target_update or db_vim_update:
2499 if target_update == "DELETE":
2500 self._update_target(task, None)
2501 elif target_update == "COPY_VIM_INFO":
2502 self._update_target(task, ro_task["vim_info"])
2503 else:
2504 self._update_target(task, db_vim_update)
2506 except Exception as e:
2507 if (
2508 isinstance(e, DbException)
2509 and e.http_code == HTTPStatus.NOT_FOUND
2510 ):
2511 # if the vnfrs or nsrs has been removed from database, this task must be removed
2512 self.logger.debug(
2513 "marking to delete task={}".format(task["task_id"])
2514 )
2515 self.tasks_to_delete.append(task)
2516 else:
2517 self.logger.error(
2518 "Unexpected exception at _update_target task={}: {}".format(
2519 task["task_id"], e
2520 ),
2521 exc_info=True,
2522 )
2524 locked_at = ro_task["locked_at"]
2526 if lock_object:
2527 locked_at = [
2528 lock_object["locked_at"],
2529 lock_object["locked_at"] + self.task_locked_time,
2530 ]
2531 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
2532 # contain exactly locked_at + self.task_locked_time
2533 LockRenew.remove_lock_object(lock_object)
2535 q_filter = {
2536 "_id": ro_task["_id"],
2537 "to_check_at": ro_task["to_check_at"],
2538 "locked_at": locked_at,
2539 }
2540 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2541 # outside this task (by ro_nbi) do not update it
2542 db_ro_task_update["locked_by"] = None
2543 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
2544 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2545 db_ro_task_update["modified_at"] = now
2546 db_ro_task_update["to_check_at"] = next_check_at
2548 """
2549 # Log RO tasks only when loglevel is DEBUG
2550 if self.logger.getEffectiveLevel() == logging.DEBUG:
2551 db_ro_task_update_log = db_ro_task_update.copy()
2552 db_ro_task_update_log["_id"] = q_filter["_id"]
2553 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2554 """
2556 if not self.db.set_one(
2557 "ro_tasks",
2558 update_dict=db_ro_task_update,
2559 q_filter=q_filter,
2560 fail_on_empty=False,
2561 ):
2562 del db_ro_task_update["to_check_at"]
2563 del q_filter["to_check_at"]
2564 """
2565 # Log RO tasks only when loglevel is DEBUG
2566 if self.logger.getEffectiveLevel() == logging.DEBUG:
2567 self._log_ro_task(
2568 None,
2569 db_ro_task_update_log,
2570 None,
2571 "TASK_WF",
2572 "SET_TASK " + str(q_filter),
2573 )
2574 """
2575 self.db.set_one(
2576 "ro_tasks",
2577 q_filter=q_filter,
2578 update_dict=db_ro_task_update,
2579 fail_on_empty=True,
2580 )
2581 except DbException as e:
2582 self.logger.error(
2583 "ro_task={} Error updating database {}".format(ro_task_id, e)
2584 )
2585 except Exception as e:
2586 self.logger.error(
2587 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2588 )
2590 def _update_target(self, task, ro_vim_item_update):
2591 table, _, temp = task["target_record"].partition(":")
2592 _id, _, path_vim_status = temp.partition(":")
2593 path_item = path_vim_status[: path_vim_status.rfind(".")]
2594 path_item = path_item[: path_item.rfind(".")]
2595 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2596 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2598 if ro_vim_item_update:
2599 update_dict = {
2600 path_vim_status + "." + k: v
2601 for k, v in ro_vim_item_update.items()
2602 if k
2603 in (
2604 "vim_id",
2605 "vim_details",
2606 "vim_message",
2607 "vim_name",
2608 "vim_status",
2609 "interfaces",
2610 "interfaces_backup",
2611 )
2612 }
2614 if path_vim_status.startswith("vdur."):
2615 # for backward compatibility, add vdur.name apart from vdur.vim_name
2616 if ro_vim_item_update.get("vim_name"):
2617 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2619 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2620 if ro_vim_item_update.get("vim_id"):
2621 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2623 # update general status
2624 if ro_vim_item_update.get("vim_status"):
2625 update_dict[path_item + ".status"] = ro_vim_item_update[
2626 "vim_status"
2627 ]
2629 if ro_vim_item_update.get("interfaces"):
2630 path_interfaces = path_item + ".interfaces"
2632 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2633 if iface:
2634 update_dict.update(
2635 {
2636 path_interfaces + ".{}.".format(i) + k: v
2637 for k, v in iface.items()
2638 if k in ("vlan", "compute_node", "pci")
2639 }
2640 )
2642 # put ip_address and mac_address with ip-address and mac-address
2643 if iface.get("ip_address"):
2644 update_dict[
2645 path_interfaces + ".{}.".format(i) + "ip-address"
2646 ] = iface["ip_address"]
2648 if iface.get("mac_address"):
2649 update_dict[
2650 path_interfaces + ".{}.".format(i) + "mac-address"
2651 ] = iface["mac_address"]
2653 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2654 update_dict["ip-address"] = iface.get("ip_address").split(
2655 ";"
2656 )[0]
2658 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2659 update_dict[path_item + ".ip-address"] = iface.get(
2660 "ip_address"
2661 ).split(";")[0]
2663 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2665 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2666 if ro_vim_item_update.get("interfaces"):
2667 search_key = path_vim_status + ".interfaces"
2668 if update_dict.get(search_key):
2669 interfaces_backup_update = {
2670 path_vim_status + ".interfaces_backup": update_dict[search_key]
2671 }
2673 self.db.set_one(
2674 table,
2675 q_filter={"_id": _id},
2676 update_dict=interfaces_backup_update,
2677 )
2679 else:
2680 update_dict = {path_item + ".status": "DELETED"}
2681 self.db.set_one(
2682 table,
2683 q_filter={"_id": _id},
2684 update_dict=update_dict,
2685 unset={path_vim_status: None},
2686 )
2688 def _process_delete_db_tasks(self):
2689 """
2690 Delete task from database because vnfrs or nsrs or both have been deleted
2691 :return: None. Uses and modify self.tasks_to_delete
2692 """
2693 while self.tasks_to_delete:
2694 task = self.tasks_to_delete[0]
2695 vnfrs_deleted = None
2696 nsr_id = task["nsr_id"]
2698 if task["target_record"].startswith("vnfrs:"):
2699 # check if nsrs is present
2700 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2701 vnfrs_deleted = task["target_record"].split(":")[1]
2703 try:
2704 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2705 except Exception as e:
2706 self.logger.error(
2707 "Error deleting task={}: {}".format(task["task_id"], e)
2708 )
2709 self.tasks_to_delete.pop(0)
2711 @staticmethod
2712 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2713 """
2714 Static method because it is called from osm_ng_ro.ns
2715 :param db: instance of database to use
2716 :param nsr_id: affected nsrs id
2717 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2718 :return: None, exception is fails
2719 """
2720 retries = 5
2721 for retry in range(retries):
2722 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2723 now = time.time()
2724 conflict = False
2726 for ro_task in ro_tasks:
2727 db_update = {}
2728 to_delete_ro_task = True
2730 for index, task in enumerate(ro_task["tasks"]):
2731 if not task:
2732 pass
2733 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2734 vnfrs_deleted
2735 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2736 ):
2737 db_update["tasks.{}".format(index)] = None
2738 else:
2739 # used by other nsr, ro_task cannot be deleted
2740 to_delete_ro_task = False
2742 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2743 if to_delete_ro_task:
2744 if not db.del_one(
2745 "ro_tasks",
2746 q_filter={
2747 "_id": ro_task["_id"],
2748 "modified_at": ro_task["modified_at"],
2749 },
2750 fail_on_empty=False,
2751 ):
2752 conflict = True
2753 elif db_update:
2754 db_update["modified_at"] = now
2755 if not db.set_one(
2756 "ro_tasks",
2757 q_filter={
2758 "_id": ro_task["_id"],
2759 "modified_at": ro_task["modified_at"],
2760 },
2761 update_dict=db_update,
2762 fail_on_empty=False,
2763 ):
2764 conflict = True
2765 if not conflict:
2766 return
2767 else:
2768 raise NsWorkerException("Exceeded {} retries".format(retries))
2770 def run(self):
2771 # load database
2772 self.logger.info("Starting")
2773 while True:
2774 # step 1: get commands from queue
2775 try:
2776 if self.vim_targets:
2777 task = self.task_queue.get(block=False)
2778 else:
2779 if not self.idle:
2780 self.logger.debug("enters in idle state")
2781 self.idle = True
2782 task = self.task_queue.get(block=True)
2783 self.idle = False
2785 if task[0] == "terminate":
2786 break
2787 elif task[0] == "load_vim":
2788 self.logger.info("order to load vim {}".format(task[1]))
2789 self._load_vim(task[1])
2790 elif task[0] == "unload_vim":
2791 self.logger.info("order to unload vim {}".format(task[1]))
2792 self._unload_vim(task[1])
2793 elif task[0] == "reload_vim":
2794 self._reload_vim(task[1])
2795 elif task[0] == "check_vim":
2796 self.logger.info("order to check vim {}".format(task[1]))
2797 self._check_vim(task[1])
2798 continue
2799 except Exception as e:
2800 if isinstance(e, queue.Empty):
2801 pass
2802 else:
2803 self.logger.critical(
2804 "Error processing task: {}".format(e), exc_info=True
2805 )
2807 # step 2: process pending_tasks, delete not needed tasks
2808 try:
2809 if self.tasks_to_delete:
2810 self._process_delete_db_tasks()
2811 busy = False
2812 """
2813 # Log RO tasks only when loglevel is DEBUG
2814 if self.logger.getEffectiveLevel() == logging.DEBUG:
2815 _ = self._get_db_all_tasks()
2816 """
2817 ro_task = self._get_db_task()
2818 if ro_task:
2819 self.logger.debug("Task to process: {}".format(ro_task))
2820 time.sleep(1)
2821 self._process_pending_tasks(ro_task)
2822 busy = True
2823 if not busy:
2824 time.sleep(5)
2825 except Exception as e:
2826 self.logger.critical(
2827 "Unexpected exception at run: " + str(e), exc_info=True
2828 )
2830 self.logger.info("Finishing")