# -*- coding: utf-8 -*-

##
# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##

"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target, where to store the results.
"""

from copy import deepcopy
from http import HTTPStatus
from os import makedirs
import time
import traceback
from typing import Dict
from unittest.mock import Mock

from importlib_metadata import entry_points
from osm_common.dbbase import DbException
from osm_ng_ro.vim_admin import LockRenew
from osm_ro_plugin import sdnconn
from osm_ro_plugin import vimconn
from osm_ro_plugin.sdn_dummy import SdnDummyConnector
from osm_ro_plugin.vim_dummy import VimDummyConnector
import yaml

48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict by walking down the nested keys.

    Example: target_dict={a: {b: 5}}; keys (a, b) return 5; both keys
    (a, b, c) and keys (f, h) return None (or the given default).

    :param target_dict: dictionary to be read
    :param args: sequence of keys to follow, outermost first
    :param kwargs: may only contain default=value, returned when a key is not
        present in the nested dictionary
    :return: the wanted value if it exists, otherwise the default (None when
        no default is given)
    """
    for key in args:
        # Stop as soon as the current level is not a dict or lacks the key.
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")

        target_dict = target_dict[key]

    return target_dict


class NsWorkerException(Exception):
    """Error raised by the VIM interaction classes of this module."""


class FailingConnector:
    """Stand-in connector whose public methods always raise.

    Every public attribute name of ``vimconn.VimConnector`` is replaced by a
    ``Mock`` raising ``vimconn.VimConnException(error_msg)``, and every public
    attribute name of ``sdnconn.SdnConnectorBase`` by a ``Mock`` raising
    ``sdnconn.SdnConnectorError(error_msg)``. Names present in both classes end
    up raising the SDN error, because the SDN loop runs last.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exceptions raised on any call
        """
        self.error_msg = error_msg

        for method in dir(vimconn.VimConnector):
            # Private and dunder names are left untouched so that Python
            # internals (e.g. __class__) keep working on this instance.
            if not method.startswith("_"):
                setattr(
                    self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
                )

        for method in dir(sdnconn.SdnConnectorBase):
            if not method.startswith("_"):
                setattr(
                    self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
                )


class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a requested VIM element cannot be found."""


class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...

    It implements methods that do nothing and return ok; subclasses override
    the operations they actually support.
    """

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database object (subclasses call e.g. ``db.decrypt``)
        :param my_vims: dict of connector objects, indexed by target_id
        :param db_vims: dict of VIM database records, indexed by target_id
        :param logger: logger used by the subclasses
        """
        self.db = db
        self.logger = logger
        self.my_vims = my_vims
        self.db_vims = db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing. Returns the task status "BUILD" and an empty update."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """skip calling VIM to get image, flavor status. Assumes ok"""
        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
            return "FAILED", {}

        return "DONE", {}

    def delete(self, ro_task, task_index):
        """skip calling VIM to delete image. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing. Returns "DONE" with no item update and no task update."""
        return "DONE", None, None


class VimInteractionNet(VimInteractionBase):
    """Find, create, refresh and delete networks at a VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing network or create a new one.

        When the task carries "find_params" the network is looked up at the
        VIM; a management network that is not mapped in the VIM configuration
        and is not found gets created. Without "find_params" the network is
        created from the task "params".

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", item_update) on success, ("FAILED", item_update)
            when the VIM or the lookup fails
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        # NOTE(review): the positional arguments (name, no
                        # explicit type) were reconstructed — confirm against
                        # the connector's new_network signature.
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status

        :param ro_task: ro_task record; "_id", "target_id" and "vim_info" are read
        :return: (task_status, item_update) where item_update only holds the
            fields that differ from ro_task["vim_info"]
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network (and its created items) at the VIM.

        A network that the VIM reports as not found counts as deleted.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and
            "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", item_update) or ("FAILED", item_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok


class VimInteractionVdu(VimInteractionBase):
    """Create, refresh and delete VM instances at a VIM, and inject ssh keys."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM instance at the VIM.

        References of the form "TASK-..." in the network, image, flavor and
        affinity-group identifiers are replaced by the values found in
        task_depends before calling the VIM.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dict mapping "TASK-..." references to VIM ids
        :return: ("BUILD", item_update) on success, ("FAILED", item_update)
            when the VIM fails or a dependency is not resolved
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params = task["params"]
            # Work on a copy so the stored task parameters are not modified.
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # The message used to be only "found for {}"; it now
                        # states the problem like the network case above.
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created or "
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # presumably the connector fills "vim_id" on each net_list entry
            # while creating the VM — TODO confirm
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM instance (and its created items) at the VIM.

        A VM that the VIM reports as not found counts as deleted.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and
            "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", item_update) or ("FAILED", item_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :param ro_task: ro_task record; "_id", "target_id", "tasks" and
            "vim_info" are read
        :return: (task_status, item_update) where item_update only holds the
            fields that differ from ro_task["vim_info"]; (None, None) when the
            ro_task has no vim_id
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            # (best effort: any failure is only logged)
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # Keep the order of interfaces_vim_ids; an interface that the VIM
            # does not report is stored as None.
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )

        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an ssh key into a VM through the VIM connector.

        On failure the task is kept in "BUILD" and retried until
        max_retries_inject_ssh_key attempts have been made.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task_status, item_update, db_task_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # The stored private key is decrypted and passed as "ro_key"; the
            # three encryption-related entries are removed from the params.
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update


class VimInteractionImage(VimInteractionBase):
    """Look up images at a VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Find the single image matching the task "find_params".

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item_update) when exactly one image matches,
            ("FAILED", item_update) when none or several match, when the task
            has no "find_params", or when the VIM fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            if not task.get("find_params"):
                # Without search criteria no image id can be obtained; fail
                # the task cleanly instead of hitting an unbound variable.
                raise NsWorkerExceptionNotFound(
                    "Image cannot be found because no find_params were provided"
                )

            vim_images = target_vim.get_image_list(**task["find_params"])

            if not vim_images:
                raise NsWorkerExceptionNotFound(
                    "Image not found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )
            elif len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )

            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update


