1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target where the results are stored.
"""
from copy import deepcopy
from http import HTTPStatus
from os import makedirs
import time
import traceback
from typing import Dict
from unittest.mock import Mock

from importlib_metadata import entry_points
from osm_common.dbbase import DbException
from osm_ng_ro.vim_admin import LockRenew
from osm_ro_plugin import sdnconn
from osm_ro_plugin import vimconn
from osm_ro_plugin.sdn_dummy import SdnDummyConnector
from osm_ro_plugin.vim_dummy import VimDummyConnector
import yaml
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict entering in the nested keys.

    Example: with target_dict={a: {b: 5}}, keys (a, b) return 5, while keys (a, b, c)
    and (f, h) return the default.

    :param target_dict: dictionary to be read
    :param args: keys to follow in target_dict, one nesting level per key
    :param kwargs: may contain default=value, returned if a key is not present in the
        nested dictionary
    :return: the wanted value if it exists; otherwise the default, or None when no
        default is given
    """
    for key in args:
        # Give up as soon as the current level is not a dict or lacks the key
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")

        target_dict = target_dict[key]

    return target_dict
class NsWorkerException(Exception):
    """Error raised by the VIM interaction classes of this module."""

    pass
class FailingConnector:
    """Connector stand-in whose VIM/SDN connector methods always raise.

    Every public attribute name of vimconn.VimConnector and of sdnconn.SdnConnectorBase
    is set on the instance to a Mock that raises the matching connector exception
    carrying error_msg.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text stored on the instance and carried by every raised exception
        """
        self.error_msg = error_msg

        # Names starting with "_" (private and dunder attributes) are left untouched
        for method in dir(vimconn.VimConnector):
            if method[0] != "_":
                setattr(
                    self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
                )

        for method in dir(sdnconn.SdnConnectorBase):
            if method[0] != "_":
                setattr(
                    self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
                )
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a requested VIM element cannot be found."""

    pass
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    It implements methods that do nothing and return ok"""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handle, stored as self.db
        :param my_vims: connectors, indexed by target_id by the subclasses
        :param db_vims: VIM account records, indexed by target_id by the subclasses
        :param logger: logger used by the subclasses
        """
        self.db = db
        self.logger = logger
        self.my_vims = my_vims
        self.db_vims = db_vims

    def new(self, ro_task, task_index, task_depends):
        """Skip calling the VIM. Returns status "BUILD" and an empty update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling VIM to get image, flavor status. Assumes ok unless already at VIM_ERROR"""
        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
            return "FAILED", {}

        return "DONE", {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete image. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Skip calling the VIM. Returns a 3-tuple: status, item update, task update"""
        return "DONE", None, None
