1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
30 from os
import makedirs
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
43 from osm_ro_plugin
import vimconn
44 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
45 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
52 def deep_get(target_dict
, *args
, **kwargs
):
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
62 if not isinstance(target_dict
, dict) or key
not in target_dict
:
63 return kwargs
.get("default")
64 target_dict
= target_dict
[key
]
68 class NsWorkerException(Exception):
72 class FailingConnector
:
73 def __init__(self
, error_msg
):
74 self
.error_msg
= error_msg
76 for method
in dir(vimconn
.VimConnector
):
79 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
82 for method
in dir(sdnconn
.SdnConnectorBase
):
85 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
89 class NsWorkerExceptionNotFound(NsWorkerException
):
93 class VimInteractionBase
:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self
, db
, my_vims
, db_vims
, logger
):
100 self
.my_vims
= my_vims
101 self
.db_vims
= db_vims
103 def new(self
, ro_task
, task_index
, task_depends
):
106 def refresh(self
, ro_task
):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
113 def delete(self
, ro_task
, task_index
):
114 """skip calling VIM to delete image. Assumes ok"""
117 def exec(self
, ro_task
, task_index
, task_depends
):
118 return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
219 "task={} {} new-net={} created={}".format(
220 task_id
, ro_task
["target_id"], vim_net_id
, created
224 return "BUILD", ro_vim_item_update
225 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
227 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
229 ro_vim_item_update
= {
230 "vim_status": "VIM_ERROR",
232 "vim_message": str(e
),
235 return "FAILED", ro_vim_item_update
237 def refresh(self
, ro_task
):
238 """Call VIM to get network status"""
239 ro_task_id
= ro_task
["_id"]
240 target_vim
= self
.my_vims
[ro_task
["target_id"]]
241 vim_id
= ro_task
["vim_info"]["vim_id"]
242 net_to_refresh_list
= [vim_id
]
245 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
246 vim_info
= vim_dict
[vim_id
]
248 if vim_info
["status"] == "ACTIVE":
250 elif vim_info
["status"] == "BUILD":
251 task_status
= "BUILD"
253 task_status
= "FAILED"
254 except vimconn
.VimConnException
as e
:
255 # Mark all tasks at VIM_ERROR status
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id
, ro_task
["target_id"], vim_id
, e
261 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
262 task_status
= "FAILED"
264 ro_vim_item_update
= {}
265 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
266 ro_vim_item_update
["vim_status"] = vim_info
["status"]
268 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
269 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
271 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
273 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
274 elif vim_info
["status"] == "DELETED":
275 ro_vim_item_update
["vim_id"] = None
276 ro_vim_item_update
["vim_message"] = "Deleted externally"
278 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
279 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
281 if ro_vim_item_update
:
283 "ro_task={} {} get-net={}: status={} {}".format(
285 ro_task
["target_id"],
287 ro_vim_item_update
.get("vim_status"),
288 ro_vim_item_update
.get("vim_message")
289 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
294 return task_status
, ro_vim_item_update
296 def delete(self
, ro_task
, task_index
):
297 task
= ro_task
["tasks"][task_index
]
298 task_id
= task
["task_id"]
299 net_vim_id
= ro_task
["vim_info"]["vim_id"]
300 ro_vim_item_update_ok
= {
301 "vim_status": "DELETED",
303 "vim_message": "DELETED",
308 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
309 target_vim
= self
.my_vims
[ro_task
["target_id"]]
310 target_vim
.delete_network(
311 net_vim_id
, ro_task
["vim_info"]["created_items"]
313 except vimconn
.VimConnNotFoundException
:
314 ro_vim_item_update_ok
["vim_message"] = "already deleted"
315 except vimconn
.VimConnException
as e
:
317 "ro_task={} vim={} del-net={}: {}".format(
318 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
321 ro_vim_item_update
= {
322 "vim_status": "VIM_ERROR",
323 "vim_message": "Error while deleting: {}".format(e
),
326 return "FAILED", ro_vim_item_update
329 "task={} {} del-net={} {}".format(
331 ro_task
["target_id"],
333 ro_vim_item_update_ok
.get("vim_message", ""),
337 return "DONE", ro_vim_item_update_ok
340 class VimInteractionVdu(VimInteractionBase
):
341 max_retries_inject_ssh_key
= 20 # 20 times
342 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
344 def new(self
, ro_task
, task_index
, task_depends
):
345 task
= ro_task
["tasks"][task_index
]
346 task_id
= task
["task_id"]
349 target_vim
= self
.my_vims
[ro_task
["target_id"]]
352 params
= task
["params"]
353 params_copy
= deepcopy(params
)
354 net_list
= params_copy
["net_list"]
357 # change task_id into network_id
358 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
359 network_id
= task_depends
[net
["net_id"]]
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net
["net_id"])
367 net
["net_id"] = network_id
369 if params_copy
["image_id"].startswith("TASK-"):
370 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
372 if params_copy
["flavor_id"].startswith("TASK-"):
373 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
375 affinity_group_list
= params_copy
["affinity_group_list"]
376 for affinity_group
in affinity_group_list
:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group
and affinity_group
[
380 ].startswith("TASK-"):
381 affinity_group_id
= task_depends
[
382 affinity_group
["affinity_group_id"]
385 if not affinity_group_id
:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group
["affinity_group_id"])
390 affinity_group
["affinity_group_id"] = affinity_group_id
391 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
392 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
394 # add to created items previous_created_volumes (healing)
395 if task
.get("previous_created_volumes"):
396 for k
, v
in task
["previous_created_volumes"].items():
399 ro_vim_item_update
= {
401 "vim_status": "BUILD",
403 "created_items": created_items
,
406 "interfaces_vim_ids": interfaces
,
408 "interfaces_backup": [],
411 "task={} {} new-vm={} created={}".format(
412 task_id
, ro_task
["target_id"], vim_vm_id
, created
416 return "BUILD", ro_vim_item_update
417 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
418 self
.logger
.debug(traceback
.format_exc())
420 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
422 ro_vim_item_update
= {
423 "vim_status": "VIM_ERROR",
425 "vim_message": str(e
),
428 return "FAILED", ro_vim_item_update
430 def delete(self
, ro_task
, task_index
):
431 task
= ro_task
["tasks"][task_index
]
432 task_id
= task
["task_id"]
433 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
434 ro_vim_item_update_ok
= {
435 "vim_status": "DELETED",
437 "vim_message": "DELETED",
443 "delete_vminstance: vm_vim_id={} created_items={}".format(
444 vm_vim_id
, ro_task
["vim_info"]["created_items"]
447 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
448 target_vim
= self
.my_vims
[ro_task
["target_id"]]
449 target_vim
.delete_vminstance(
451 ro_task
["vim_info"]["created_items"],
452 ro_task
["vim_info"].get("volumes_to_hold", []),
454 except vimconn
.VimConnNotFoundException
:
455 ro_vim_item_update_ok
["vim_message"] = "already deleted"
456 except vimconn
.VimConnException
as e
:
458 "ro_task={} vim={} del-vm={}: {}".format(
459 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
462 ro_vim_item_update
= {
463 "vim_status": "VIM_ERROR",
464 "vim_message": "Error while deleting: {}".format(e
),
467 return "FAILED", ro_vim_item_update
470 "task={} {} del-vm={} {}".format(
472 ro_task
["target_id"],
474 ro_vim_item_update_ok
.get("vim_message", ""),
478 return "DONE", ro_vim_item_update_ok
480 def refresh(self
, ro_task
):
481 """Call VIM to get vm status"""
482 ro_task_id
= ro_task
["_id"]
483 target_vim
= self
.my_vims
[ro_task
["target_id"]]
484 vim_id
= ro_task
["vim_info"]["vim_id"]
489 vm_to_refresh_list
= [vim_id
]
491 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
492 vim_info
= vim_dict
[vim_id
]
494 if vim_info
["status"] == "ACTIVE":
496 elif vim_info
["status"] == "BUILD":
497 task_status
= "BUILD"
499 task_status
= "FAILED"
501 # try to load and parse vim_information
503 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
504 if vim_info_info
.get("name"):
505 vim_info
["name"] = vim_info_info
["name"]
506 except Exception as vim_info_error
:
507 self
.logger
.exception(
508 f
"{vim_info_error} occured while getting the vim_info from yaml"
510 except vimconn
.VimConnException
as e
:
511 # Mark all tasks at VIM_ERROR status
513 "ro_task={} vim={} get-vm={}: {}".format(
514 ro_task_id
, ro_task
["target_id"], vim_id
, e
517 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
518 task_status
= "FAILED"
520 ro_vim_item_update
= {}
522 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
524 if vim_info
.get("interfaces"):
525 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
529 for iface
in vim_info
["interfaces"]
530 if vim_iface_id
== iface
["vim_interface_id"]
535 # iface.pop("vim_info", None)
536 vim_interfaces
.append(iface
)
540 for t
in ro_task
["tasks"]
541 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
543 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
544 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
548 mgmt_vdu_iface
= task_create
.get(
549 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
552 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
554 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
555 ro_vim_item_update
["interfaces"] = vim_interfaces
557 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
558 ro_vim_item_update
["vim_status"] = vim_info
["status"]
560 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
561 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
563 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
564 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
565 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
566 elif vim_info
["status"] == "DELETED":
567 ro_vim_item_update
["vim_id"] = None
568 ro_vim_item_update
["vim_message"] = "Deleted externally"
570 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
571 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
573 if ro_vim_item_update
:
575 "ro_task={} {} get-vm={}: status={} {}".format(
577 ro_task
["target_id"],
579 ro_vim_item_update
.get("vim_status"),
580 ro_vim_item_update
.get("vim_message")
581 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
586 return task_status
, ro_vim_item_update
588 def exec(self
, ro_task
, task_index
, task_depends
):
589 task
= ro_task
["tasks"][task_index
]
590 task_id
= task
["task_id"]
591 target_vim
= self
.my_vims
[ro_task
["target_id"]]
592 db_task_update
= {"retries": 0}
593 retries
= task
.get("retries", 0)
596 params
= task
["params"]
597 params_copy
= deepcopy(params
)
598 params_copy
["ro_key"] = self
.db
.decrypt(
599 params_copy
.pop("private_key"),
600 params_copy
.pop("schema_version"),
601 params_copy
.pop("salt"),
603 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
604 target_vim
.inject_user_key(**params_copy
)
606 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
613 ) # params_copy["key"]
614 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
617 self
.logger
.debug(traceback
.format_exc())
618 if retries
< self
.max_retries_inject_ssh_key
:
624 "next_retry": self
.time_retries_inject_ssh_key
,
629 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
631 ro_vim_item_update
= {"vim_message": str(e
)}
633 return "FAILED", ro_vim_item_update
, db_task_update
636 class VimInteractionImage(VimInteractionBase
):
637 def new(self
, ro_task
, task_index
, task_depends
):
638 task
= ro_task
["tasks"][task_index
]
639 task_id
= task
["task_id"]
642 target_vim
= self
.my_vims
[ro_task
["target_id"]]
646 if task
.get("find_params"):
647 vim_images
= target_vim
.get_image_list(**task
["find_params"])
650 raise NsWorkerExceptionNotFound(
651 "Image not found with this criteria: '{}'".format(
655 elif len(vim_images
) > 1:
656 raise NsWorkerException(
657 "More than one image found with this criteria: '{}'".format(
662 vim_image_id
= vim_images
[0]["id"]
664 ro_vim_item_update
= {
665 "vim_id": vim_image_id
,
666 "vim_status": "DONE",
668 "created_items": created_items
,
673 "task={} {} new-image={} created={}".format(
674 task_id
, ro_task
["target_id"], vim_image_id
, created
678 return "DONE", ro_vim_item_update
679 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
681 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
683 ro_vim_item_update
= {
684 "vim_status": "VIM_ERROR",
686 "vim_message": str(e
),
689 return "FAILED", ro_vim_item_update
692 class VimInteractionSharedVolume(VimInteractionBase
):
693 def delete(self
, ro_task
, task_index
):
694 task
= ro_task
["tasks"][task_index
]
695 task_id
= task
["task_id"]
696 shared_volume_vim_id
= ro_task
["vim_info"]["vim_id"]
697 created_items
= ro_task
["vim_info"]["created_items"]
698 ro_vim_item_update_ok
= {
699 "vim_status": "DELETED",
701 "vim_message": "DELETED",
704 if created_items
and created_items
.get(shared_volume_vim_id
).get("keep"):
705 ro_vim_item_update_ok
= {
706 "vim_status": "ACTIVE",
710 return "DONE", ro_vim_item_update_ok
712 if shared_volume_vim_id
:
713 target_vim
= self
.my_vims
[ro_task
["target_id"]]
714 target_vim
.delete_shared_volumes(shared_volume_vim_id
)
715 except vimconn
.VimConnNotFoundException
:
716 ro_vim_item_update_ok
["vim_message"] = "already deleted"
717 except vimconn
.VimConnException
as e
:
719 "ro_task={} vim={} del-shared-volume={}: {}".format(
720 ro_task
["_id"], ro_task
["target_id"], shared_volume_vim_id
, e
723 ro_vim_item_update
= {
724 "vim_status": "VIM_ERROR",
725 "vim_message": "Error while deleting: {}".format(e
),
728 return "FAILED", ro_vim_item_update
731 "task={} {} del-shared-volume={} {}".format(
733 ro_task
["target_id"],
734 shared_volume_vim_id
,
735 ro_vim_item_update_ok
.get("vim_message", ""),
739 return "DONE", ro_vim_item_update_ok
741 def new(self
, ro_task
, task_index
, task_depends
):
742 task
= ro_task
["tasks"][task_index
]
743 task_id
= task
["task_id"]
746 target_vim
= self
.my_vims
[ro_task
["target_id"]]
749 shared_volume_vim_id
= None
750 shared_volume_data
= None
752 if task
.get("params"):
753 shared_volume_data
= task
["params"]
755 if shared_volume_data
:
757 f
"Creating the new shared_volume for {shared_volume_data}\n"
761 shared_volume_vim_id
,
762 ) = target_vim
.new_shared_volumes(shared_volume_data
)
764 created_items
[shared_volume_vim_id
] = {
765 "name": shared_volume_name
,
766 "keep": shared_volume_data
.get("keep"),
769 ro_vim_item_update
= {
770 "vim_id": shared_volume_vim_id
,
771 "vim_status": "ACTIVE",
773 "created_items": created_items
,
778 "task={} {} new-shared-volume={} created={}".format(
779 task_id
, ro_task
["target_id"], shared_volume_vim_id
, created
783 return "DONE", ro_vim_item_update
784 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
786 "task={} vim={} new-shared-volume:"
787 " {}".format(task_id
, ro_task
["target_id"], e
)
789 ro_vim_item_update
= {
790 "vim_status": "VIM_ERROR",
792 "vim_message": str(e
),
795 return "FAILED", ro_vim_item_update
798 class VimInteractionFlavor(VimInteractionBase
):
799 def delete(self
, ro_task
, task_index
):
800 task
= ro_task
["tasks"][task_index
]
801 task_id
= task
["task_id"]
802 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
803 ro_vim_item_update_ok
= {
804 "vim_status": "DELETED",
806 "vim_message": "DELETED",
812 target_vim
= self
.my_vims
[ro_task
["target_id"]]
813 target_vim
.delete_flavor(flavor_vim_id
)
814 except vimconn
.VimConnNotFoundException
:
815 ro_vim_item_update_ok
["vim_message"] = "already deleted"
816 except vimconn
.VimConnException
as e
:
818 "ro_task={} vim={} del-flavor={}: {}".format(
819 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
822 ro_vim_item_update
= {
823 "vim_status": "VIM_ERROR",
824 "vim_message": "Error while deleting: {}".format(e
),
827 return "FAILED", ro_vim_item_update
830 "task={} {} del-flavor={} {}".format(
832 ro_task
["target_id"],
834 ro_vim_item_update_ok
.get("vim_message", ""),
838 return "DONE", ro_vim_item_update_ok
840 def new(self
, ro_task
, task_index
, task_depends
):
841 task
= ro_task
["tasks"][task_index
]
842 task_id
= task
["task_id"]
845 target_vim
= self
.my_vims
[ro_task
["target_id"]]
850 if task
.get("find_params"):
852 flavor_data
= task
["find_params"]["flavor_data"]
853 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
854 except vimconn
.VimConnNotFoundException
as flavor_not_found_msg
:
856 f
"VimConnNotFoundException occured: {flavor_not_found_msg}"
859 if not vim_flavor_id
and task
.get("params"):
861 flavor_data
= task
["params"]["flavor_data"]
862 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
865 ro_vim_item_update
= {
866 "vim_id": vim_flavor_id
,
867 "vim_status": "DONE",
869 "created_items": created_items
,
874 "task={} {} new-flavor={} created={}".format(
875 task_id
, ro_task
["target_id"], vim_flavor_id
, created
879 return "DONE", ro_vim_item_update
880 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
882 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
884 ro_vim_item_update
= {
885 "vim_status": "VIM_ERROR",
887 "vim_message": str(e
),
890 return "FAILED", ro_vim_item_update
893 class VimInteractionAffinityGroup(VimInteractionBase
):
894 def delete(self
, ro_task
, task_index
):
895 task
= ro_task
["tasks"][task_index
]
896 task_id
= task
["task_id"]
897 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
898 ro_vim_item_update_ok
= {
899 "vim_status": "DELETED",
901 "vim_message": "DELETED",
906 if affinity_group_vim_id
:
907 target_vim
= self
.my_vims
[ro_task
["target_id"]]
908 target_vim
.delete_affinity_group(affinity_group_vim_id
)
909 except vimconn
.VimConnNotFoundException
:
910 ro_vim_item_update_ok
["vim_message"] = "already deleted"
911 except vimconn
.VimConnException
as e
:
913 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
914 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
917 ro_vim_item_update
= {
918 "vim_status": "VIM_ERROR",
919 "vim_message": "Error while deleting: {}".format(e
),
922 return "FAILED", ro_vim_item_update
925 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
927 ro_task
["target_id"],
928 affinity_group_vim_id
,
929 ro_vim_item_update_ok
.get("vim_message", ""),
933 return "DONE", ro_vim_item_update_ok
935 def new(self
, ro_task
, task_index
, task_depends
):
936 task
= ro_task
["tasks"][task_index
]
937 task_id
= task
["task_id"]
940 target_vim
= self
.my_vims
[ro_task
["target_id"]]
943 affinity_group_vim_id
= None
944 affinity_group_data
= None
946 if task
.get("params"):
947 affinity_group_data
= task
["params"].get("affinity_group_data")
949 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
951 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
952 "vim-affinity-group-id"
954 affinity_group_vim_id
= target_vim
.get_affinity_group(
955 param_affinity_group_id
957 except vimconn
.VimConnNotFoundException
:
959 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
960 "could not be found at VIM. Creating a new one.".format(
961 task_id
, ro_task
["target_id"], param_affinity_group_id
965 if not affinity_group_vim_id
and affinity_group_data
:
966 affinity_group_vim_id
= target_vim
.new_affinity_group(
971 ro_vim_item_update
= {
972 "vim_id": affinity_group_vim_id
,
973 "vim_status": "DONE",
975 "created_items": created_items
,
980 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
981 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
985 return "DONE", ro_vim_item_update
986 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
988 "task={} vim={} new-affinity-or-anti-affinity-group:"
989 " {}".format(task_id
, ro_task
["target_id"], e
)
991 ro_vim_item_update
= {
992 "vim_status": "VIM_ERROR",
994 "vim_message": str(e
),
997 return "FAILED", ro_vim_item_update
1000 class VimInteractionUpdateVdu(VimInteractionBase
):
1001 def exec(self
, ro_task
, task_index
, task_depends
):
1002 task
= ro_task
["tasks"][task_index
]
1003 task_id
= task
["task_id"]
1004 db_task_update
= {"retries": 0}
1007 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1010 if task
.get("params"):
1011 vim_vm_id
= task
["params"].get("vim_vm_id")
1012 action
= task
["params"].get("action")
1013 context
= {action
: action
}
1014 target_vim
.action_vminstance(vim_vm_id
, context
)
1016 ro_vim_item_update
= {
1017 "vim_id": vim_vm_id
,
1018 "vim_status": "DONE",
1020 "created_items": created_items
,
1021 "vim_details": None,
1022 "vim_message": None,
1025 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1027 return "DONE", ro_vim_item_update
, db_task_update
1028 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1030 "task={} vim={} VM Migration:"
1031 " {}".format(task_id
, ro_task
["target_id"], e
)
1033 ro_vim_item_update
= {
1034 "vim_status": "VIM_ERROR",
1036 "vim_message": str(e
),
1039 return "FAILED", ro_vim_item_update
, db_task_update
1042 class VimInteractionSdnNet(VimInteractionBase
):
1044 def _match_pci(port_pci
, mapping
):
1046 Check if port_pci matches with mapping
1047 mapping can have brackets to indicate that several chars are accepted. e.g
1048 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1049 :param port_pci: text
1050 :param mapping: text, can contain brackets to indicate several chars are available
1051 :return: True if matches, False otherwise
1053 if not port_pci
or not mapping
:
1055 if port_pci
== mapping
:
1061 bracket_start
= mapping
.find("[", mapping_index
)
1063 if bracket_start
== -1:
1066 bracket_end
= mapping
.find("]", bracket_start
)
1067 if bracket_end
== -1:
1070 length
= bracket_start
- mapping_index
1073 and port_pci
[pci_index
: pci_index
+ length
]
1074 != mapping
[mapping_index
:bracket_start
]
1079 port_pci
[pci_index
+ length
]
1080 not in mapping
[bracket_start
+ 1 : bracket_end
]
1084 pci_index
+= length
+ 1
1085 mapping_index
= bracket_end
+ 1
1087 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
1092 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
1094 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1095 :param vim_account_id:
1100 for vld
in vlds_to_connect
:
1101 table
, _
, db_id
= vld
.partition(":")
1102 db_id
, _
, vld
= db_id
.partition(":")
1103 _
, _
, vld_id
= vld
.partition(".")
1105 if table
== "vnfrs":
1106 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
1107 iface_key
= "vnf-vld-id"
1108 else: # table == "nsrs"
1109 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
1110 iface_key
= "ns-vld-id"
1112 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
1114 for db_vnfr
in db_vnfrs
:
1115 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
1116 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
1117 if interface
.get(iface_key
) == vld_id
and interface
.get(
1119 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1121 interface_
= interface
.copy()
1122 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1123 db_vnfr
["_id"], vdu_index
, iface_index
1126 if vdur
.get("status") == "ERROR":
1127 interface_
["status"] = "ERROR"
1129 interfaces
.append(interface_
)
1133 def refresh(self
, ro_task
):
1134 # look for task create
1135 task_create_index
, _
= next(
1137 for i_t
in enumerate(ro_task
["tasks"])
1139 and i_t
[1]["action"] == "CREATE"
1140 and i_t
[1]["status"] != "FINISHED"
1143 return self
.new(ro_task
, task_create_index
, None)
1145 def new(self
, ro_task
, task_index
, task_depends
):
1146 task
= ro_task
["tasks"][task_index
]
1147 task_id
= task
["task_id"]
1148 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1150 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1152 created_items
= ro_task
["vim_info"].get("created_items")
1153 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1154 new_connected_ports
= []
1155 last_update
= ro_task
["vim_info"].get("last_update", 0)
1156 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1158 created
= ro_task
["vim_info"].get("created", False)
1162 params
= task
["params"]
1163 vlds_to_connect
= params
.get("vlds", [])
1164 associated_vim
= params
.get("target_vim")
1165 # external additional ports
1166 additional_ports
= params
.get("sdn-ports") or ()
1167 _
, _
, vim_account_id
= (
1169 if associated_vim
is None
1170 else associated_vim
.partition(":")
1174 # get associated VIM
1175 if associated_vim
not in self
.db_vims
:
1176 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1177 "vim_accounts", {"_id": vim_account_id
}
1180 db_vim
= self
.db_vims
[associated_vim
]
1182 # look for ports to connect
1183 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1187 pending_ports
= error_ports
= 0
1189 sdn_need_update
= False
1192 vlan_used
= port
.get("vlan") or vlan_used
1194 # TODO. Do not connect if already done
1195 if not port
.get("compute_node") or not port
.get("pci"):
1196 if port
.get("status") == "ERROR":
1203 compute_node_mappings
= next(
1206 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1207 if c
and c
["compute_node"] == port
["compute_node"]
1212 if compute_node_mappings
:
1213 # process port_mapping pci of type 0000:af:1[01].[1357]
1217 for p
in compute_node_mappings
["ports"]
1218 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1224 if not db_vim
["config"].get("mapping_not_needed"):
1226 "Port mapping not found for compute_node={} pci={}".format(
1227 port
["compute_node"], port
["pci"]
1234 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1236 "service_endpoint_id": pmap
.get("service_endpoint_id")
1237 or service_endpoint_id
,
1238 "service_endpoint_encapsulation_type": "dot1q"
1239 if port
["type"] == "SR-IOV"
1241 "service_endpoint_encapsulation_info": {
1242 "vlan": port
.get("vlan"),
1243 "mac": port
.get("mac-address"),
1244 "device_id": pmap
.get("device_id") or port
["compute_node"],
1245 "device_interface_id": pmap
.get("device_interface_id")
1247 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1248 "switch_port": pmap
.get("switch_port"),
1249 "service_mapping_info": pmap
.get("service_mapping_info"),
1254 # if port["modified_at"] > last_update:
1255 # sdn_need_update = True
1256 new_connected_ports
.append(port
["id"]) # TODO
1257 sdn_ports
.append(new_port
)
1261 "{} interfaces have not been created as VDU is on ERROR status".format(
1266 # connect external ports
1267 for index
, additional_port
in enumerate(additional_ports
):
1268 additional_port_id
= additional_port
.get(
1269 "service_endpoint_id"
1270 ) or "external-{}".format(index
)
1273 "service_endpoint_id": additional_port_id
,
1274 "service_endpoint_encapsulation_type": additional_port
.get(
1275 "service_endpoint_encapsulation_type", "dot1q"
1277 "service_endpoint_encapsulation_info": {
1278 "vlan": additional_port
.get("vlan") or vlan_used
,
1279 "mac": additional_port
.get("mac_address"),
1280 "device_id": additional_port
.get("device_id"),
1281 "device_interface_id": additional_port
.get(
1282 "device_interface_id"
1284 "switch_dpid": additional_port
.get("switch_dpid")
1285 or additional_port
.get("switch_id"),
1286 "switch_port": additional_port
.get("switch_port"),
1287 "service_mapping_info": additional_port
.get(
1288 "service_mapping_info"
1293 new_connected_ports
.append(additional_port_id
)
1296 # if there are more ports to connect or they have been modified, call create/update
1298 sdn_status
= "ERROR"
1299 sdn_info
= "; ".join(error_list
)
1300 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1301 last_update
= time
.time()
1304 if len(sdn_ports
) < 2:
1305 sdn_status
= "ACTIVE"
1307 if not pending_ports
:
1309 "task={} {} new-sdn-net done, less than 2 ports".format(
1310 task_id
, ro_task
["target_id"]
1314 net_type
= params
.get("type") or "ELAN"
1318 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1321 "task={} {} new-sdn-net={} created={}".format(
1322 task_id
, ro_task
["target_id"], sdn_net_id
, created
1326 created_items
= target_vim
.edit_connectivity_service(
1327 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1331 "task={} {} update-sdn-net={} created={}".format(
1332 task_id
, ro_task
["target_id"], sdn_net_id
, created
1336 connected_ports
= new_connected_ports
1338 wim_status_dict
= target_vim
.get_connectivity_service_status(
1339 sdn_net_id
, conn_info
=created_items
1341 sdn_status
= wim_status_dict
["sdn_status"]
1343 if wim_status_dict
.get("sdn_info"):
1344 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1346 if wim_status_dict
.get("error_msg"):
1347 sdn_info
= wim_status_dict
.get("error_msg") or ""
1350 if sdn_status
!= "ERROR":
1351 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1352 len(ports
) - pending_ports
, len(ports
)
1355 if sdn_status
== "ACTIVE":
1356 sdn_status
= "BUILD"
1358 ro_vim_item_update
= {
1359 "vim_id": sdn_net_id
,
1360 "vim_status": sdn_status
,
1362 "created_items": created_items
,
1363 "connected_ports": connected_ports
,
1364 "vim_details": sdn_info
,
1365 "vim_message": None,
1366 "last_update": last_update
,
1369 return sdn_status
, ro_vim_item_update
1370 except Exception as e
:
1372 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1373 exc_info
=not isinstance(
1374 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1377 ro_vim_item_update
= {
1378 "vim_status": "VIM_ERROR",
1380 "vim_message": str(e
),
1383 return "FAILED", ro_vim_item_update
1385 def delete(self
, ro_task
, task_index
):
1386 task
= ro_task
["tasks"][task_index
]
1387 task_id
= task
["task_id"]
1388 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1389 ro_vim_item_update_ok
= {
1390 "vim_status": "DELETED",
1392 "vim_message": "DELETED",
1398 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1399 target_vim
.delete_connectivity_service(
1400 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1403 except Exception as e
:
1405 isinstance(e
, sdnconn
.SdnConnectorError
)
1406 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1408 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1411 "ro_task={} vim={} del-sdn-net={}: {}".format(
1412 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1414 exc_info
=not isinstance(
1415 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1418 ro_vim_item_update
= {
1419 "vim_status": "VIM_ERROR",
1420 "vim_message": "Error while deleting: {}".format(e
),
1423 return "FAILED", ro_vim_item_update
1426 "task={} {} del-sdn-net={} {}".format(
1428 ro_task
["target_id"],
1430 ro_vim_item_update_ok
.get("vim_message", ""),
1434 return "DONE", ro_vim_item_update_ok
1437 class VimInteractionMigration(VimInteractionBase
):
1438 def exec(self
, ro_task
, task_index
, task_depends
):
1439 task
= ro_task
["tasks"][task_index
]
1440 task_id
= task
["task_id"]
1441 db_task_update
= {"retries": 0}
1442 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1446 refreshed_vim_info
= {}
1449 if task
.get("params"):
1450 vim_vm_id
= task
["params"].get("vim_vm_id")
1451 migrate_host
= task
["params"].get("migrate_host")
1452 _
, migrated_compute_node
= target_vim
.migrate_instance(
1453 vim_vm_id
, migrate_host
1456 if migrated_compute_node
:
1457 # When VM is migrated, vdu["vim_info"] needs to be updated
1458 vdu_old_vim_info
= task
["params"]["vdu_vim_info"].get(
1459 ro_task
["target_id"]
1462 # Refresh VM to get new vim_info
1463 vm_to_refresh_list
= [vim_vm_id
]
1464 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1465 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1467 if refreshed_vim_info
.get("interfaces"):
1468 for old_iface
in vdu_old_vim_info
.get("interfaces"):
1472 for iface
in refreshed_vim_info
["interfaces"]
1473 if old_iface
["vim_interface_id"]
1474 == iface
["vim_interface_id"]
1478 vim_interfaces
.append(iface
)
1480 ro_vim_item_update
= {
1481 "vim_id": vim_vm_id
,
1482 "vim_status": "ACTIVE",
1484 "created_items": created_items
,
1485 "vim_details": None,
1486 "vim_message": None,
1489 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1493 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1496 ro_vim_item_update
["interfaces"] = vim_interfaces
1499 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1502 return "DONE", ro_vim_item_update
, db_task_update
1504 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1506 "task={} vim={} VM Migration:"
1507 " {}".format(task_id
, ro_task
["target_id"], e
)
1509 ro_vim_item_update
= {
1510 "vim_status": "VIM_ERROR",
1512 "vim_message": str(e
),
1515 return "FAILED", ro_vim_item_update
, db_task_update
1518 class VimInteractionResize(VimInteractionBase
):
1519 def exec(self
, ro_task
, task_index
, task_depends
):
1520 task
= ro_task
["tasks"][task_index
]
1521 task_id
= task
["task_id"]
1522 db_task_update
= {"retries": 0}
1524 target_flavor_uuid
= None
1526 refreshed_vim_info
= {}
1527 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1530 if task
.get("params"):
1531 vim_vm_id
= task
["params"].get("vim_vm_id")
1532 flavor_dict
= task
["params"].get("flavor_dict")
1533 self
.logger
.info("flavor_dict %s", flavor_dict
)
1536 target_flavor_uuid
= target_vim
.get_flavor_id_from_data(flavor_dict
)
1537 except Exception as e
:
1538 self
.logger
.info("Cannot find any flavor matching %s.", str(e
))
1540 target_flavor_uuid
= target_vim
.new_flavor(flavor_dict
)
1541 except Exception as e
:
1542 self
.logger
.error("Error creating flavor at VIM %s.", str(e
))
1544 if target_flavor_uuid
is not None:
1545 resized_status
= target_vim
.resize_instance(
1546 vim_vm_id
, target_flavor_uuid
1550 # Refresh VM to get new vim_info
1551 vm_to_refresh_list
= [vim_vm_id
]
1552 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1553 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1555 ro_vim_item_update
= {
1556 "vim_id": vim_vm_id
,
1557 "vim_status": "DONE",
1559 "created_items": created_items
,
1560 "vim_details": None,
1561 "vim_message": None,
1564 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1568 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1571 "task={} {} resize done".format(task_id
, ro_task
["target_id"])
1573 return "DONE", ro_vim_item_update
, db_task_update
1574 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1576 "task={} vim={} Resize:" " {}".format(task_id
, ro_task
["target_id"], e
)
1578 ro_vim_item_update
= {
1579 "vim_status": "VIM_ERROR",
1581 "vim_message": str(e
),
1584 return "FAILED", ro_vim_item_update
, db_task_update
1587 class ConfigValidate
:
1588 def __init__(self
, config
: Dict
):
1593 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1595 self
.conf
["period"]["refresh_active"] >= 60
1596 or self
.conf
["period"]["refresh_active"] == -1
1598 return self
.conf
["period"]["refresh_active"]
1604 return self
.conf
["period"]["refresh_build"]
1608 return self
.conf
["period"]["refresh_image"]
1612 return self
.conf
["period"]["refresh_error"]
1615 def queue_size(self
):
1616 return self
.conf
["period"]["queue_size"]
1619 class NsWorker(threading
.Thread
):
1620 def __init__(self
, worker_index
, config
, plugins
, db
):
1622 :param worker_index: thread index
1623 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1624 :param plugins: global shared dict with the loaded plugins
1625 :param db: database class instance to use
1627 threading
.Thread
.__init
__(self
)
1628 self
.config
= config
1629 self
.plugins
= plugins
1630 self
.plugin_name
= "unknown"
1631 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1632 self
.worker_index
= worker_index
1633 # refresh periods for created items
1634 self
.refresh_config
= ConfigValidate(config
)
1635 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1636 # targetvim: vimplugin class
1638 # targetvim: vim information from database
1641 self
.vim_targets
= []
1642 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1645 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1646 "shared-volumes": VimInteractionSharedVolume(
1647 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1649 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1650 "image": VimInteractionImage(
1651 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1653 "flavor": VimInteractionFlavor(
1654 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1656 "sdn_net": VimInteractionSdnNet(
1657 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1659 "update": VimInteractionUpdateVdu(
1660 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1662 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1663 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1665 "migrate": VimInteractionMigration(
1666 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1668 "verticalscale": VimInteractionResize(
1669 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1672 self
.time_last_task_processed
= None
1673 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1674 self
.tasks_to_delete
= []
1675 # it is idle when there are not vim_targets associated
1677 self
.task_locked_time
= config
["global"]["task_locked_time"]
1679 def insert_task(self
, task
):
1681 self
.task_queue
.put(task
, False)
1684 raise NsWorkerException("timeout inserting a task")
1686 def terminate(self
):
1687 self
.insert_task("exit")
1689 def del_task(self
, task
):
1690 with self
.task_lock
:
1691 if task
["status"] == "SCHEDULED":
1692 task
["status"] = "SUPERSEDED"
1694 else: # task["status"] == "processing"
1695 self
.task_lock
.release()
1698 def _process_vim_config(self
, target_id
: str, db_vim
: dict) -> None:
1700 Process vim config, creating vim configuration files as ca_cert
1701 :param target_id: vim/sdn/wim + id
1702 :param db_vim: Vim dictionary obtained from database
1703 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1705 if not db_vim
.get("config"):
1709 work_dir
= "/app/osm_ro/certs"
1712 if db_vim
["config"].get("ca_cert_content"):
1713 file_name
= f
"{work_dir}/{target_id}:{self.worker_index}"
1715 if not path
.isdir(file_name
):
1718 file_name
= file_name
+ "/ca_cert"
1720 with
open(file_name
, "w") as f
:
1721 f
.write(db_vim
["config"]["ca_cert_content"])
1722 del db_vim
["config"]["ca_cert_content"]
1723 db_vim
["config"]["ca_cert"] = file_name
1724 except Exception as e
:
1725 raise NsWorkerException(
1726 "Error writing to file '{}': {}".format(file_name
, e
)
1729 def _load_plugin(self
, name
, type="vim"):
1730 # type can be vim or sdn
1731 if "rovim_dummy" not in self
.plugins
:
1732 self
.plugins
["rovim_dummy"] = VimDummyConnector
1734 if "rosdn_dummy" not in self
.plugins
:
1735 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1737 if name
in self
.plugins
:
1738 return self
.plugins
[name
]
1741 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1742 self
.plugins
[name
] = ep
.load()
1743 except Exception as e
:
1744 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1746 if name
and name
not in self
.plugins
:
1747 raise NsWorkerException(
1748 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1751 return self
.plugins
[name
]
1753 def _unload_vim(self
, target_id
):
1755 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1756 :param target_id: Contains type:_id; where type can be 'vim', ...
1760 self
.db_vims
.pop(target_id
, None)
1761 self
.my_vims
.pop(target_id
, None)
1763 if target_id
in self
.vim_targets
:
1764 self
.vim_targets
.remove(target_id
)
1766 self
.logger
.info("Unloaded {}".format(target_id
))
1767 except Exception as e
:
1768 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1770 def _check_vim(self
, target_id
):
1772 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1773 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1776 target
, _
, _id
= target_id
.partition(":")
1782 loaded
= target_id
in self
.vim_targets
1792 step
= "Getting {} from db".format(target_id
)
1793 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1795 for op_index
, operation
in enumerate(
1796 db_vim
["_admin"].get("operations", ())
1798 if operation
["operationState"] != "PROCESSING":
1801 locked_at
= operation
.get("locked_at")
1803 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1804 # some other thread is doing this operation
1808 op_text
= "_admin.operations.{}.".format(op_index
)
1810 if not self
.db
.set_one(
1814 op_text
+ "operationState": "PROCESSING",
1815 op_text
+ "locked_at": locked_at
,
1818 op_text
+ "locked_at": now
,
1819 "admin.current_operation": op_index
,
1821 fail_on_empty
=False,
1825 unset_dict
[op_text
+ "locked_at"] = None
1826 unset_dict
["current_operation"] = None
1827 step
= "Loading " + target_id
1828 error_text
= self
._load
_vim
(target_id
)
1831 step
= "Checking connectivity"
1834 self
.my_vims
[target_id
].check_vim_connectivity()
1836 self
.my_vims
[target_id
].check_credentials()
1838 update_dict
["_admin.operationalState"] = "ENABLED"
1839 update_dict
["_admin.detailed-status"] = ""
1840 unset_dict
[op_text
+ "detailed-status"] = None
1841 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1845 except Exception as e
:
1846 error_text
= "{}: {}".format(step
, e
)
1847 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1850 if update_dict
or unset_dict
:
1852 update_dict
[op_text
+ "operationState"] = "FAILED"
1853 update_dict
[op_text
+ "detailed-status"] = error_text
1854 unset_dict
.pop(op_text
+ "detailed-status", None)
1855 update_dict
["_admin.operationalState"] = "ERROR"
1856 update_dict
["_admin.detailed-status"] = error_text
1859 update_dict
[op_text
+ "statusEnteredTime"] = now
1863 q_filter
={"_id": _id
},
1864 update_dict
=update_dict
,
1866 fail_on_empty
=False,
1870 self
._unload
_vim
(target_id
)
1872 def _reload_vim(self
, target_id
):
1873 if target_id
in self
.vim_targets
:
1874 self
._load
_vim
(target_id
)
1876 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1877 # just remove it to force load again next time it is needed
1878 self
.db_vims
.pop(target_id
, None)
1880 def _load_vim(self
, target_id
):
1882 Load or reload a vim_account, sdn_controller or wim_account.
1883 Read content from database, load the plugin if not loaded.
1884 In case of error loading the plugin, it load a failing VIM_connector
1885 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1886 :param target_id: Contains type:_id; where type can be 'vim', ...
1887 :return: None if ok, descriptive text if error
1889 target
, _
, _id
= target_id
.partition(":")
1899 step
= "Getting {}={} from db".format(target
, _id
)
1902 # TODO process for wim, sdnc, ...
1903 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1905 # if deep_get(vim, "config", "sdn-controller"):
1906 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1907 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1909 step
= "Decrypting password"
1910 schema_version
= vim
.get("schema_version")
1911 self
.db
.encrypt_decrypt_fields(
1914 fields
=("password", "secret"),
1915 schema_version
=schema_version
,
1918 self
._process
_vim
_config
(target_id
, vim
)
1921 plugin_name
= "rovim_" + vim
["vim_type"]
1922 step
= "Loading plugin '{}'".format(plugin_name
)
1923 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1924 step
= "Loading {}'".format(target_id
)
1925 self
.my_vims
[target_id
] = vim_module_conn(
1928 tenant_id
=vim
.get("vim_tenant_id"),
1929 tenant_name
=vim
.get("vim_tenant_name"),
1932 user
=vim
["vim_user"],
1933 passwd
=vim
["vim_password"],
1934 config
=vim
.get("config") or {},
1938 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1939 step
= "Loading plugin '{}'".format(plugin_name
)
1940 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1941 step
= "Loading {}'".format(target_id
)
1943 wim_config
= wim
.pop("config", {}) or {}
1944 wim
["uuid"] = wim
["_id"]
1945 if "url" in wim
and "wim_url" not in wim
:
1946 wim
["wim_url"] = wim
["url"]
1947 elif "url" not in wim
and "wim_url" in wim
:
1948 wim
["url"] = wim
["wim_url"]
1951 wim_config
["dpid"] = wim
.pop("dpid")
1953 if wim
.get("switch_id"):
1954 wim_config
["switch_id"] = wim
.pop("switch_id")
1956 # wim, wim_account, config
1957 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1958 self
.db_vims
[target_id
] = vim
1959 self
.error_status
= None
1962 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1964 except Exception as e
:
1966 "Cannot load {} plugin={}: {} {}".format(
1967 target_id
, plugin_name
, step
, e
1971 self
.db_vims
[target_id
] = vim
or {}
1972 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1973 error_status
= "{} Error: {}".format(step
, e
)
1977 if target_id
not in self
.vim_targets
:
1978 self
.vim_targets
.append(target_id
)
1980 def _get_db_task(self
):
1982 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1987 if not self
.time_last_task_processed
:
1988 self
.time_last_task_processed
= now
1993 # Log RO tasks only when loglevel is DEBUG
1994 if self.logger.getEffectiveLevel() == logging.DEBUG:
2001 + str(self.task_locked_time)
2003 + "time_last_task_processed="
2004 + str(self.time_last_task_processed)
2010 locked
= self
.db
.set_one(
2013 "target_id": self
.vim_targets
,
2014 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2015 "locked_at.lt": now
- self
.task_locked_time
,
2016 "to_check_at.lt": self
.time_last_task_processed
,
2017 "to_check_at.gt": -1,
2019 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
2020 fail_on_empty
=False,
2025 ro_task
= self
.db
.get_one(
2028 "target_id": self
.vim_targets
,
2029 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2035 if self
.time_last_task_processed
== now
:
2036 self
.time_last_task_processed
= None
2039 self
.time_last_task_processed
= now
2040 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2042 except DbException
as e
:
2043 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
2044 except Exception as e
:
2045 self
.logger
.critical(
2046 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
2051 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2053 Determine if this task need to be done or superseded
2056 my_task
= ro_task
["tasks"][task_index
]
2057 task_id
= my_task
["task_id"]
2058 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
2059 "created_items", False
2062 self
.logger
.debug("Needed delete: {}".format(needed_delete
))
2063 if my_task
["status"] == "FAILED":
2064 return None, None # TODO need to be retry??
2067 for index
, task
in enumerate(ro_task
["tasks"]):
2068 if index
== task_index
or not task
:
2072 my_task
["target_record"] == task
["target_record"]
2073 and task
["action"] == "CREATE"
2076 db_update
["tasks.{}.status".format(index
)] = task
[
2079 elif task
["action"] == "CREATE" and task
["status"] not in (
2083 needed_delete
= False
2087 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
2089 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
2091 return "SUPERSEDED", None
2092 except Exception as e
:
2093 if not isinstance(e
, NsWorkerException
):
2094 self
.logger
.critical(
2095 "Unexpected exception at _delete_task task={}: {}".format(
2101 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2103 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2105 Determine if this task need to create something at VIM
2108 my_task
= ro_task
["tasks"][task_index
]
2109 task_id
= my_task
["task_id"]
2112 if my_task
["status"] == "FAILED":
2113 return None, None # TODO need to be retry??
2114 elif my_task
["status"] == "SCHEDULED":
2115 # check if already created by another task
2116 for index
, task
in enumerate(ro_task
["tasks"]):
2117 if index
== task_index
or not task
:
2120 if task
["action"] == "CREATE" and task
["status"] not in (
2125 return task
["status"], "COPY_VIM_INFO"
2128 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2129 ro_task
, task_index
, task_depends
2131 # TODO update other CREATE tasks
2132 except Exception as e
:
2133 if not isinstance(e
, NsWorkerException
):
2135 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2138 task_status
= "FAILED"
2139 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2140 # TODO update ro_vim_item_update
2142 return task_status
, ro_vim_item_update
2146 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2148 Look for dependency task
2149 :param task_id: Can be one of
2150 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2151 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2152 3. task.task_id: "<action_id>:number"
2155 :return: database ro_task plus index of task
2158 task_id
.startswith("vim:")
2159 or task_id
.startswith("sdn:")
2160 or task_id
.startswith("wim:")
2162 target_id
, _
, task_id
= task_id
.partition(" ")
2164 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2165 ro_task_dependency
= self
.db
.get_one(
2167 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2168 fail_on_empty
=False,
2171 if ro_task_dependency
:
2172 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2173 if task
["target_record_id"] == task_id
:
2174 return ro_task_dependency
, task_index
2178 for task_index
, task
in enumerate(ro_task
["tasks"]):
2179 if task
and task
["task_id"] == task_id
:
2180 return ro_task
, task_index
2182 ro_task_dependency
= self
.db
.get_one(
2185 "tasks.ANYINDEX.task_id": task_id
,
2186 "tasks.ANYINDEX.target_record.ne": None,
2188 fail_on_empty
=False,
2191 self
.logger
.debug("ro_task_dependency={}".format(ro_task_dependency
))
2192 if ro_task_dependency
:
2193 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2194 if task
["task_id"] == task_id
:
2195 return ro_task_dependency
, task_index
2196 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
2198 def update_vm_refresh(self
, ro_task
):
2199 """Enables the VM status updates if self.refresh_config.active parameter
2200 is not -1 and then updates the DB accordingly
2204 self
.logger
.debug("Checking if VM status update config")
2205 next_refresh
= time
.time()
2206 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2208 if next_refresh
!= -1:
2209 db_ro_task_update
= {}
2211 next_check_at
= now
+ (24 * 60 * 60)
2212 next_check_at
= min(next_check_at
, next_refresh
)
2213 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2214 db_ro_task_update
["to_check_at"] = next_check_at
2217 "Finding tasks which to be updated to enable VM status updates"
2219 refresh_tasks
= self
.db
.get_list(
2222 "tasks.status": "DONE",
2223 "to_check_at.lt": 0,
2226 self
.logger
.debug("Updating tasks to change the to_check_at status")
2227 for task
in refresh_tasks
:
2234 update_dict
=db_ro_task_update
,
2238 except Exception as e
:
2239 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2241 def _get_next_refresh(self
, ro_task
: dict, next_refresh
: float):
2242 """Decide the next_refresh according to vim type and refresh config period.
2244 ro_task (dict): ro_task details
2245 next_refresh (float): next refresh time as epoch format
2248 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2250 target_vim
= ro_task
["target_id"]
2251 vim_type
= self
.db_vims
[target_vim
]["vim_type"]
2252 if self
.refresh_config
.active
== -1 or vim_type
== "openstack":
2255 next_refresh
+= self
.refresh_config
.active
2258 def _process_pending_tasks(self
, ro_task
):
2259 ro_task_id
= ro_task
["_id"]
2262 next_check_at
= now
+ (24 * 60 * 60)
2263 db_ro_task_update
= {}
2265 def _update_refresh(new_status
):
2266 # compute next_refresh
2268 nonlocal next_check_at
2269 nonlocal db_ro_task_update
2272 next_refresh
= time
.time()
2274 if task
["item"] in ("image", "flavor"):
2275 next_refresh
+= self
.refresh_config
.image
2276 elif new_status
== "BUILD":
2277 next_refresh
+= self
.refresh_config
.build
2278 elif new_status
== "DONE":
2279 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2281 next_refresh
+= self
.refresh_config
.error
2283 next_check_at
= min(next_check_at
, next_refresh
)
2284 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2285 ro_task
["vim_info"]["refresh_at"] = next_refresh
2289 # Log RO tasks only when loglevel is DEBUG
2290 if self.logger.getEffectiveLevel() == logging.DEBUG:
2291 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2293 # Check if vim status refresh is enabled again
2294 self
.update_vm_refresh(ro_task
)
2295 # 0: get task_status_create
2297 task_status_create
= None
2301 for t
in ro_task
["tasks"]
2303 and t
["action"] == "CREATE"
2304 and t
["status"] in ("BUILD", "DONE")
2310 task_status_create
= task_create
["status"]
2312 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2313 for task_action
in ("DELETE", "CREATE", "EXEC"):
2314 db_vim_update
= None
2317 for task_index
, task
in enumerate(ro_task
["tasks"]):
2319 continue # task deleted
2322 target_update
= None
2326 task_action
in ("DELETE", "EXEC")
2327 and task
["status"] not in ("SCHEDULED", "BUILD")
2329 or task
["action"] != task_action
2331 task_action
== "CREATE"
2332 and task
["status"] in ("FINISHED", "SUPERSEDED")
2337 task_path
= "tasks.{}.status".format(task_index
)
2339 db_vim_info_update
= None
2341 if task
["status"] == "SCHEDULED":
2342 # check if tasks that this depends on have been completed
2343 dependency_not_completed
= False
2345 for dependency_task_id
in task
.get("depends_on") or ():
2348 dependency_task_index
,
2349 ) = self
._get
_dependency
(
2350 dependency_task_id
, target_id
=ro_task
["target_id"]
2352 dependency_task
= dependency_ro_task
["tasks"][
2353 dependency_task_index
2356 "dependency_ro_task={} dependency_task_index={}".format(
2357 dependency_ro_task
, dependency_task_index
2361 if dependency_task
["status"] == "SCHEDULED":
2362 dependency_not_completed
= True
2363 next_check_at
= min(
2364 next_check_at
, dependency_ro_task
["to_check_at"]
2366 # must allow dependent task to be processed first
2367 # to do this set time after last_task_processed
2368 next_check_at
= max(
2369 self
.time_last_task_processed
, next_check_at
2372 elif dependency_task
["status"] == "FAILED":
2373 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2376 dependency_task
["action"],
2377 dependency_task
["item"],
2379 dependency_ro_task
["vim_info"].get(
2384 "task={} {}".format(task
["task_id"], error_text
)
2386 raise NsWorkerException(error_text
)
2388 task_depends
[dependency_task_id
] = dependency_ro_task
[
2392 "TASK-{}".format(dependency_task_id
)
2393 ] = dependency_ro_task
["vim_info"]["vim_id"]
2395 if dependency_not_completed
:
2396 self
.logger
.warning(
2397 "DEPENDENCY NOT COMPLETED {}".format(
2398 dependency_ro_task
["vim_info"]["vim_id"]
2401 # TODO set at vim_info.vim_details that it is waiting
2404 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2405 # the task of renew this locking. It will update database locket_at periodically
2407 lock_object
= LockRenew
.add_lock_object(
2408 "ro_tasks", ro_task
, self
2410 if task
["action"] == "DELETE":
2414 ) = self
._delete
_task
(
2415 ro_task
, task_index
, task_depends
, db_ro_task_update
2418 "FINISHED" if new_status
== "DONE" else new_status
2420 # ^with FINISHED instead of DONE it will not be refreshing
2422 if new_status
in ("FINISHED", "SUPERSEDED"):
2423 target_update
= "DELETE"
2424 elif task
["action"] == "EXEC":
2429 ) = self
.item2class
[task
["item"]].exec(
2430 ro_task
, task_index
, task_depends
2433 "FINISHED" if new_status
== "DONE" else new_status
2435 # ^with FINISHED instead of DONE it will not be refreshing
2438 # load into database the modified db_task_update "retries" and "next_retry"
2439 if db_task_update
.get("retries"):
2441 "tasks.{}.retries".format(task_index
)
2442 ] = db_task_update
["retries"]
2444 next_check_at
= time
.time() + db_task_update
.get(
2447 target_update
= None
2448 elif task
["action"] == "CREATE":
2449 if task
["status"] == "SCHEDULED":
2450 if task_status_create
:
2451 new_status
= task_status_create
2452 target_update
= "COPY_VIM_INFO"
2454 new_status
, db_vim_info_update
= self
.item2class
[
2456 ].new(ro_task
, task_index
, task_depends
)
2457 _update_refresh(new_status
)
2459 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2460 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2464 ) = self
.item2class
[
2467 _update_refresh(new_status
)
2469 # The refresh is updated to avoid set the value of "refresh_at" to
2470 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2471 # because it can happen that in this case the task is never processed
2472 _update_refresh(task
["status"])
2474 except Exception as e
:
2475 new_status
= "FAILED"
2476 db_vim_info_update
= {
2477 "vim_status": "VIM_ERROR",
2478 "vim_message": str(e
),
2482 e
, (NsWorkerException
, vimconn
.VimConnException
)
2485 "Unexpected exception at _delete_task task={}: {}".format(
2492 if db_vim_info_update
:
2493 db_vim_update
= db_vim_info_update
.copy()
2494 db_ro_task_update
.update(
2497 for k
, v
in db_vim_info_update
.items()
2500 ro_task
["vim_info"].update(db_vim_info_update
)
2503 if task_action
== "CREATE":
2504 task_status_create
= new_status
2505 db_ro_task_update
[task_path
] = new_status
2507 if target_update
or db_vim_update
:
2508 if target_update
== "DELETE":
2509 self
._update
_target
(task
, None)
2510 elif target_update
== "COPY_VIM_INFO":
2511 self
._update
_target
(task
, ro_task
["vim_info"])
2513 self
._update
_target
(task
, db_vim_update
)
2515 except Exception as e
:
2517 isinstance(e
, DbException
)
2518 and e
.http_code
== HTTPStatus
.NOT_FOUND
2520 # if the vnfrs or nsrs has been removed from database, this task must be removed
2522 "marking to delete task={}".format(task
["task_id"])
2524 self
.tasks_to_delete
.append(task
)
2527 "Unexpected exception at _update_target task={}: {}".format(
2533 locked_at
= ro_task
["locked_at"]
2537 lock_object
["locked_at"],
2538 lock_object
["locked_at"] + self
.task_locked_time
,
2540 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2541 # contain exactly locked_at + self.task_locked_time
2542 LockRenew
.remove_lock_object(lock_object
)
2545 "_id": ro_task
["_id"],
2546 "to_check_at": ro_task
["to_check_at"],
2547 "locked_at": locked_at
,
2549 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2550 # outside this task (by ro_nbi) do not update it
2551 db_ro_task_update
["locked_by"] = None
2552 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2553 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2554 db_ro_task_update
["modified_at"] = now
2555 db_ro_task_update
["to_check_at"] = next_check_at
2558 # Log RO tasks only when loglevel is DEBUG
2559 if self.logger.getEffectiveLevel() == logging.DEBUG:
2560 db_ro_task_update_log = db_ro_task_update.copy()
2561 db_ro_task_update_log["_id"] = q_filter["_id"]
2562 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2565 if not self
.db
.set_one(
2567 update_dict
=db_ro_task_update
,
2569 fail_on_empty
=False,
2571 del db_ro_task_update
["to_check_at"]
2572 del q_filter
["to_check_at"]
2574 # Log RO tasks only when loglevel is DEBUG
2575 if self.logger.getEffectiveLevel() == logging.DEBUG:
2578 db_ro_task_update_log,
2581 "SET_TASK " + str(q_filter),
2587 update_dict
=db_ro_task_update
,
2590 except DbException
as e
:
2592 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2594 except Exception as e
:
2596 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2599 def _update_target(self
, task
, ro_vim_item_update
):
2600 table
, _
, temp
= task
["target_record"].partition(":")
2601 _id
, _
, path_vim_status
= temp
.partition(":")
2602 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2603 path_item
= path_item
[: path_item
.rfind(".")]
2604 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2605 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2607 if ro_vim_item_update
:
2609 path_vim_status
+ "." + k
: v
2610 for k
, v
in ro_vim_item_update
.items()
2619 "interfaces_backup",
2623 if path_vim_status
.startswith("vdur."):
2624 # for backward compatibility, add vdur.name apart from vdur.vim_name
2625 if ro_vim_item_update
.get("vim_name"):
2626 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2628 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2629 if ro_vim_item_update
.get("vim_id"):
2630 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2632 # update general status
2633 if ro_vim_item_update
.get("vim_status"):
2634 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2638 if ro_vim_item_update
.get("interfaces"):
2639 path_interfaces
= path_item
+ ".interfaces"
2641 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2645 path_interfaces
+ ".{}.".format(i
) + k
: v
2646 for k
, v
in iface
.items()
2647 if k
in ("vlan", "compute_node", "pci")
2651 # put ip_address and mac_address with ip-address and mac-address
2652 if iface
.get("ip_address"):
2654 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2655 ] = iface
["ip_address"]
2657 if iface
.get("mac_address"):
2659 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2660 ] = iface
["mac_address"]
2662 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2663 update_dict
["ip-address"] = iface
.get("ip_address").split(
2667 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2668 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2672 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2674 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2675 if ro_vim_item_update
.get("interfaces"):
2676 search_key
= path_vim_status
+ ".interfaces"
2677 if update_dict
.get(search_key
):
2678 interfaces_backup_update
= {
2679 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
2684 q_filter
={"_id": _id
},
2685 update_dict
=interfaces_backup_update
,
2689 update_dict
= {path_item
+ ".status": "DELETED"}
2692 q_filter
={"_id": _id
},
2693 update_dict
=update_dict
,
2694 unset
={path_vim_status
: None},
2697 def _process_delete_db_tasks(self
):
2699 Delete task from database because vnfrs or nsrs or both have been deleted
2700 :return: None. Uses and modify self.tasks_to_delete
2702 while self
.tasks_to_delete
:
2703 task
= self
.tasks_to_delete
[0]
2704 vnfrs_deleted
= None
2705 nsr_id
= task
["nsr_id"]
2707 if task
["target_record"].startswith("vnfrs:"):
2708 # check if nsrs is present
2709 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2710 vnfrs_deleted
= task
["target_record"].split(":")[1]
2713 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2714 except Exception as e
:
2716 "Error deleting task={}: {}".format(task
["task_id"], e
)
2718 self
.tasks_to_delete
.pop(0)
2721 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
2723 Static method because it is called from osm_ng_ro.ns
2724 :param db: instance of database to use
2725 :param nsr_id: affected nsrs id
2726 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2727 :return: None, exception is fails
2730 for retry
in range(retries
):
2731 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
2735 for ro_task
in ro_tasks
:
2737 to_delete_ro_task
= True
2739 for index
, task
in enumerate(ro_task
["tasks"]):
2742 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
2744 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
2746 db_update
["tasks.{}".format(index
)] = None
2748 # used by other nsr, ro_task cannot be deleted
2749 to_delete_ro_task
= False
2751 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2752 if to_delete_ro_task
:
2756 "_id": ro_task
["_id"],
2757 "modified_at": ro_task
["modified_at"],
2759 fail_on_empty
=False,
2763 db_update
["modified_at"] = now
2767 "_id": ro_task
["_id"],
2768 "modified_at": ro_task
["modified_at"],
2770 update_dict
=db_update
,
2771 fail_on_empty
=False,
2777 raise NsWorkerException("Exceeded {} retries".format(retries
))
2781 self
.logger
.info("Starting")
2783 # step 1: get commands from queue
2785 if self
.vim_targets
:
2786 task
= self
.task_queue
.get(block
=False)
2789 self
.logger
.debug("enters in idle state")
2791 task
= self
.task_queue
.get(block
=True)
2794 if task
[0] == "terminate":
2796 elif task
[0] == "load_vim":
2797 self
.logger
.info("order to load vim {}".format(task
[1]))
2798 self
._load
_vim
(task
[1])
2799 elif task
[0] == "unload_vim":
2800 self
.logger
.info("order to unload vim {}".format(task
[1]))
2801 self
._unload
_vim
(task
[1])
2802 elif task
[0] == "reload_vim":
2803 self
._reload
_vim
(task
[1])
2804 elif task
[0] == "check_vim":
2805 self
.logger
.info("order to check vim {}".format(task
[1]))
2806 self
._check
_vim
(task
[1])
2808 except Exception as e
:
2809 if isinstance(e
, queue
.Empty
):
2812 self
.logger
.critical(
2813 "Error processing task: {}".format(e
), exc_info
=True
2816 # step 2: process pending_tasks, delete not needed tasks
2818 if self
.tasks_to_delete
:
2819 self
._process
_delete
_db
_tasks
()
2822 # Log RO tasks only when loglevel is DEBUG
2823 if self.logger.getEffectiveLevel() == logging.DEBUG:
2824 _ = self._get_db_all_tasks()
2826 ro_task
= self
._get
_db
_task
()
2828 self
.logger
.debug("Task to process: {}".format(ro_task
))
2830 self
._process
_pending
_tasks
(ro_task
)
2834 except Exception as e
:
2835 self
.logger
.critical(
2836 "Unexpected exception at run: " + str(e
), exc_info
=True
2839 self
.logger
.info("Finishing")