class VimInteractionSharedVolume(VimInteractionBase):
    """Create and delete shared volumes at a VIM."""

    def delete(self, ro_task, task_index):
        """Delete the shared volume at the VIM.

        A volume that the VIM reports as not found counts as deleted.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and
            "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", item_update) or ("FAILED", item_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Create a shared volume from the task "params", if any.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item_update) on success (also when the task has no
            params and nothing is created), ("FAILED", item_update) when the
            VIM fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_name = None
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                created_items[shared_volume_vim_id] = shared_volume_name

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update


class VimInteractionFlavor(VimInteractionBase):
    """Find, create and delete flavors at a VIM."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM.

        A flavor that the VIM reports as not found counts as deleted.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and
            "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", item_update) or ("FAILED", item_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Find a flavor matching "find_params" or create one from "params".

        A flavor is created only when the lookup did not yield an id and the
        task carries "params".

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item_update) on success, ("FAILED", item_update)
            when the VIM fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_flavor_id = None

            if task.get("find_params"):
                try:
                    flavor_data = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # Not finding the flavor is not an error: it may still be
                    # created below.
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update


class VimInteractionAffinityGroup(VimInteractionBase):
    """Find, create and delete affinity or anti-affinity groups at a VIM."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group at the VIM.

        A group that the VIM reports as not found counts as deleted.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and
            "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", item_update) or ("FAILED", item_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group given by id, or create a new one.

        When "affinity_group_data" carries "vim-affinity-group-id" that group
        is looked up first; if the lookup yields no id, a new group is created
        from "affinity_group_data".

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item_update) on success, ("FAILED", item_update)
            when the VIM fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    # NOTE(review): assumes get_affinity_group returns a dict
                    # holding the group "id" — confirm against the connector.
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update


class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action on an existing VM instance at a VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """Call action_vminstance at the VIM with the action given in the task params.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item_update, db_task_update) on success,
            ("FAILED", item_update, db_task_update) when the VIM fails or the
            task has no params
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            if not task.get("params"):
                # Without params there is neither a VM id nor an action; fail
                # the task cleanly instead of hitting an unbound variable.
                raise NsWorkerException(
                    "Cannot update VM because no params were provided"
                )

            vim_vm_id = task["params"].get("vim_vm_id")
            action = task["params"].get("action")
            # The action name is used both as key and as value of the context.
            context = {action: action}
            target_vim.action_vminstance(vim_vm_id, context)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update