class VimInteractionNet(VimInteractionBase):
    """Finds, creates, refreshes and deletes networks at the VIM of the ro_task target."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing VIM network or create a new one.

        With task["find_params"] the network is looked up at the VIM; otherwise it is
        created from task["params"].
        :return: ("BUILD", item update) on success, ("FAILED", item update) on error
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status

        :return: (task status, dict with only the vim_info fields that changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # Record only the fields that differ from what is stored in the ro_task
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network at the VIM. A network not found at the VIM counts as deleted.

        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionVdu(VimInteractionBase):
    """Creates, deletes and refreshes VM instances at the VIM, and injects ssh keys."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM instance at the VIM.

        References of the form "TASK-..." in net_list, image_id, flavor_id and
        affinity_group_list are replaced by the ids found in task_depends.
        :return: ("BUILD", item update) on success, ("FAILED", item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            params = task["params"]
            # Work on a copy, so that the stored task params keep the TASK- references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM instance at the VIM. A VM not found at the VIM counts as deleted.

        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :return: (task status, dict with only the vim_info fields that changed), or
            (None, None) when the ro_task has no vim_id
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # Best effort: a vim_info that cannot be parsed only leaves the name unset
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # Keep the order of interfaces_vim_ids; an id not reported by the VIM yields None
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                vim_interfaces.append(iface)

        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an ssh key into the VM through the VIM connector.

        On connector error the task stays in "BUILD" with an increased retry counter
        until max_retries_inject_ssh_key is reached; then it is "FAILED".
        :return: (task status, item update or None, task update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # Replace the stored key material by the decrypted key expected by the connector
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionImage(VimInteractionBase):
    """Looks up images at the VIM of the ro_task target."""

    def new(self, ro_task, task_index, task_depends):
        """Find the image that matches task["find_params"] at the VIM.

        Exactly one image must match; none or several is reported as an error.
        Without find_params no VIM call is made and vim_id is left empty.
        :return: ("DONE", item update) on success, ("FAILED", item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_image_id = ""
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionSharedVolume(VimInteractionBase):
    """Creates and deletes shared volumes at the VIM of the ro_task target."""

    def delete(self, ro_task, task_index):
        """Delete the shared volume at the VIM, unless it is flagged with "keep".

        A volume not found at the VIM counts as deleted.
        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }
        # created_items may lack an entry for this volume; treat that as "keep" not set
        volume_item = (created_items or {}).get(shared_volume_vim_id) or {}
        if volume_item.get("keep"):
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok
        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Create a shared volume at the VIM from task["params"].

        Without params no VIM call is made and vim_id is left as None.
        :return: ("DONE", item update) on success, ("FAILED", item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # Remember name and "keep" flag; delete() reads "keep" from here
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionFlavor(VimInteractionBase):
    """Finds, creates and deletes flavors at the VIM of the ro_task target."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM. A flavor not found at the VIM counts as deleted.

        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Find a flavor matching task["find_params"] or create one from task["params"].

        A flavor is created only when none was found and params are given.
        :return: ("DONE", item update) on success, ("FAILED", item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            vim_flavor_id = None

            if task.get("find_params"):
                try:
                    flavor_data = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # Not found is not an error here: fall through to creation
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionAffinityGroup(VimInteractionBase):
    """Finds, creates and deletes affinity or anti-affinity groups at the VIM."""

    def delete(self, ro_task, task_index):
        """Delete the group at the VIM. A group not found at the VIM counts as deleted.

        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the group given by "vim-affinity-group-id" or create a new one.

        The group data is read from task["params"]["affinity_group_data"]. A new group is
        created when no id is given or the given id is not found at the VIM.
        :return: ("DONE", item update) on success, ("FAILED", item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # Not fatal: the group is created below
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionUpdateVdu(VimInteractionBase):
    """Runs an action on an existing VM instance through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """Call action_vminstance at the VIM for the VM given in task["params"].

        Without params no VIM call is made.
        :return: (task status, item update, task update); status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                action = task["params"].get("action")
                # The action value is used both as key and as value of the context dict
                context = {action: action}
                target_vim.action_vminstance(vim_vm_id, context)
                # created = True
            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1044 class VimInteractionSdnNet(VimInteractionBase
):
1046 def _match_pci(port_pci
, mapping
):
1048 Check if port_pci matches with mapping.
1049 The mapping can have brackets to indicate that several chars are accepted. e.g
1050 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1051 :param port_pci: text
1052 :param mapping: text, can contain brackets to indicate several chars are available
1053 :return: True if matches, False otherwise
1055 if not port_pci
or not mapping
:
1057 if port_pci
== mapping
:
1063 bracket_start
= mapping
.find("[", mapping_index
)
1065 if bracket_start
== -1:
1068 bracket_end
= mapping
.find("]", bracket_start
)
1069 if bracket_end
== -1:
1072 length
= bracket_start
- mapping_index
1075 and port_pci
[pci_index
: pci_index
+ length
]
1076 != mapping
[mapping_index
:bracket_start
]
1081 port_pci
[pci_index
+ length
]
1082 not in mapping
[bracket_start
+ 1 : bracket_end
]
1086 pci_index
+= length
+ 1
1087 mapping_index
= bracket_end
+ 1
1089 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
1094 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
1096 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1097 :param vim_account_id:
1102 for vld
in vlds_to_connect
:
1103 table
, _
, db_id
= vld
.partition(":")
1104 db_id
, _
, vld
= db_id
.partition(":")
1105 _
, _
, vld_id
= vld
.partition(".")
1107 if table
== "vnfrs":
1108 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
1109 iface_key
= "vnf-vld-id"
1110 else: # table == "nsrs"
1111 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
1112 iface_key
= "ns-vld-id"
1114 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
1116 for db_vnfr
in db_vnfrs
:
1117 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
1118 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
1119 if interface
.get(iface_key
) == vld_id
and interface
.get(
1121 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1123 interface_
= interface
.copy()
1124 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1125 db_vnfr
["_id"], vdu_index
, iface_index
1128 if vdur
.get("status") == "ERROR":
1129 interface_
["status"] = "ERROR"
1131 interfaces
.append(interface_
)
def refresh(self, ro_task):
    """Refresh by running new() again on the pending CREATE task of the ro_task.

    :param ro_task: its "tasks" list is searched for the first non-empty entry with
        action "CREATE" and a status other than "FINISHED"
    :return: whatever self.new returns for that task; task_depends is passed as None
    """
    # look for task create
    task_create_index, _ = next(
        i_t
        for i_t in enumerate(ro_task["tasks"])
        if i_t[1]
        and i_t[1]["action"] == "CREATE"
        and i_t[1]["status"] != "FINISHED"
    )

    return self.new(ro_task, task_create_index, None)
1147 def new(self
, ro_task
, task_index
, task_depends
):
1148 task
= ro_task
["tasks"][task_index
]
1149 task_id
= task
["task_id"]
1150 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1152 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1154 created_items
= ro_task
["vim_info"].get("created_items")
1155 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1156 new_connected_ports
= []
1157 last_update
= ro_task
["vim_info"].get("last_update", 0)
1158 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1160 created
= ro_task
["vim_info"].get("created", False)
1165 params
= task
["params"]
1166 vlds_to_connect
= params
.get("vlds", [])
1167 associated_vim
= params
.get("target_vim")
1168 # external additional ports
1169 additional_ports
= params
.get("sdn-ports") or ()
1170 _
, _
, vim_account_id
= (
1172 if associated_vim
is None
1173 else associated_vim
.partition(":")
1177 # get associated VIM
1178 if associated_vim
not in self
.db_vims
:
1179 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1180 "vim_accounts", {"_id": vim_account_id
}
1183 db_vim
= self
.db_vims
[associated_vim
]
1185 # look for ports to connect
1186 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1190 pending_ports
= error_ports
= 0
1192 sdn_need_update
= False
1195 vlan_used
= port
.get("vlan") or vlan_used
1197 # TODO. Do not connect if already done
1198 if not port
.get("compute_node") or not port
.get("pci"):
1199 if port
.get("status") == "ERROR":
1206 compute_node_mappings
= next(
1209 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1210 if c
and c
["compute_node"] == port
["compute_node"]
1215 if compute_node_mappings
:
1216 # process port_mapping pci of type 0000:af:1[01].[1357]
1220 for p
in compute_node_mappings
["ports"]
1221 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1227 if not db_vim
["config"].get("mapping_not_needed"):
1229 "Port mapping not found for compute_node={} pci={}".format(
1230 port
["compute_node"], port
["pci"]
1237 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1239 "service_endpoint_id": pmap
.get("service_endpoint_id")
1240 or service_endpoint_id
,
1241 "service_endpoint_encapsulation_type": "dot1q"
1242 if port
["type"] == "SR-IOV"
1244 "service_endpoint_encapsulation_info": {
1245 "vlan": port
.get("vlan"),
1246 "mac": port
.get("mac-address"),
1247 "device_id": pmap
.get("device_id") or port
["compute_node"],
1248 "device_interface_id": pmap
.get("device_interface_id")
1250 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1251 "switch_port": pmap
.get("switch_port"),
1252 "service_mapping_info": pmap
.get("service_mapping_info"),
1257 # if port["modified_at"] > last_update:
1258 # sdn_need_update = True
1259 new_connected_ports
.append(port
["id"]) # TODO
1260 sdn_ports
.append(new_port
)
1264 "{} interfaces have not been created as VDU is on ERROR status".format(
1269 # connect external ports
1270 for index
, additional_port
in enumerate(additional_ports
):
1271 additional_port_id
= additional_port
.get(
1272 "service_endpoint_id"
1273 ) or "external-{}".format(index
)
1276 "service_endpoint_id": additional_port_id
,
1277 "service_endpoint_encapsulation_type": additional_port
.get(
1278 "service_endpoint_encapsulation_type", "dot1q"
1280 "service_endpoint_encapsulation_info": {
1281 "vlan": additional_port
.get("vlan") or vlan_used
,
1282 "mac": additional_port
.get("mac_address"),
1283 "device_id": additional_port
.get("device_id"),
1284 "device_interface_id": additional_port
.get(
1285 "device_interface_id"
1287 "switch_dpid": additional_port
.get("switch_dpid")
1288 or additional_port
.get("switch_id"),
1289 "switch_port": additional_port
.get("switch_port"),
1290 "service_mapping_info": additional_port
.get(
1291 "service_mapping_info"
1296 new_connected_ports
.append(additional_port_id
)
1299 # if there are more ports to connect or they have been modified, call create/update
1301 sdn_status
= "ERROR"
1302 sdn_info
= "; ".join(error_list
)
1303 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1304 last_update
= time
.time()
1307 if len(sdn_ports
) < 2:
1308 sdn_status
= "ACTIVE"
1310 if not pending_ports
:
1312 "task={} {} new-sdn-net done, less than 2 ports".format(
1313 task_id
, ro_task
["target_id"]
1317 net_type
= params
.get("type") or "ELAN"
1321 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1324 "task={} {} new-sdn-net={} created={}".format(
1325 task_id
, ro_task
["target_id"], sdn_net_id
, created
1329 created_items
= target_vim
.edit_connectivity_service(
1330 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1334 "task={} {} update-sdn-net={} created={}".format(
1335 task_id
, ro_task
["target_id"], sdn_net_id
, created
1339 connected_ports
= new_connected_ports
1341 wim_status_dict
= target_vim
.get_connectivity_service_status(
1342 sdn_net_id
, conn_info
=created_items
1344 sdn_status
= wim_status_dict
["sdn_status"]
1346 if wim_status_dict
.get("sdn_info"):
1347 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1349 if wim_status_dict
.get("error_msg"):
1350 sdn_info
= wim_status_dict
.get("error_msg") or ""
1353 if sdn_status
!= "ERROR":
1354 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1355 len(ports
) - pending_ports
, len(ports
)
1358 if sdn_status
== "ACTIVE":
1359 sdn_status
= "BUILD"
1361 ro_vim_item_update
= {
1362 "vim_id": sdn_net_id
,
1363 "vim_status": sdn_status
,
1365 "created_items": created_items
,
1366 "connected_ports": connected_ports
,
1367 "vim_details": sdn_info
,
1368 "vim_message": None,
1369 "last_update": last_update
,
1372 return sdn_status
, ro_vim_item_update
1373 except Exception as e
:
1375 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1376 exc_info
=not isinstance(
1377 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1380 ro_vim_item_update
= {
1381 "vim_status": "VIM_ERROR",
1383 "vim_message": str(e
),
1386 return "FAILED", ro_vim_item_update
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service stored at ro_task["vim_info"]["vim_id"].

    :param ro_task: database ro_task; "tasks", "vim_info", "target_id" and "_id" are read
    :param task_index: position inside ro_task["tasks"] of the task being processed
    :return: ("DONE", vim_info update) when the service was removed, was not present at the
        connector (NOT_FOUND) or there was nothing to delete; ("FAILED", vim_info update) otherwise
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    sdn_vim_id = ro_task["vim_info"].get("vim_id")
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "created_items": None,
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, ro_task["vim_info"].get("created_items")
            )
    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            # traceback is only logged for exceptions that are not connector errors
            connector_error = isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not connector_error,
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        ro_vim_item_update_ok["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_message", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
class VimInteractionMigration(VimInteractionBase):
    """EXEC handler that migrates a VM through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM named at the task params and collect its refreshed information.

        :param ro_task: database ro_task; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this handler
        :return: ("DONE" | "FAILED", vim_info update dict, task update dict with "retries")
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                migrate_host = params.get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    vdu_old_vim_info = params["vdu_vim_info"].get(ro_task["target_id"])

                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        # keep the order of the old interfaces; None when one is not found
                        for old_iface in vdu_old_vim_info.get("interfaces"):
                            matched = None

                            for candidate in refreshed_vim_info["interfaces"]:
                                if (
                                    old_iface["vim_interface_id"]
                                    == candidate["vim_interface_id"]
                                ):
                                    matched = candidate
                                    break

                            vim_interfaces.append(matched)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info:
                if refreshed_vim_info.get("status") not in ("ERROR", "VIM_ERROR"):
                    ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )

            return (
                "FAILED",
                {
                    "vim_status": "VIM_ERROR",
                    "created": created,
                    "vim_message": str(e),
                },
                db_task_update,
            )
class VimInteractionResize(VimInteractionBase):
    """EXEC handler that resizes (vertical scale) a VM through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM named at the task params to the flavor described by "flavor_dict".

        The flavor is first looked up at the VIM; when the lookup raises, a new flavor is
        created. If neither works the resize is skipped and the task still ends as DONE.

        :param ro_task: database ro_task; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this handler
        :return: ("DONE" | "FAILED", vim_info update dict, task update dict with "retries")
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                flavor_dict = params.get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as e:
                    self.logger.info("Cannot find any flavor matching %s.", str(e))

                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as e:
                        self.logger.error("Error creating flavor at VIM %s.", str(e))

                if target_flavor_uuid is not None:
                    resized_status = target_vim.resize_instance(
                        vim_vm_id, target_flavor_uuid
                    )

                    if resized_status:
                        # Refresh VM to get new vim_info
                        vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                        refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info:
                if refreshed_vim_info.get("status") not in ("ERROR", "VIM_ERROR"):
                    ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )

            return (
                "FAILED",
                {
                    "vim_status": "VIM_ERROR",
                    "created": created,
                    "vim_message": str(e),
                },
                db_task_update,
            )
class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        """
        :param config: configuration dictionary; only config["period"] is read, lazily
        """
        self.conf = config

    @property
    def active(self):
        """Refresh period for active items: the configured value if >= 60 or -1, else 60."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        period = self.conf["period"]["refresh_active"]
        return period if (period >= 60 or period == -1) else 60

    @property
    def build(self):
        """Configured "refresh_build" period."""
        return self.conf["period"]["refresh_build"]

    @property
    def image(self):
        """Configured "refresh_image" period."""
        return self.conf["period"]["refresh_image"]

    @property
    def error(self):
        """Configured "refresh_error" period."""
        return self.conf["period"]["refresh_error"]

    @property
    def queue_size(self):
        """Configured "queue_size" value."""
        return self.conf["period"]["queue_size"]
1624 class NsWorker(threading
.Thread
):
def __init__(self, worker_index, config, plugins, db):
    """
    Initialize the worker thread state; no VIM is loaded and no database access is done here.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    # bounded queue; its size comes from config["period"]["queue_size"]
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    # identifier written at "locked_by" when this worker locks a ro_task
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # task "item" -> handler object. All handlers share the same db, my_vims, db_vims
    # and logger objects, so connectors loaded later are visible to them
    self.item2class = {
        "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
        "shared-volumes": VimInteractionSharedVolume(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
        "image": VimInteractionImage(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "flavor": VimInteractionFlavor(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "sdn_net": VimInteractionSdnNet(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "update": VimInteractionUpdateVdu(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "migrate": VimInteractionMigration(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "verticalscale": VimInteractionResize(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    # seconds; used as the lock expiry window when locking ro_tasks and vim operations
    self.task_locked_time = config["global"]["task_locked_time"]
def insert_task(self, task):
    """
    Queue a task for this worker without blocking.

    :param task: item to put at self.task_queue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
def terminate(self):
    """Queue the "exit" task through insert_task."""
    exit_task = "exit"
    self.insert_task(exit_task)
def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet.

    :param task: task dictionary; its "status" is read and may be modified
    :return: True if the task was in SCHEDULED status and is now SUPERSEDED, False otherwise
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        # The lock is released by the "with" statement on exit. Releasing it here as well
        # made the exit of the "with" block raise RuntimeError (release of an unlocked lock)
        return False
1703 def _process_vim_config(self
, target_id
: str, db_vim
: dict) -> None:
1705 Process vim config, creating vim configuration files as ca_cert
1706 :param target_id: vim/sdn/wim + id
1707 :param db_vim: Vim dictionary obtained from database
1708 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1710 if not db_vim
.get("config"):
1714 work_dir
= "/app/osm_ro/certs"
1717 if db_vim
["config"].get("ca_cert_content"):
1718 file_name
= f
"{work_dir}/{target_id}:{self.worker_index}"
1720 if not path
.isdir(file_name
):
1723 file_name
= file_name
+ "/ca_cert"
1725 with
open(file_name
, "w") as f
:
1726 f
.write(db_vim
["config"]["ca_cert_content"])
1727 del db_vim
["config"]["ca_cert_content"]
1728 db_vim
["config"]["ca_cert"] = file_name
1729 except Exception as e
:
1730 raise NsWorkerException(
1731 "Error writing to file '{}': {}".format(file_name
, e
)
def _load_plugin(self, name, type="vim"):
    """
    Return the connector class registered under a plugin name, loading it if needed.

    :param name: plugin name, used as key at self.plugins and as entry point name
    :param type: can be vim or sdn; selects the entry point group "osm_ro<type>.plugins"
    :return: the object stored at self.plugins[name]
    :raises NsWorkerException: if loading the entry point fails or no plugin is installed
    """
    # the dummy connectors are always available
    self.plugins.setdefault("rovim_dummy", VimDummyConnector)
    self.plugins.setdefault("rosdn_dummy", SdnDummyConnector)

    if name in self.plugins:
        return self.plugins[name]

    plugin_group = "osm_ro{}.plugins".format(type)

    try:
        for ep in entry_points(group=plugin_group, name=name):
            self.plugins[name] = ep.load()
    except Exception as e:
        raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

    if name and name not in self.plugins:
        raise NsWorkerException(
            "Plugin 'osm_{n}' has not been installed".format(n=name)
        )

    return self.plugins[name]
1758 def _unload_vim(self
, target_id
):
1760 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1761 :param target_id: Contains type:_id; where type can be 'vim', ...
1765 self
.db_vims
.pop(target_id
, None)
1766 self
.my_vims
.pop(target_id
, None)
1768 if target_id
in self
.vim_targets
:
1769 self
.vim_targets
.remove(target_id
)
1771 self
.logger
.info("Unloaded {}".format(target_id
))
1772 except Exception as e
:
1773 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR

    Only the first operation found in "PROCESSING" state whose lock is free or expired is
    handled; the method returns after it.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # remembered so that a target loaded only for this check is unloaded at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    error_text = ""

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value read above, so the update only
            # matches if nobody changed the lock in the meantime
            # NOTE(review): "admin.current_operation" (and "current_operation" below) have
            # no "_admin." prefix, unlike every other key used here - confirm it is intended
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # these success values are set even when _load_vim returned an error text;
            # in that case the "finally" block below overwrites them with FAILED/ERROR
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # runs for every exit path above, including the early returns
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                # detailed-status is now being set, so it must not be unset as well
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1878 def _reload_vim(self
, target_id
):
1879 if target_id
in self
.vim_targets
:
1880 self
._load
_vim
(target_id
)
1882 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1883 # just remove it to force load again next time it is needed
1884 self
.db_vims
.pop(target_id
, None)
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None
    # "step" describes the action in progress; it is used at the error message
    step = "Getting {}={} from db".format(target, _id)

    try:
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # work on a copy: the record stored at self.db_vims must keep its "config"
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            # make both "url" and "wim_url" available whichever one the record has
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        # Keep whatever was read from database and register a failing connector.
        # The connector was previously stored at self.db_vims too, which discarded the
        # database record and left self.my_vims without an entry for this target
        self.db_vims[target_id] = vim or {}
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered even on failure
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
def _get_db_task(self):
    """
    Lock one pending ro_task of the targets handled by this worker and return it.

    A ro_task is a candidate when its target_id is in self.vim_targets, it has a task in
    SCHEDULED, BUILD, DONE or FAILED status, its lock is older than task_locked_time and
    its to_check_at is greater than -1 and lower than self.time_last_task_processed.
    :return: the locked ro_task, or None if there is nothing to process or on database error
    """
    now = time.time()

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            # the string below is disabled logging code kept as a bare string; it does nothing
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    None,
                    None,
                    "TASK_WF",
                    "task_locked_time="
                    + str(self.task_locked_time)
                    + " "
                    + "time_last_task_processed="
                    + str(self.time_last_task_processed)
                    + " "
                    + "now="
                    + str(now),
                )
            """
            # try to lock a candidate by writing this worker id and "now" as lock time
            locked = self.db.set_one(
                "ro_tasks",
                q_filter={
                    "target_id": self.vim_targets,
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.lt": self.time_last_task_processed,
                    "to_check_at.gt": -1,
                },
                update_dict={"locked_by": self.my_id, "locked_at": now},
                fail_on_empty=False,
            )

            if locked:
                # read and return
                # the just locked ro_task is found by the "locked_at" value written above
                ro_task = self.db.get_one(
                    "ro_tasks",
                    q_filter={
                        "target_id": self.vim_targets,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at": now,
                    },
                )
                return ro_task

            # nothing locked: retry once with the limit moved up to "now"; when the limit
            # already was "now", give up and reset it for the next call
            if self.time_last_task_processed == now:
                self.time_last_task_processed = None
                return None
            else:
                self.time_last_task_processed = now
                # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
2057 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2059 Determine if this task need to be done or superseded
2062 my_task
= ro_task
["tasks"][task_index
]
2063 task_id
= my_task
["task_id"]
2064 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
2065 "created_items", False
2068 self
.logger
.debug("Needed delete: {}".format(needed_delete
))
2069 if my_task
["status"] == "FAILED":
2070 return None, None # TODO need to be retry??
2073 for index
, task
in enumerate(ro_task
["tasks"]):
2074 if index
== task_index
or not task
:
2078 my_task
["target_record"] == task
["target_record"]
2079 and task
["action"] == "CREATE"
2082 db_update
["tasks.{}.status".format(index
)] = task
[
2085 elif task
["action"] == "CREATE" and task
["status"] not in (
2089 needed_delete
= False
2093 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
2095 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
2097 return "SUPERSEDED", None
2098 except Exception as e
:
2099 if not isinstance(e
, NsWorkerException
):
2100 self
.logger
.critical(
2101 "Unexpected exception at _delete_task task={}: {}".format(
2107 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2109 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2111 Determine if this task need to create something at VIM
2114 my_task
= ro_task
["tasks"][task_index
]
2115 task_id
= my_task
["task_id"]
2117 if my_task
["status"] == "FAILED":
2118 return None, None # TODO need to be retry??
2119 elif my_task
["status"] == "SCHEDULED":
2120 # check if already created by another task
2121 for index
, task
in enumerate(ro_task
["tasks"]):
2122 if index
== task_index
or not task
:
2125 if task
["action"] == "CREATE" and task
["status"] not in (
2130 return task
["status"], "COPY_VIM_INFO"
2133 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2134 ro_task
, task_index
, task_depends
2136 # TODO update other CREATE tasks
2137 except Exception as e
:
2138 if not isinstance(e
, NsWorkerException
):
2140 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2143 task_status
= "FAILED"
2144 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2145 # TODO update ro_vim_item_update
2147 return task_status
, ro_vim_item_update
2151 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2153 Look for dependency task
2154 :param task_id: Can be one of
2155 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2156 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2157 3. task.task_id: "<action_id>:number"
2160 :return: database ro_task plus index of task
2163 task_id
.startswith("vim:")
2164 or task_id
.startswith("sdn:")
2165 or task_id
.startswith("wim:")
2167 target_id
, _
, task_id
= task_id
.partition(" ")
2169 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2170 ro_task_dependency
= self
.db
.get_one(
2172 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2173 fail_on_empty
=False,
2176 if ro_task_dependency
:
2177 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2178 if task
["target_record_id"] == task_id
:
2179 return ro_task_dependency
, task_index
2183 for task_index
, task
in enumerate(ro_task
["tasks"]):
2184 if task
and task
["task_id"] == task_id
:
2185 return ro_task
, task_index
2187 ro_task_dependency
= self
.db
.get_one(
2190 "tasks.ANYINDEX.task_id": task_id
,
2191 "tasks.ANYINDEX.target_record.ne": None,
2193 fail_on_empty
=False,
2196 self
.logger
.debug("ro_task_dependency={}".format(ro_task_dependency
))
2197 if ro_task_dependency
:
2198 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2199 if task
["task_id"] == task_id
:
2200 return ro_task_dependency
, task_index
2201 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
def update_vm_refresh(self, ro_task):
    """Enables the VM status updates if self.refresh_config.active parameter
    is not -1 and then updates the DB accordingly

    Every ro_task with a task in DONE status and a negative to_check_at gets a new
    "vim_info.refresh_at" and "to_check_at". Errors are logged, never raised.
    """
    try:
        self.logger.debug("Checking if VM status update config")
        next_refresh = self._get_next_refresh(ro_task, time.time())

        if next_refresh == -1:
            return

        one_day_ahead = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_ahead, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )

        self.logger.debug("Updating tasks to change the to_check_at status")
        for task in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2246 def _get_next_refresh(self
, ro_task
: dict, next_refresh
: float):
2247 """Decide the next_refresh according to vim type and refresh config period.
2249 ro_task (dict): ro_task details
2250 next_refresh (float): next refresh time as epoch format
2253 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2255 target_vim
= ro_task
["target_id"]
2256 vim_type
= self
.db_vims
[target_vim
]["vim_type"]
2257 if self
.refresh_config
.active
== -1 or vim_type
== "openstack":
2260 next_refresh
+= self
.refresh_config
.active
def _process_pending_tasks(self, ro_task):
    """
    Process every pending task of a locked ro_task and write the result to database.

    Tasks are visited in three passes, by action: DELETE, then CREATE, then EXEC. The
    changes are accumulated at db_ro_task_update and written at the end together with the
    unlocking of the ro_task and its next "to_check_at". Exceptions are logged, not raised.
    :param ro_task: database ro_task, already locked by this worker. It is modified in place
    :return: None
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh
        # "task" is the loop variable of the enclosing method at the time of the call
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            # may be -1, meaning that the periodic refresh is disabled
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        # the string below is disabled logging code kept as a bare string; it does nothing
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            db_vim_update = None
            # NOTE(review): new_status is reset per action, not per task, so a task that
            # computes no status of its own is written with the one of a previous task of
            # the same pass - confirm this is intended
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of other passes, DELETE/EXEC tasks that are not pending
                # and CREATE tasks that are already closed
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None
                    dependency_ro_task = {}

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is stored under its id, both
                            # as given and prefixed with "TASK-"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )
                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task already handled the item
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    # this handler covers DELETE, CREATE and EXEC, although the message
                    # logged below names _delete_task
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        # first attempt: only if to_check_at has not been changed meanwhile
        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # second attempt: keep the to_check_at found at database and write the rest
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
def _update_target(self, task, ro_vim_item_update):
    """
    Store the result of a task in the database record referenced by task["target_record"]

    target_record has the form "<table>:<_id>:<path_vim_status>". It is split at the first two ":",
    so path_vim_status may itself contain ":" (e.g. "vdur.10.vim_info.vim:id").
    :param task: task dictionary. Only the key "target_record" is read here
    :param ro_vim_item_update: dictionary with the VIM information to write. When it is empty or None,
        the item is marked as "DELETED" and path_vim_status is unset from the record
    :return: None. The record is modified by means of self.db.set_one
    """
    table, _, temp = task["target_record"].partition(":")
    _id, _, path_vim_status = temp.partition(":")
    # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
    # path_item: dot separated list targeting record information, e.g. "vdur.10"
    # (path_vim_status without its last two dot separated components)
    path_item = path_vim_status[: path_vim_status.rfind(".")]
    path_item = path_item[: path_item.rfind(".")]

    if not ro_vim_item_update:
        # nothing to copy: mark the item as deleted and remove its vim information
        update_dict = {path_item + ".status": "DELETED"}
        self.db.set_one(
            table,
            q_filter={"_id": _id},
            update_dict=update_dict,
            unset={path_vim_status: None},
        )
        return

    # NOTE(review): only "interfaces_backup" is legible in the damaged text of this block; the
    # rest of the whitelist has been reconstructed and must be confirmed against the pristine file
    copied_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
        "interfaces_backup",
    )
    update_dict = {
        path_vim_status + "." + k: v
        for k, v in ro_vim_item_update.items()
        if k in copied_keys
    }

    if path_vim_status.startswith("vdur."):
        # for backward compatibility, add vdur.name apart from vdur.vim_name
        if ro_vim_item_update.get("vim_name"):
            update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]

        # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
        if ro_vim_item_update.get("vim_id"):
            update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]

        # update general status
        if ro_vim_item_update.get("vim_status"):
            update_dict[path_item + ".status"] = ro_vim_item_update["vim_status"]

    if ro_vim_item_update.get("interfaces"):
        path_interfaces = path_item + ".interfaces"

        for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
            # NOTE(review): the guard that skips empty entries is reconstructed - confirm
            if not iface:
                continue

            iface_prefix = path_interfaces + ".{}.".format(i)
            update_dict.update(
                {
                    iface_prefix + k: v
                    for k, v in iface.items()
                    if k in ("vlan", "compute_node", "pci")
                }
            )

            # put ip_address and mac_address with ip-address and mac-address
            if iface.get("ip_address"):
                update_dict[iface_prefix + "ip-address"] = iface["ip_address"]

            if iface.get("mac_address"):
                update_dict[iface_prefix + "mac-address"] = iface["mac_address"]

            # NOTE(review): the argument of split() and the index taken are not legible in the
            # damaged text; ';' and the first element are reconstructed - confirm
            if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
                update_dict["ip-address"] = iface.get("ip_address").split(";")[0]

            if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
                update_dict[path_item + ".ip-address"] = iface.get(
                    "ip_address"
                ).split(";")[0]

    self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)

    # If interfaces exists, it backups VDU interfaces in the DB for healing operations
    if ro_vim_item_update.get("interfaces"):
        search_key = path_vim_status + ".interfaces"
        if update_dict.get(search_key):
            interfaces_backup_update = {
                path_vim_status + ".interfaces_backup": update_dict[search_key]
            }

            self.db.set_one(
                table,
                q_filter={"_id": _id},
                update_dict=interfaces_backup_update,
            )
def _process_delete_db_tasks(self):
    """
    Delete task from database because vnfrs or nsrs or both have been deleted

    The entries of self.tasks_to_delete are consumed one by one from the head of the list. A failure
    when deleting one entry is logged and does not prevent the following ones from being processed.
    :return: None. Uses and modify self.tasks_to_delete
    """
    while self.tasks_to_delete:
        task = self.tasks_to_delete[0]
        nsr_id = task["nsr_id"]
        # None means "delete every task of this nsr"; a vnfr id restricts the deletion to that vnfr
        vnfrs_deleted = None

        if task["target_record"].startswith("vnfrs:"):
            # check if nsrs is present
            if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
                # the nsr still exists, so only the tasks of this vnfr are removed
                vnfrs_deleted = task["target_record"].split(":")[1]

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            # NOTE(review): the logging call line is missing from the damaged text of this block;
            # level "error" is reconstructed - confirm
            self.logger.error(
                "Error deleting task={}: {}".format(task["task_id"], e)
            )
        # the entry is dropped whether the deletion succeeded or not
        self.tasks_to_delete.pop(0)
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Static method because it is called from osm_ng_ro.ns

    Every ro_task that contains a task of this nsr is read. The affected tasks are set to None, and
    the whole ro_task is deleted when no task of it belongs to somebody else. Writes are filtered by
    "modified_at", so a ro_task changed meanwhile is not touched and the whole pass is retried.
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception is fails
    """
    # NOTE(review): the assignments of 'retries', 'now', 'conflict' and 'db_update', the skipping of
    # empty task slots and the heads of the del_one/set_one calls are missing from the damaged text
    # of this block. They are reconstructed from the surviving uses - confirm, in particular the
    # number of retries
    retries = 5

    for retry in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    # slot already emptied
                    continue

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            unchanged_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }
            if to_delete_ro_task:
                if not db.del_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    fail_on_empty=False,
                ):
                    conflict = True
            elif db_update:
                db_update["modified_at"] = now
                if not db.set_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    update_dict=db_update,
                    fail_on_empty=False,
                ):
                    conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
def run(self):
    """
    Main loop. Each iteration first attends one command from self.task_queue and, only when there is
    no command waiting, processes the database tasks.

    Commands are sequences whose first element is the order: "terminate" leaves the loop, while
    "load_vim", "unload_vim", "reload_vim" and "check_vim" call the homonymous private method with
    the second element as argument.
    :return: None
    """
    # NOTE(review): the 'def' line, the 'while' header, the handling of self.idle, the break/continue
    # statements and the sleeps are missing from the damaged text of this block. They are
    # reconstructed from the surviving statements - confirm, in particular the sleep intervals
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                # there are VIMs to attend: do not block waiting for commands
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            if task[0] == "terminate":
                break
            elif task[0] == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif task[0] == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif task[0] == "reload_vim":
                self._reload_vim(task[1])
            elif task[0] == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            # a command has been attended: look for the next one before going to step 2
            continue
        except queue.Empty:
            # no command waiting, go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # NOTE(review): the following debug dump looks disabled in the damaged text (it seems
            # to be enclosed in a bare string literal); it is kept here as a comment - confirm
            # # Log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before asking the database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")