1032 class VimInteractionSdnNet(VimInteractionBase
):
1034 def _match_pci(port_pci
, mapping
):
1036 Check if port_pci matches with mapping
1037 mapping can have brackets to indicate that several chars are accepted. e.g
1038 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1039 :param port_pci: text
1040 :param mapping: text, can contain brackets to indicate several chars are available
1041 :return: True if matches, False otherwise
1043 if not port_pci
or not mapping
:
1045 if port_pci
== mapping
:
1051 bracket_start
= mapping
.find("[", mapping_index
)
1053 if bracket_start
== -1:
1056 bracket_end
= mapping
.find("]", bracket_start
)
1057 if bracket_end
== -1:
1060 length
= bracket_start
- mapping_index
1063 and port_pci
[pci_index
: pci_index
+ length
]
1064 != mapping
[mapping_index
:bracket_start
]
1069 port_pci
[pci_index
+ length
]
1070 not in mapping
[bracket_start
+ 1 : bracket_end
]
1074 pci_index
+= length
+ 1
1075 mapping_index
= bracket_end
+ 1
1077 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
1082 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
1084 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1085 :param vim_account_id:
1090 for vld
in vlds_to_connect
:
1091 table
, _
, db_id
= vld
.partition(":")
1092 db_id
, _
, vld
= db_id
.partition(":")
1093 _
, _
, vld_id
= vld
.partition(".")
1095 if table
== "vnfrs":
1096 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
1097 iface_key
= "vnf-vld-id"
1098 else: # table == "nsrs"
1099 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
1100 iface_key
= "ns-vld-id"
1102 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
1104 for db_vnfr
in db_vnfrs
:
1105 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
1106 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
1107 if interface
.get(iface_key
) == vld_id
and interface
.get(
1109 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1111 interface_
= interface
.copy()
1112 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1113 db_vnfr
["_id"], vdu_index
, iface_index
1116 if vdur
.get("status") == "ERROR":
1117 interface_
["status"] = "ERROR"
1119 interfaces
.append(interface_
)
1123 def refresh(self
, ro_task
):
1124 # look for task create
1125 task_create_index
, _
= next(
1127 for i_t
in enumerate(ro_task
["tasks"])
1129 and i_t
[1]["action"] == "CREATE"
1130 and i_t
[1]["status"] != "FINISHED"
1133 return self
.new(ro_task
, task_create_index
, None)
1135 def new(self
, ro_task
, task_index
, task_depends
):
1136 task
= ro_task
["tasks"][task_index
]
1137 task_id
= task
["task_id"]
1138 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1140 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1142 created_items
= ro_task
["vim_info"].get("created_items")
1143 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1144 new_connected_ports
= []
1145 last_update
= ro_task
["vim_info"].get("last_update", 0)
1146 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1148 created
= ro_task
["vim_info"].get("created", False)
1152 params
= task
["params"]
1153 vlds_to_connect
= params
.get("vlds", [])
1154 associated_vim
= params
.get("target_vim")
1155 # external additional ports
1156 additional_ports
= params
.get("sdn-ports") or ()
1157 _
, _
, vim_account_id
= (
1159 if associated_vim
is None
1160 else associated_vim
.partition(":")
1164 # get associated VIM
1165 if associated_vim
not in self
.db_vims
:
1166 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1167 "vim_accounts", {"_id": vim_account_id
}
1170 db_vim
= self
.db_vims
[associated_vim
]
1172 # look for ports to connect
1173 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1177 pending_ports
= error_ports
= 0
1179 sdn_need_update
= False
1182 vlan_used
= port
.get("vlan") or vlan_used
1184 # TODO. Do not connect if already done
1185 if not port
.get("compute_node") or not port
.get("pci"):
1186 if port
.get("status") == "ERROR":
1193 compute_node_mappings
= next(
1196 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1197 if c
and c
["compute_node"] == port
["compute_node"]
1202 if compute_node_mappings
:
1203 # process port_mapping pci of type 0000:af:1[01].[1357]
1207 for p
in compute_node_mappings
["ports"]
1208 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1214 if not db_vim
["config"].get("mapping_not_needed"):
1216 "Port mapping not found for compute_node={} pci={}".format(
1217 port
["compute_node"], port
["pci"]
1224 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1226 "service_endpoint_id": pmap
.get("service_endpoint_id")
1227 or service_endpoint_id
,
1228 "service_endpoint_encapsulation_type": "dot1q"
1229 if port
["type"] == "SR-IOV"
1231 "service_endpoint_encapsulation_info": {
1232 "vlan": port
.get("vlan"),
1233 "mac": port
.get("mac-address"),
1234 "device_id": pmap
.get("device_id") or port
["compute_node"],
1235 "device_interface_id": pmap
.get("device_interface_id")
1237 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1238 "switch_port": pmap
.get("switch_port"),
1239 "service_mapping_info": pmap
.get("service_mapping_info"),
1244 # if port["modified_at"] > last_update:
1245 # sdn_need_update = True
1246 new_connected_ports
.append(port
["id"]) # TODO
1247 sdn_ports
.append(new_port
)
1251 "{} interfaces have not been created as VDU is on ERROR status".format(
1256 # connect external ports
1257 for index
, additional_port
in enumerate(additional_ports
):
1258 additional_port_id
= additional_port
.get(
1259 "service_endpoint_id"
1260 ) or "external-{}".format(index
)
1263 "service_endpoint_id": additional_port_id
,
1264 "service_endpoint_encapsulation_type": additional_port
.get(
1265 "service_endpoint_encapsulation_type", "dot1q"
1267 "service_endpoint_encapsulation_info": {
1268 "vlan": additional_port
.get("vlan") or vlan_used
,
1269 "mac": additional_port
.get("mac_address"),
1270 "device_id": additional_port
.get("device_id"),
1271 "device_interface_id": additional_port
.get(
1272 "device_interface_id"
1274 "switch_dpid": additional_port
.get("switch_dpid")
1275 or additional_port
.get("switch_id"),
1276 "switch_port": additional_port
.get("switch_port"),
1277 "service_mapping_info": additional_port
.get(
1278 "service_mapping_info"
1283 new_connected_ports
.append(additional_port_id
)
1286 # if there are more ports to connect or they have been modified, call create/update
1288 sdn_status
= "ERROR"
1289 sdn_info
= "; ".join(error_list
)
1290 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1291 last_update
= time
.time()
1294 if len(sdn_ports
) < 2:
1295 sdn_status
= "ACTIVE"
1297 if not pending_ports
:
1299 "task={} {} new-sdn-net done, less than 2 ports".format(
1300 task_id
, ro_task
["target_id"]
1304 net_type
= params
.get("type") or "ELAN"
1308 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1311 "task={} {} new-sdn-net={} created={}".format(
1312 task_id
, ro_task
["target_id"], sdn_net_id
, created
1316 created_items
= target_vim
.edit_connectivity_service(
1317 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1321 "task={} {} update-sdn-net={} created={}".format(
1322 task_id
, ro_task
["target_id"], sdn_net_id
, created
1326 connected_ports
= new_connected_ports
1328 wim_status_dict
= target_vim
.get_connectivity_service_status(
1329 sdn_net_id
, conn_info
=created_items
1331 sdn_status
= wim_status_dict
["sdn_status"]
1333 if wim_status_dict
.get("sdn_info"):
1334 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1336 if wim_status_dict
.get("error_msg"):
1337 sdn_info
= wim_status_dict
.get("error_msg") or ""
1340 if sdn_status
!= "ERROR":
1341 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1342 len(ports
) - pending_ports
, len(ports
)
1345 if sdn_status
== "ACTIVE":
1346 sdn_status
= "BUILD"
1348 ro_vim_item_update
= {
1349 "vim_id": sdn_net_id
,
1350 "vim_status": sdn_status
,
1352 "created_items": created_items
,
1353 "connected_ports": connected_ports
,
1354 "vim_details": sdn_info
,
1355 "vim_message": None,
1356 "last_update": last_update
,
1359 return sdn_status
, ro_vim_item_update
1360 except Exception as e
:
1362 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1363 exc_info
=not isinstance(
1364 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1367 ro_vim_item_update
= {
1368 "vim_status": "VIM_ERROR",
1370 "vim_message": str(e
),
1373 return "FAILED", ro_vim_item_update
1375 def delete(self
, ro_task
, task_index
):
1376 task
= ro_task
["tasks"][task_index
]
1377 task_id
= task
["task_id"]
1378 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1379 ro_vim_item_update_ok
= {
1380 "vim_status": "DELETED",
1382 "vim_message": "DELETED",
1388 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1389 target_vim
.delete_connectivity_service(
1390 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1393 except Exception as e
:
1395 isinstance(e
, sdnconn
.SdnConnectorError
)
1396 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1398 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1401 "ro_task={} vim={} del-sdn-net={}: {}".format(
1402 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1404 exc_info
=not isinstance(
1405 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1408 ro_vim_item_update
= {
1409 "vim_status": "VIM_ERROR",
1410 "vim_message": "Error while deleting: {}".format(e
),
1413 return "FAILED", ro_vim_item_update
1416 "task={} {} del-sdn-net={} {}".format(
1418 ro_task
["target_id"],
1420 ro_vim_item_update_ok
.get("vim_message", ""),
1424 return "DONE", ro_vim_item_update_ok
1427 class VimInteractionMigration(VimInteractionBase
):
1428 def exec(self
, ro_task
, task_index
, task_depends
):
1429 task
= ro_task
["tasks"][task_index
]
1430 task_id
= task
["task_id"]
1431 db_task_update
= {"retries": 0}
1432 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1436 refreshed_vim_info
= {}
1439 if task
.get("params"):
1440 vim_vm_id
= task
["params"].get("vim_vm_id")
1441 migrate_host
= task
["params"].get("migrate_host")
1442 _
, migrated_compute_node
= target_vim
.migrate_instance(
1443 vim_vm_id
, migrate_host
1446 if migrated_compute_node
:
1447 # When VM is migrated, vdu["vim_info"] needs to be updated
1448 vdu_old_vim_info
= task
["params"]["vdu_vim_info"].get(
1449 ro_task
["target_id"]
1452 # Refresh VM to get new vim_info
1453 vm_to_refresh_list
= [vim_vm_id
]
1454 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1455 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1457 if refreshed_vim_info
.get("interfaces"):
1458 for old_iface
in vdu_old_vim_info
.get("interfaces"):
1462 for iface
in refreshed_vim_info
["interfaces"]
1463 if old_iface
["vim_interface_id"]
1464 == iface
["vim_interface_id"]
1468 vim_interfaces
.append(iface
)
1470 ro_vim_item_update
= {
1471 "vim_id": vim_vm_id
,
1472 "vim_status": "ACTIVE",
1474 "created_items": created_items
,
1475 "vim_details": None,
1476 "vim_message": None,
1479 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1483 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1486 ro_vim_item_update
["interfaces"] = vim_interfaces
1489 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1492 return "DONE", ro_vim_item_update
, db_task_update
1494 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1496 "task={} vim={} VM Migration:"
1497 " {}".format(task_id
, ro_task
["target_id"], e
)
1499 ro_vim_item_update
= {
1500 "vim_status": "VIM_ERROR",
1502 "vim_message": str(e
),
1505 return "FAILED", ro_vim_item_update
, db_task_update
1508 class VimInteractionResize(VimInteractionBase
):
1509 def exec(self
, ro_task
, task_index
, task_depends
):
1510 task
= ro_task
["tasks"][task_index
]
1511 task_id
= task
["task_id"]
1512 db_task_update
= {"retries": 0}
1514 target_flavor_uuid
= None
1516 refreshed_vim_info
= {}
1517 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1520 if task
.get("params"):
1521 vim_vm_id
= task
["params"].get("vim_vm_id")
1522 flavor_dict
= task
["params"].get("flavor_dict")
1523 self
.logger
.info("flavor_dict %s", flavor_dict
)
1526 target_flavor_uuid
= target_vim
.get_flavor_id_from_data(flavor_dict
)
1527 except Exception as e
:
1528 self
.logger
.info("Cannot find any flavor matching %s.", str(e
))
1530 target_flavor_uuid
= target_vim
.new_flavor(flavor_dict
)
1531 except Exception as e
:
1532 self
.logger
.error("Error creating flavor at VIM %s.", str(e
))
1534 if target_flavor_uuid
is not None:
1535 resized_status
= target_vim
.resize_instance(
1536 vim_vm_id
, target_flavor_uuid
1540 # Refresh VM to get new vim_info
1541 vm_to_refresh_list
= [vim_vm_id
]
1542 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1543 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1545 ro_vim_item_update
= {
1546 "vim_id": vim_vm_id
,
1547 "vim_status": "DONE",
1549 "created_items": created_items
,
1550 "vim_details": None,
1551 "vim_message": None,
1554 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1558 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1561 "task={} {} resize done".format(task_id
, ro_task
["target_id"])
1563 return "DONE", ro_vim_item_update
, db_task_update
1564 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1566 "task={} vim={} Resize:" " {}".format(task_id
, ro_task
["target_id"], e
)
1568 ro_vim_item_update
= {
1569 "vim_status": "VIM_ERROR",
1571 "vim_message": str(e
),
1574 return "FAILED", ro_vim_item_update
, db_task_update
1577 class ConfigValidate
:
1578 def __init__(self
, config
: Dict
):
1583 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1585 self
.conf
["period"]["refresh_active"] >= 60
1586 or self
.conf
["period"]["refresh_active"] == -1
1588 return self
.conf
["period"]["refresh_active"]
1594 return self
.conf
["period"]["refresh_build"]
1598 return self
.conf
["period"]["refresh_image"]
1602 return self
.conf
["period"]["refresh_error"]
def queue_size(self):
    """
    Read the configured queue size.
    :return: the value stored at self.conf["period"]["queue_size"], returned as it is
    """
    period_section = self.conf["period"]
    return period_section["queue_size"]
1609 class NsWorker(threading
.Thread
):
1610 def __init__(self
, worker_index
, config
, plugins
, db
):
1612 :param worker_index: thread index
1613 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1614 :param plugins: global shared dict with the loaded plugins
1615 :param db: database class instance to use
1617 threading
.Thread
.__init
__(self
)
1618 self
.config
= config
1619 self
.plugins
= plugins
1620 self
.plugin_name
= "unknown"
1621 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1622 self
.worker_index
= worker_index
1623 # refresh periods for created items
1624 self
.refresh_config
= ConfigValidate(config
)
1625 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1626 # targetvim: vimplugin class
1628 # targetvim: vim information from database
1631 self
.vim_targets
= []
1632 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1635 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1636 "shared-volumes": VimInteractionSharedVolume(
1637 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1639 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1640 "image": VimInteractionImage(
1641 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1643 "flavor": VimInteractionFlavor(
1644 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1646 "sdn_net": VimInteractionSdnNet(
1647 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1649 "update": VimInteractionUpdateVdu(
1650 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1652 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1653 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1655 "migrate": VimInteractionMigration(
1656 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1658 "verticalscale": VimInteractionResize(
1659 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1662 self
.time_last_task_processed
= None
1663 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1664 self
.tasks_to_delete
= []
1665 # it is idle when there are not vim_targets associated
1667 self
.task_locked_time
= config
["global"]["task_locked_time"]
1669 def insert_task(self
, task
):
1671 self
.task_queue
.put(task
, False)
1674 raise NsWorkerException("timeout inserting a task")
def terminate(self):
    """
    Insert the literal task "exit" using self.insert_task.
    NOTE(review): presumably the worker loop treats "exit" as the request to stop; confirm at the consumer
    :return: None
    """
    exit_task = "exit"
    self.insert_task(exit_task)
1679 def del_task(self
, task
):
1680 with self
.task_lock
:
1681 if task
["status"] == "SCHEDULED":
1682 task
["status"] = "SUPERSEDED"
1684 else: # task["status"] == "processing"
1685 self
.task_lock
.release()
1688 def _process_vim_config(self
, target_id
: str, db_vim
: dict) -> None:
1690 Process vim config, creating vim configuration files as ca_cert
1691 :param target_id: vim/sdn/wim + id
1692 :param db_vim: Vim dictionary obtained from database
1693 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1695 if not db_vim
.get("config"):
1699 work_dir
= "/app/osm_ro/certs"
1702 if db_vim
["config"].get("ca_cert_content"):
1703 file_name
= f
"{work_dir}/{target_id}:{self.worker_index}"
1705 if not path
.isdir(file_name
):
1708 file_name
= file_name
+ "/ca_cert"
1710 with
open(file_name
, "w") as f
:
1711 f
.write(db_vim
["config"]["ca_cert_content"])
1712 del db_vim
["config"]["ca_cert_content"]
1713 db_vim
["config"]["ca_cert"] = file_name
1714 except Exception as e
:
1715 raise NsWorkerException(
1716 "Error writing to file '{}': {}".format(file_name
, e
)
1719 def _load_plugin(self
, name
, type="vim"):
1720 # type can be vim or sdn
1721 if "rovim_dummy" not in self
.plugins
:
1722 self
.plugins
["rovim_dummy"] = VimDummyConnector
1724 if "rosdn_dummy" not in self
.plugins
:
1725 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1727 if name
in self
.plugins
:
1728 return self
.plugins
[name
]
1731 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1732 self
.plugins
[name
] = ep
.load()
1733 except Exception as e
:
1734 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1736 if name
and name
not in self
.plugins
:
1737 raise NsWorkerException(
1738 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1741 return self
.plugins
[name
]
1743 def _unload_vim(self
, target_id
):
1745 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1746 :param target_id: Contains type:_id; where type can be 'vim', ...
1750 self
.db_vims
.pop(target_id
, None)
1751 self
.my_vims
.pop(target_id
, None)
1753 if target_id
in self
.vim_targets
:
1754 self
.vim_targets
.remove(target_id
)
1756 self
.logger
.info("Unloaded {}".format(target_id
))
1757 except Exception as e
:
1758 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1760 def _check_vim(self
, target_id
):
1762 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1763 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1766 target
, _
, _id
= target_id
.partition(":")
1772 loaded
= target_id
in self
.vim_targets
1782 step
= "Getting {} from db".format(target_id
)
1783 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1785 for op_index
, operation
in enumerate(
1786 db_vim
["_admin"].get("operations", ())
1788 if operation
["operationState"] != "PROCESSING":
1791 locked_at
= operation
.get("locked_at")
1793 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1794 # some other thread is doing this operation
1798 op_text
= "_admin.operations.{}.".format(op_index
)
1800 if not self
.db
.set_one(
1804 op_text
+ "operationState": "PROCESSING",
1805 op_text
+ "locked_at": locked_at
,
1808 op_text
+ "locked_at": now
,
1809 "admin.current_operation": op_index
,
1811 fail_on_empty
=False,
1815 unset_dict
[op_text
+ "locked_at"] = None
1816 unset_dict
["current_operation"] = None
1817 step
= "Loading " + target_id
1818 error_text
= self
._load
_vim
(target_id
)
1821 step
= "Checking connectivity"
1824 self
.my_vims
[target_id
].check_vim_connectivity()
1826 self
.my_vims
[target_id
].check_credentials()
1828 update_dict
["_admin.operationalState"] = "ENABLED"
1829 update_dict
["_admin.detailed-status"] = ""
1830 unset_dict
[op_text
+ "detailed-status"] = None
1831 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1835 except Exception as e
:
1836 error_text
= "{}: {}".format(step
, e
)
1837 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1840 if update_dict
or unset_dict
:
1842 update_dict
[op_text
+ "operationState"] = "FAILED"
1843 update_dict
[op_text
+ "detailed-status"] = error_text
1844 unset_dict
.pop(op_text
+ "detailed-status", None)
1845 update_dict
["_admin.operationalState"] = "ERROR"
1846 update_dict
["_admin.detailed-status"] = error_text
1849 update_dict
[op_text
+ "statusEnteredTime"] = now
1853 q_filter
={"_id": _id
},
1854 update_dict
=update_dict
,
1856 fail_on_empty
=False,
1860 self
._unload
_vim
(target_id
)
1862 def _reload_vim(self
, target_id
):
1863 if target_id
in self
.vim_targets
:
1864 self
._load
_vim
(target_id
)
1866 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1867 # just remove it to force load again next time it is needed
1868 self
.db_vims
.pop(target_id
, None)
1870 def _load_vim(self
, target_id
):
1872 Load or reload a vim_account, sdn_controller or wim_account.
1873 Read content from database, load the plugin if not loaded.
1874 In case of error loading the plugin, it load a failing VIM_connector
1875 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1876 :param target_id: Contains type:_id; where type can be 'vim', ...
1877 :return: None if ok, descriptive text if error
1879 target
, _
, _id
= target_id
.partition(":")
1889 step
= "Getting {}={} from db".format(target
, _id
)
1892 # TODO process for wim, sdnc, ...
1893 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1895 # if deep_get(vim, "config", "sdn-controller"):
1896 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1897 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1899 step
= "Decrypting password"
1900 schema_version
= vim
.get("schema_version")
1901 self
.db
.encrypt_decrypt_fields(
1904 fields
=("password", "secret"),
1905 schema_version
=schema_version
,
1908 self
._process
_vim
_config
(target_id
, vim
)
1911 plugin_name
= "rovim_" + vim
["vim_type"]
1912 step
= "Loading plugin '{}'".format(plugin_name
)
1913 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1914 step
= "Loading {}'".format(target_id
)
1915 self
.my_vims
[target_id
] = vim_module_conn(
1918 tenant_id
=vim
.get("vim_tenant_id"),
1919 tenant_name
=vim
.get("vim_tenant_name"),
1922 user
=vim
["vim_user"],
1923 passwd
=vim
["vim_password"],
1924 config
=vim
.get("config") or {},
1928 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1929 step
= "Loading plugin '{}'".format(plugin_name
)
1930 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1931 step
= "Loading {}'".format(target_id
)
1933 wim_config
= wim
.pop("config", {}) or {}
1934 wim
["uuid"] = wim
["_id"]
1935 if "url" in wim
and "wim_url" not in wim
:
1936 wim
["wim_url"] = wim
["url"]
1937 elif "url" not in wim
and "wim_url" in wim
:
1938 wim
["url"] = wim
["wim_url"]
1941 wim_config
["dpid"] = wim
.pop("dpid")
1943 if wim
.get("switch_id"):
1944 wim_config
["switch_id"] = wim
.pop("switch_id")
1946 # wim, wim_account, config
1947 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1948 self
.db_vims
[target_id
] = vim
1949 self
.error_status
= None
1952 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1954 except Exception as e
:
1956 "Cannot load {} plugin={}: {} {}".format(
1957 target_id
, plugin_name
, step
, e
1961 self
.db_vims
[target_id
] = vim
or {}
1962 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1963 error_status
= "{} Error: {}".format(step
, e
)
1967 if target_id
not in self
.vim_targets
:
1968 self
.vim_targets
.append(target_id
)
1970 def _get_db_task(self
):
1972 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1977 if not self
.time_last_task_processed
:
1978 self
.time_last_task_processed
= now
1983 # Log RO tasks only when loglevel is DEBUG
1984 if self.logger.getEffectiveLevel() == logging.DEBUG:
1991 + str(self.task_locked_time)
1993 + "time_last_task_processed="
1994 + str(self.time_last_task_processed)
2000 locked
= self
.db
.set_one(
2003 "target_id": self
.vim_targets
,
2004 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2005 "locked_at.lt": now
- self
.task_locked_time
,
2006 "to_check_at.lt": self
.time_last_task_processed
,
2007 "to_check_at.gt": -1,
2009 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
2010 fail_on_empty
=False,
2015 ro_task
= self
.db
.get_one(
2018 "target_id": self
.vim_targets
,
2019 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2025 if self
.time_last_task_processed
== now
:
2026 self
.time_last_task_processed
= None
2029 self
.time_last_task_processed
= now
2030 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2032 except DbException
as e
:
2033 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
2034 except Exception as e
:
2035 self
.logger
.critical(
2036 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
2041 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2043 Determine if this task need to be done or superseded
2046 my_task
= ro_task
["tasks"][task_index
]
2047 task_id
= my_task
["task_id"]
2048 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
2049 "created_items", False
2052 self
.logger
.debug("Needed delete: {}".format(needed_delete
))
2053 if my_task
["status"] == "FAILED":
2054 return None, None # TODO need to be retry??
2057 for index
, task
in enumerate(ro_task
["tasks"]):
2058 if index
== task_index
or not task
:
2062 my_task
["target_record"] == task
["target_record"]
2063 and task
["action"] == "CREATE"
2066 db_update
["tasks.{}.status".format(index
)] = task
[
2069 elif task
["action"] == "CREATE" and task
["status"] not in (
2073 needed_delete
= False
2077 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
2079 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
2081 return "SUPERSEDED", None
2082 except Exception as e
:
2083 if not isinstance(e
, NsWorkerException
):
2084 self
.logger
.critical(
2085 "Unexpected exception at _delete_task task={}: {}".format(
2091 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2093 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2095 Determine if this task need to create something at VIM
2098 my_task
= ro_task
["tasks"][task_index
]
2099 task_id
= my_task
["task_id"]
2102 if my_task
["status"] == "FAILED":
2103 return None, None # TODO need to be retry??
2104 elif my_task
["status"] == "SCHEDULED":
2105 # check if already created by another task
2106 for index
, task
in enumerate(ro_task
["tasks"]):
2107 if index
== task_index
or not task
:
2110 if task
["action"] == "CREATE" and task
["status"] not in (
2115 return task
["status"], "COPY_VIM_INFO"
2118 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2119 ro_task
, task_index
, task_depends
2121 # TODO update other CREATE tasks
2122 except Exception as e
:
2123 if not isinstance(e
, NsWorkerException
):
2125 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2128 task_status
= "FAILED"
2129 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2130 # TODO update ro_vim_item_update
2132 return task_status
, ro_vim_item_update
2136 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2138 Look for dependency task
2139 :param task_id: Can be one of
2140 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2141 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2142 3. task.task_id: "<action_id>:number"
2145 :return: database ro_task plus index of task
2148 task_id
.startswith("vim:")
2149 or task_id
.startswith("sdn:")
2150 or task_id
.startswith("wim:")
2152 target_id
, _
, task_id
= task_id
.partition(" ")
2154 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2155 ro_task_dependency
= self
.db
.get_one(
2157 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2158 fail_on_empty
=False,
2161 if ro_task_dependency
:
2162 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2163 if task
["target_record_id"] == task_id
:
2164 return ro_task_dependency
, task_index
2168 for task_index
, task
in enumerate(ro_task
["tasks"]):
2169 if task
and task
["task_id"] == task_id
:
2170 return ro_task
, task_index
2172 ro_task_dependency
= self
.db
.get_one(
2175 "tasks.ANYINDEX.task_id": task_id
,
2176 "tasks.ANYINDEX.target_record.ne": None,
2178 fail_on_empty
=False,
2181 self
.logger
.debug("ro_task_dependency={}".format(ro_task_dependency
))
2182 if ro_task_dependency
:
2183 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2184 if task
["task_id"] == task_id
:
2185 return ro_task_dependency
, task_index
2186 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
2188 def update_vm_refresh(self
, ro_task
):
2189 """Enables the VM status updates if self.refresh_config.active parameter
2190 is not -1 and then updates the DB accordingly
2194 self
.logger
.debug("Checking if VM status update config")
2195 next_refresh
= time
.time()
2196 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2198 if next_refresh
!= -1:
2199 db_ro_task_update
= {}
2201 next_check_at
= now
+ (24 * 60 * 60)
2202 next_check_at
= min(next_check_at
, next_refresh
)
2203 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2204 db_ro_task_update
["to_check_at"] = next_check_at
2207 "Finding tasks which to be updated to enable VM status updates"
2209 refresh_tasks
= self
.db
.get_list(
2212 "tasks.status": "DONE",
2213 "to_check_at.lt": 0,
2216 self
.logger
.debug("Updating tasks to change the to_check_at status")
2217 for task
in refresh_tasks
:
2224 update_dict
=db_ro_task_update
,
2228 except Exception as e
:
2229 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2231 def _get_next_refresh(self
, ro_task
: dict, next_refresh
: float):
2232 """Decide the next_refresh according to vim type and refresh config period.
2234 ro_task (dict): ro_task details
2235 next_refresh (float): next refresh time as epoch format
2238 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2240 target_vim
= ro_task
["target_id"]
2241 vim_type
= self
.db_vims
[target_vim
]["vim_type"]
2242 if self
.refresh_config
.active
== -1 or vim_type
== "openstack":
2245 next_refresh
+= self
.refresh_config
.active
2248 def _process_pending_tasks(self
, ro_task
):
2249 ro_task_id
= ro_task
["_id"]
2252 next_check_at
= now
+ (24 * 60 * 60)
2253 db_ro_task_update
= {}
2255 def _update_refresh(new_status
):
2256 # compute next_refresh
2258 nonlocal next_check_at
2259 nonlocal db_ro_task_update
2262 next_refresh
= time
.time()
2264 if task
["item"] in ("image", "flavor"):
2265 next_refresh
+= self
.refresh_config
.image
2266 elif new_status
== "BUILD":
2267 next_refresh
+= self
.refresh_config
.build
2268 elif new_status
== "DONE":
2269 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2271 next_refresh
+= self
.refresh_config
.error
2273 next_check_at
= min(next_check_at
, next_refresh
)
2274 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2275 ro_task
["vim_info"]["refresh_at"] = next_refresh
2279 # Log RO tasks only when loglevel is DEBUG
2280 if self.logger.getEffectiveLevel() == logging.DEBUG:
2281 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2283 # Check if vim status refresh is enabled again
2284 self
.update_vm_refresh(ro_task
)
2285 # 0: get task_status_create
2287 task_status_create
= None
2291 for t
in ro_task
["tasks"]
2293 and t
["action"] == "CREATE"
2294 and t
["status"] in ("BUILD", "DONE")
2300 task_status_create
= task_create
["status"]
2302 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2303 for task_action
in ("DELETE", "CREATE", "EXEC"):
2304 db_vim_update
= None
2307 for task_index
, task
in enumerate(ro_task
["tasks"]):
2309 continue # task deleted
2312 target_update
= None
2316 task_action
in ("DELETE", "EXEC")
2317 and task
["status"] not in ("SCHEDULED", "BUILD")
2319 or task
["action"] != task_action
2321 task_action
== "CREATE"
2322 and task
["status"] in ("FINISHED", "SUPERSEDED")
2327 task_path
= "tasks.{}.status".format(task_index
)
2329 db_vim_info_update
= None
2331 if task
["status"] == "SCHEDULED":
2332 # check if tasks that this depends on have been completed
2333 dependency_not_completed
= False
2335 for dependency_task_id
in task
.get("depends_on") or ():
2338 dependency_task_index
,
2339 ) = self
._get
_dependency
(
2340 dependency_task_id
, target_id
=ro_task
["target_id"]
2342 dependency_task
= dependency_ro_task
["tasks"][
2343 dependency_task_index
2346 "dependency_ro_task={} dependency_task_index={}".format(
2347 dependency_ro_task
, dependency_task_index
2351 if dependency_task
["status"] == "SCHEDULED":
2352 dependency_not_completed
= True
2353 next_check_at
= min(
2354 next_check_at
, dependency_ro_task
["to_check_at"]
2356 # must allow dependent task to be processed first
2357 # to do this set time after last_task_processed
2358 next_check_at
= max(
2359 self
.time_last_task_processed
, next_check_at
2362 elif dependency_task
["status"] == "FAILED":
2363 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2366 dependency_task
["action"],
2367 dependency_task
["item"],
2369 dependency_ro_task
["vim_info"].get(
2374 "task={} {}".format(task
["task_id"], error_text
)
2376 raise NsWorkerException(error_text
)
2378 task_depends
[dependency_task_id
] = dependency_ro_task
[
2382 "TASK-{}".format(dependency_task_id
)
2383 ] = dependency_ro_task
["vim_info"]["vim_id"]
2385 if dependency_not_completed
:
2386 self
.logger
.warning(
2387 "DEPENDENCY NOT COMPLETED {}".format(
2388 dependency_ro_task
["vim_info"]["vim_id"]
2391 # TODO set at vim_info.vim_details that it is waiting
2394 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2395 # the task of renew this locking. It will update database locket_at periodically
2397 lock_object
= LockRenew
.add_lock_object(
2398 "ro_tasks", ro_task
, self
2400 if task
["action"] == "DELETE":
2404 ) = self
._delete
_task
(
2405 ro_task
, task_index
, task_depends
, db_ro_task_update
2408 "FINISHED" if new_status
== "DONE" else new_status
2410 # ^with FINISHED instead of DONE it will not be refreshing
2412 if new_status
in ("FINISHED", "SUPERSEDED"):
2413 target_update
= "DELETE"
2414 elif task
["action"] == "EXEC":
2419 ) = self
.item2class
[task
["item"]].exec(
2420 ro_task
, task_index
, task_depends
2423 "FINISHED" if new_status
== "DONE" else new_status
2425 # ^with FINISHED instead of DONE it will not be refreshing
2428 # load into database the modified db_task_update "retries" and "next_retry"
2429 if db_task_update
.get("retries"):
2431 "tasks.{}.retries".format(task_index
)
2432 ] = db_task_update
["retries"]
2434 next_check_at
= time
.time() + db_task_update
.get(
2437 target_update
= None
2438 elif task
["action"] == "CREATE":
2439 if task
["status"] == "SCHEDULED":
2440 if task_status_create
:
2441 new_status
= task_status_create
2442 target_update
= "COPY_VIM_INFO"
2444 new_status
, db_vim_info_update
= self
.item2class
[
2446 ].new(ro_task
, task_index
, task_depends
)
2447 _update_refresh(new_status
)
2449 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2450 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2454 ) = self
.item2class
[
2457 _update_refresh(new_status
)
2459 # The refresh is updated to avoid set the value of "refresh_at" to
2460 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2461 # because it can happen that in this case the task is never processed
2462 _update_refresh(task
["status"])
2464 except Exception as e
:
2465 new_status
= "FAILED"
2466 db_vim_info_update
= {
2467 "vim_status": "VIM_ERROR",
2468 "vim_message": str(e
),
2472 e
, (NsWorkerException
, vimconn
.VimConnException
)
2475 "Unexpected exception at _delete_task task={}: {}".format(
2482 if db_vim_info_update
:
2483 db_vim_update
= db_vim_info_update
.copy()
2484 db_ro_task_update
.update(
2487 for k
, v
in db_vim_info_update
.items()
2490 ro_task
["vim_info"].update(db_vim_info_update
)
2493 if task_action
== "CREATE":
2494 task_status_create
= new_status
2495 db_ro_task_update
[task_path
] = new_status
2497 if target_update
or db_vim_update
:
2498 if target_update
== "DELETE":
2499 self
._update
_target
(task
, None)
2500 elif target_update
== "COPY_VIM_INFO":
2501 self
._update
_target
(task
, ro_task
["vim_info"])
2503 self
._update
_target
(task
, db_vim_update
)
2505 except Exception as e
:
2507 isinstance(e
, DbException
)
2508 and e
.http_code
== HTTPStatus
.NOT_FOUND
2510 # if the vnfrs or nsrs has been removed from database, this task must be removed
2512 "marking to delete task={}".format(task
["task_id"])
2514 self
.tasks_to_delete
.append(task
)
2517 "Unexpected exception at _update_target task={}: {}".format(
2523 locked_at
= ro_task
["locked_at"]
2527 lock_object
["locked_at"],
2528 lock_object
["locked_at"] + self
.task_locked_time
,
2530 # locked_at contains two times to avoid a race condition. In case the lock has been renewed, it will
2531 # contain exactly locked_at + self.task_locked_time
2532 LockRenew
.remove_lock_object(lock_object
)
2535 "_id": ro_task
["_id"],
2536 "to_check_at": ro_task
["to_check_at"],
2537 "locked_at": locked_at
,
2539 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2540 # outside this task (by ro_nbi) do not update it
2541 db_ro_task_update
["locked_by"] = None
2542 # locked_at is converted to int only for debugging. When it has no decimals it means it has been unlocked
2543 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2544 db_ro_task_update
["modified_at"] = now
2545 db_ro_task_update
["to_check_at"] = next_check_at
2548 # Log RO tasks only when loglevel is DEBUG
2549 if self.logger.getEffectiveLevel() == logging.DEBUG:
2550 db_ro_task_update_log = db_ro_task_update.copy()
2551 db_ro_task_update_log["_id"] = q_filter["_id"]
2552 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2555 if not self
.db
.set_one(
2557 update_dict
=db_ro_task_update
,
2559 fail_on_empty
=False,
2561 del db_ro_task_update
["to_check_at"]
2562 del q_filter
["to_check_at"]
2564 # Log RO tasks only when loglevel is DEBUG
2565 if self.logger.getEffectiveLevel() == logging.DEBUG:
2568 db_ro_task_update_log,
2571 "SET_TASK " + str(q_filter),
2577 update_dict
=db_ro_task_update
,
2580 except DbException
as e
:
2582 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2584 except Exception as e
:
2586 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2589 def _update_target(self
, task
, ro_vim_item_update
):
2590 table
, _
, temp
= task
["target_record"].partition(":")
2591 _id
, _
, path_vim_status
= temp
.partition(":")
2592 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2593 path_item
= path_item
[: path_item
.rfind(".")]
2594 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2595 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2597 if ro_vim_item_update
:
2599 path_vim_status
+ "." + k
: v
2600 for k
, v
in ro_vim_item_update
.items()
2609 "interfaces_backup",
2613 if path_vim_status
.startswith("vdur."):
2614 # for backward compatibility, add vdur.name apart from vdur.vim_name
2615 if ro_vim_item_update
.get("vim_name"):
2616 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2618 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2619 if ro_vim_item_update
.get("vim_id"):
2620 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2622 # update general status
2623 if ro_vim_item_update
.get("vim_status"):
2624 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2628 if ro_vim_item_update
.get("interfaces"):
2629 path_interfaces
= path_item
+ ".interfaces"
2631 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2635 path_interfaces
+ ".{}.".format(i
) + k
: v
2636 for k
, v
in iface
.items()
2637 if k
in ("vlan", "compute_node", "pci")
2641 # put ip_address and mac_address with ip-address and mac-address
2642 if iface
.get("ip_address"):
2644 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2645 ] = iface
["ip_address"]
2647 if iface
.get("mac_address"):
2649 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2650 ] = iface
["mac_address"]
2652 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2653 update_dict
["ip-address"] = iface
.get("ip_address").split(
2657 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2658 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2662 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2664 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2665 if ro_vim_item_update
.get("interfaces"):
2666 search_key
= path_vim_status
+ ".interfaces"
2667 if update_dict
.get(search_key
):
2668 interfaces_backup_update
= {
2669 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
2674 q_filter
={"_id": _id
},
2675 update_dict
=interfaces_backup_update
,
2679 update_dict
= {path_item
+ ".status": "DELETED"}
2682 q_filter
={"_id": _id
},
2683 update_dict
=update_dict
,
2684 unset
={path_vim_status
: None},
2687 def _process_delete_db_tasks(self
):
2689 Delete task from database because vnfrs or nsrs or both have been deleted
2690 :return: None. Uses and modify self.tasks_to_delete
2692 while self
.tasks_to_delete
:
2693 task
= self
.tasks_to_delete
[0]
2694 vnfrs_deleted
= None
2695 nsr_id
= task
["nsr_id"]
2697 if task
["target_record"].startswith("vnfrs:"):
2698 # check if nsrs is present
2699 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2700 vnfrs_deleted
= task
["target_record"].split(":")[1]
2703 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2704 except Exception as e
:
2706 "Error deleting task={}: {}".format(task
["task_id"], e
)
2708 self
.tasks_to_delete
.pop(0)
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Static method because it is called from osm_ng_ro.ns
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception if it fails (NsWorkerException when the retries are exceeded)
    """
    # NOTE(review): the @staticmethod decorator line is not present in the damaged source;
    # confirm that it precedes this definition
    # NOTE(review): the value of the retry budget is missing from the damaged source; 5 is
    # reconstructed - confirm
    retries = 5

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        # NOTE(review): the assignments of "now" and of the conflict flag are missing from
        # the damaged source and are reconstructed - confirm
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    # empty slot, nothing to do
                    continue

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            q_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }

            if to_delete_ro_task:
                if not db.del_one("ro_tasks", q_filter=q_filter, fail_on_empty=False):
                    conflict = True
            elif db_update:
                db_update["modified_at"] = now

                if not db.set_one(
                    "ro_tasks",
                    q_filter=q_filter,
                    update_dict=db_update,
                    fail_on_empty=False,
                ):
                    conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2771 self
.logger
.info("Starting")
2773 # step 1: get commands from queue
2775 if self
.vim_targets
:
2776 task
= self
.task_queue
.get(block
=False)
2779 self
.logger
.debug("enters in idle state")
2781 task
= self
.task_queue
.get(block
=True)
2784 if task
[0] == "terminate":
2786 elif task
[0] == "load_vim":
2787 self
.logger
.info("order to load vim {}".format(task
[1]))
2788 self
._load
_vim
(task
[1])
2789 elif task
[0] == "unload_vim":
2790 self
.logger
.info("order to unload vim {}".format(task
[1]))
2791 self
._unload
_vim
(task
[1])
2792 elif task
[0] == "reload_vim":
2793 self
._reload
_vim
(task
[1])
2794 elif task
[0] == "check_vim":
2795 self
.logger
.info("order to check vim {}".format(task
[1]))
2796 self
._check
_vim
(task
[1])
2798 except Exception as e
:
2799 if isinstance(e
, queue
.Empty
):
2802 self
.logger
.critical(
2803 "Error processing task: {}".format(e
), exc_info
=True
2806 # step 2: process pending_tasks, delete not needed tasks
2808 if self
.tasks_to_delete
:
2809 self
._process
_delete
_db
_tasks
()
2812 # Log RO tasks only when loglevel is DEBUG
2813 if self.logger.getEffectiveLevel() == logging.DEBUG:
2814 _ = self._get_db_all_tasks()
2816 ro_task
= self
._get
_db
_task
()
2818 self
.logger
.debug("Task to process: {}".format(ro_task
))
2820 self
._process
_pending
_tasks
(ro_task
)
2824 except Exception as e
:
2825 self
.logger
.critical(
2826 "Unexpected exception at run: " + str(e
), exc_info
=True
2829 self
.logger
.info("Finishing")