1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target indicating where to store the results.
27 from copy
import deepcopy
28 from http
import HTTPStatus
30 from os
import makedirs
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
43 from osm_ro_plugin
import vimconn
44 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
45 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict entering in the nested keys.

    If any key of the path does not exist (or an intermediate value is not a
    dictionary), the default is returned instead of raising.
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5;
    both key_list=[a,b,c] and key_list=[f,h] return None

    :param target_dict: dictionary to be read
    :param args: list of keys to read from target_dict, outermost first
    :param kwargs: only can contain default=value to return if key is not
        present in the nested dictionary
    :return: The wanted value if exists, None or default otherwise
    """
    for key in args:
        # Stop as soon as the path cannot be followed any further.
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")

        target_dict = target_dict[key]

    return target_dict
class NsWorkerException(Exception):
    """Base error raised by the worker while processing a task against a VIM/SDN."""
class FailingConnector:
    """Stand-in connector whose public methods always fail with a fixed message.

    Every public attribute name found on ``vimconn.VimConnector`` and on
    ``sdnconn.SdnConnectorBase`` is replaced on the instance by a ``Mock`` that
    raises the matching connector exception when called.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by every exception raised by this object
        """
        self.error_msg = error_msg

        for method in dir(vimconn.VimConnector):
            # NOTE(review): names starting with "_" are skipped because dunder
            # attributes such as __class__ cannot be replaced by a Mock --
            # confirm this filter against the original file.
            if not method.startswith("_"):
                setattr(
                    self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
                )

        for method in dir(sdnconn.SdnConnectorBase):
            if not method.startswith("_"):
                setattr(
                    self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
                )
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised when a requested VIM element cannot be found with the given criteria."""
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    It implements methods that do nothing and return ok"""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handle used by subclasses (e.g. ``self.db.get_list``)
        :param my_vims: connectors indexed by target_id
        :param db_vims: VIM account records indexed by target_id
        :param logger: logger used by subclasses to trace each operation
        """
        self.db = db
        self.logger = logger
        self.my_vims = my_vims
        self.db_vims = db_vims

    def new(self, ro_task, task_index, task_depends):
        """Skip calling VIM to create the element.

        :return: tuple (task status, empty vim_info update)
        """
        return "BUILD", {}

    def refresh(self, ro_task):
        """skip calling VIM to get image, flavor status. Assumes ok"""
        # An element already marked as VIM_ERROR stays failed.
        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
            return "FAILED", {}

        return "DONE", {}

    def delete(self, ro_task, task_index):
        """skip calling VIM to delete image. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Skip calling VIM to execute an action.

        :return: tuple (task status, vim_info update, task update)
        """
        return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
219 "task={} {} new-net={} created={}".format(
220 task_id
, ro_task
["target_id"], vim_net_id
, created
224 return "BUILD", ro_vim_item_update
225 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
227 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
229 ro_vim_item_update
= {
230 "vim_status": "VIM_ERROR",
232 "vim_message": str(e
),
235 return "FAILED", ro_vim_item_update
237 def refresh(self
, ro_task
):
238 """Call VIM to get network status"""
239 ro_task_id
= ro_task
["_id"]
240 target_vim
= self
.my_vims
[ro_task
["target_id"]]
241 vim_id
= ro_task
["vim_info"]["vim_id"]
242 net_to_refresh_list
= [vim_id
]
245 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
246 vim_info
= vim_dict
[vim_id
]
248 if vim_info
["status"] == "ACTIVE":
250 elif vim_info
["status"] == "BUILD":
251 task_status
= "BUILD"
253 task_status
= "FAILED"
254 except vimconn
.VimConnException
as e
:
255 # Mark all tasks at VIM_ERROR status
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id
, ro_task
["target_id"], vim_id
, e
261 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
262 task_status
= "FAILED"
264 ro_vim_item_update
= {}
265 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
266 ro_vim_item_update
["vim_status"] = vim_info
["status"]
268 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
269 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
271 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
273 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
274 elif vim_info
["status"] == "DELETED":
275 ro_vim_item_update
["vim_id"] = None
276 ro_vim_item_update
["vim_message"] = "Deleted externally"
278 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
279 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
281 if ro_vim_item_update
:
283 "ro_task={} {} get-net={}: status={} {}".format(
285 ro_task
["target_id"],
287 ro_vim_item_update
.get("vim_status"),
288 ro_vim_item_update
.get("vim_message")
289 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
294 return task_status
, ro_vim_item_update
296 def delete(self
, ro_task
, task_index
):
297 task
= ro_task
["tasks"][task_index
]
298 task_id
= task
["task_id"]
299 net_vim_id
= ro_task
["vim_info"]["vim_id"]
300 ro_vim_item_update_ok
= {
301 "vim_status": "DELETED",
303 "vim_message": "DELETED",
308 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
309 target_vim
= self
.my_vims
[ro_task
["target_id"]]
310 target_vim
.delete_network(
311 net_vim_id
, ro_task
["vim_info"]["created_items"]
313 except vimconn
.VimConnNotFoundException
:
314 ro_vim_item_update_ok
["vim_message"] = "already deleted"
315 except vimconn
.VimConnException
as e
:
317 "ro_task={} vim={} del-net={}: {}".format(
318 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
321 ro_vim_item_update
= {
322 "vim_status": "VIM_ERROR",
323 "vim_message": "Error while deleting: {}".format(e
),
326 return "FAILED", ro_vim_item_update
329 "task={} {} del-net={} {}".format(
331 ro_task
["target_id"],
333 ro_vim_item_update_ok
.get("vim_message", ""),
337 return "DONE", ro_vim_item_update_ok
340 class VimInteractionVdu(VimInteractionBase
):
341 max_retries_inject_ssh_key
= 20 # 20 times
342 time_retries_inject_ssh_key
= 30 # every 30 seconds
344 def new(self
, ro_task
, task_index
, task_depends
):
345 task
= ro_task
["tasks"][task_index
]
346 task_id
= task
["task_id"]
348 target_vim
= self
.my_vims
[ro_task
["target_id"]]
351 params
= task
["params"]
352 params_copy
= deepcopy(params
)
353 net_list
= params_copy
["net_list"]
356 # change task_id into network_id
357 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
358 network_id
= task_depends
[net
["net_id"]]
361 raise NsWorkerException(
362 "Cannot create VM because depends on a network not created or found "
363 "for {}".format(net
["net_id"])
366 net
["net_id"] = network_id
368 if params_copy
["image_id"].startswith("TASK-"):
369 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
371 if params_copy
["flavor_id"].startswith("TASK-"):
372 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
374 affinity_group_list
= params_copy
["affinity_group_list"]
375 for affinity_group
in affinity_group_list
:
376 # change task_id into affinity_group_id
377 if "affinity_group_id" in affinity_group
and affinity_group
[
379 ].startswith("TASK-"):
380 affinity_group_id
= task_depends
[
381 affinity_group
["affinity_group_id"]
384 if not affinity_group_id
:
385 raise NsWorkerException(
386 "found for {}".format(affinity_group
["affinity_group_id"])
389 affinity_group
["affinity_group_id"] = affinity_group_id
390 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
391 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
393 # add to created items previous_created_volumes (healing)
394 if task
.get("previous_created_volumes"):
395 for k
, v
in task
["previous_created_volumes"].items():
398 ro_vim_item_update
= {
400 "vim_status": "BUILD",
402 "created_items": created_items
,
405 "interfaces_vim_ids": interfaces
,
407 "interfaces_backup": [],
410 "task={} {} new-vm={} created={}".format(
411 task_id
, ro_task
["target_id"], vim_vm_id
, created
415 return "BUILD", ro_vim_item_update
416 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
417 self
.logger
.debug(traceback
.format_exc())
419 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
421 ro_vim_item_update
= {
422 "vim_status": "VIM_ERROR",
424 "vim_message": str(e
),
427 return "FAILED", ro_vim_item_update
429 def delete(self
, ro_task
, task_index
):
430 task
= ro_task
["tasks"][task_index
]
431 task_id
= task
["task_id"]
432 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
433 ro_vim_item_update_ok
= {
434 "vim_status": "DELETED",
436 "vim_message": "DELETED",
442 "delete_vminstance: vm_vim_id={} created_items={}".format(
443 vm_vim_id
, ro_task
["vim_info"]["created_items"]
446 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
447 target_vim
= self
.my_vims
[ro_task
["target_id"]]
448 target_vim
.delete_vminstance(
450 ro_task
["vim_info"]["created_items"],
451 ro_task
["vim_info"].get("volumes_to_hold", []),
453 except vimconn
.VimConnNotFoundException
:
454 ro_vim_item_update_ok
["vim_message"] = "already deleted"
455 except vimconn
.VimConnException
as e
:
457 "ro_task={} vim={} del-vm={}: {}".format(
458 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
461 ro_vim_item_update
= {
462 "vim_status": "VIM_ERROR",
463 "vim_message": "Error while deleting: {}".format(e
),
466 return "FAILED", ro_vim_item_update
469 "task={} {} del-vm={} {}".format(
471 ro_task
["target_id"],
473 ro_vim_item_update_ok
.get("vim_message", ""),
477 return "DONE", ro_vim_item_update_ok
479 def refresh(self
, ro_task
):
480 """Call VIM to get vm status"""
481 ro_task_id
= ro_task
["_id"]
482 target_vim
= self
.my_vims
[ro_task
["target_id"]]
483 vim_id
= ro_task
["vim_info"]["vim_id"]
488 vm_to_refresh_list
= [vim_id
]
490 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
491 vim_info
= vim_dict
[vim_id
]
493 if vim_info
["status"] == "ACTIVE":
495 elif vim_info
["status"] == "BUILD":
496 task_status
= "BUILD"
498 task_status
= "FAILED"
500 # try to load and parse vim_information
502 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
503 if vim_info_info
.get("name"):
504 vim_info
["name"] = vim_info_info
["name"]
505 except Exception as vim_info_error
:
506 self
.logger
.exception(
507 f
"{vim_info_error} occured while getting the vim_info from yaml"
509 except vimconn
.VimConnException
as e
:
510 # Mark all tasks at VIM_ERROR status
512 "ro_task={} vim={} get-vm={}: {}".format(
513 ro_task_id
, ro_task
["target_id"], vim_id
, e
516 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
517 task_status
= "FAILED"
519 ro_vim_item_update
= {}
521 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
523 if vim_info
.get("interfaces"):
524 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
528 for iface
in vim_info
["interfaces"]
529 if vim_iface_id
== iface
["vim_interface_id"]
534 # iface.pop("vim_info", None)
535 vim_interfaces
.append(iface
)
539 for t
in ro_task
["tasks"]
540 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
542 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
543 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
547 mgmt_vdu_iface
= task_create
.get(
548 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
551 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
553 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
554 ro_vim_item_update
["interfaces"] = vim_interfaces
556 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
557 ro_vim_item_update
["vim_status"] = vim_info
["status"]
559 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
560 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
562 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
563 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
564 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
565 elif vim_info
["status"] == "DELETED":
566 ro_vim_item_update
["vim_id"] = None
567 ro_vim_item_update
["vim_message"] = "Deleted externally"
569 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
570 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
572 if ro_vim_item_update
:
574 "ro_task={} {} get-vm={}: status={} {}".format(
576 ro_task
["target_id"],
578 ro_vim_item_update
.get("vim_status"),
579 ro_vim_item_update
.get("vim_message")
580 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
585 return task_status
, ro_vim_item_update
587 def exec(self
, ro_task
, task_index
, task_depends
):
588 task
= ro_task
["tasks"][task_index
]
589 task_id
= task
["task_id"]
590 target_vim
= self
.my_vims
[ro_task
["target_id"]]
591 db_task_update
= {"retries": 0}
592 retries
= task
.get("retries", 0)
595 params
= task
["params"]
596 params_copy
= deepcopy(params
)
597 params_copy
["ro_key"] = self
.db
.decrypt(
598 params_copy
.pop("private_key"),
599 params_copy
.pop("schema_version"),
600 params_copy
.pop("salt"),
602 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
603 target_vim
.inject_user_key(**params_copy
)
605 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
612 ) # params_copy["key"]
613 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
616 self
.logger
.debug(traceback
.format_exc())
617 if retries
< self
.max_retries_inject_ssh_key
:
623 "next_retry": self
.time_retries_inject_ssh_key
,
628 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
630 ro_vim_item_update
= {"vim_message": str(e
)}
632 return "FAILED", ro_vim_item_update
, db_task_update
635 class VimInteractionImage(VimInteractionBase
):
636 def new(self
, ro_task
, task_index
, task_depends
):
637 task
= ro_task
["tasks"][task_index
]
638 task_id
= task
["task_id"]
641 target_vim
= self
.my_vims
[ro_task
["target_id"]]
646 if task
.get("find_params"):
647 vim_images
= target_vim
.get_image_list(**task
["find_params"])
650 raise NsWorkerExceptionNotFound(
651 "Image not found with this criteria: '{}'".format(
655 elif len(vim_images
) > 1:
656 raise NsWorkerException(
657 "More than one image found with this criteria: '{}'".format(
662 vim_image_id
= vim_images
[0]["id"]
664 ro_vim_item_update
= {
665 "vim_id": vim_image_id
,
666 "vim_status": "ACTIVE",
668 "created_items": created_items
,
673 "task={} {} new-image={} created={}".format(
674 task_id
, ro_task
["target_id"], vim_image_id
, created
678 return "DONE", ro_vim_item_update
679 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
681 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
683 ro_vim_item_update
= {
684 "vim_status": "VIM_ERROR",
686 "vim_message": str(e
),
689 return "FAILED", ro_vim_item_update
692 class VimInteractionSharedVolume(VimInteractionBase
):
693 def delete(self
, ro_task
, task_index
):
694 task
= ro_task
["tasks"][task_index
]
695 task_id
= task
["task_id"]
696 shared_volume_vim_id
= ro_task
["vim_info"]["vim_id"]
697 created_items
= ro_task
["vim_info"]["created_items"]
698 ro_vim_item_update_ok
= {
699 "vim_status": "DELETED",
701 "vim_message": "DELETED",
704 if created_items
and created_items
.get(shared_volume_vim_id
).get("keep"):
705 ro_vim_item_update_ok
= {
706 "vim_status": "ACTIVE",
710 return "DONE", ro_vim_item_update_ok
712 if shared_volume_vim_id
:
713 target_vim
= self
.my_vims
[ro_task
["target_id"]]
714 target_vim
.delete_shared_volumes(shared_volume_vim_id
)
715 except vimconn
.VimConnNotFoundException
:
716 ro_vim_item_update_ok
["vim_message"] = "already deleted"
717 except vimconn
.VimConnException
as e
:
719 "ro_task={} vim={} del-shared-volume={}: {}".format(
720 ro_task
["_id"], ro_task
["target_id"], shared_volume_vim_id
, e
723 ro_vim_item_update
= {
724 "vim_status": "VIM_ERROR",
725 "vim_message": "Error while deleting: {}".format(e
),
728 return "FAILED", ro_vim_item_update
731 "task={} {} del-shared-volume={} {}".format(
733 ro_task
["target_id"],
734 shared_volume_vim_id
,
735 ro_vim_item_update_ok
.get("vim_message", ""),
739 return "DONE", ro_vim_item_update_ok
741 def new(self
, ro_task
, task_index
, task_depends
):
742 task
= ro_task
["tasks"][task_index
]
743 task_id
= task
["task_id"]
746 target_vim
= self
.my_vims
[ro_task
["target_id"]]
749 shared_volume_vim_id
= None
750 shared_volume_data
= None
752 if task
.get("params"):
753 shared_volume_data
= task
["params"]
755 if shared_volume_data
:
757 f
"Creating the new shared_volume for {shared_volume_data}\n"
761 shared_volume_vim_id
,
762 ) = target_vim
.new_shared_volumes(shared_volume_data
)
764 created_items
[shared_volume_vim_id
] = {
765 "name": shared_volume_name
,
766 "keep": shared_volume_data
.get("keep"),
769 ro_vim_item_update
= {
770 "vim_id": shared_volume_vim_id
,
771 "vim_status": "ACTIVE",
773 "created_items": created_items
,
778 "task={} {} new-shared-volume={} created={}".format(
779 task_id
, ro_task
["target_id"], shared_volume_vim_id
, created
783 return "DONE", ro_vim_item_update
784 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
786 "task={} vim={} new-shared-volume:"
787 " {}".format(task_id
, ro_task
["target_id"], e
)
789 ro_vim_item_update
= {
790 "vim_status": "VIM_ERROR",
792 "vim_message": str(e
),
795 return "FAILED", ro_vim_item_update
798 class VimInteractionFlavor(VimInteractionBase
):
799 def delete(self
, ro_task
, task_index
):
800 task
= ro_task
["tasks"][task_index
]
801 task_id
= task
["task_id"]
802 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
803 ro_vim_item_update_ok
= {
804 "vim_status": "DELETED",
806 "vim_message": "DELETED",
812 target_vim
= self
.my_vims
[ro_task
["target_id"]]
813 target_vim
.delete_flavor(flavor_vim_id
)
814 except vimconn
.VimConnNotFoundException
:
815 ro_vim_item_update_ok
["vim_message"] = "already deleted"
816 except vimconn
.VimConnException
as e
:
818 "ro_task={} vim={} del-flavor={}: {}".format(
819 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
822 ro_vim_item_update
= {
823 "vim_status": "VIM_ERROR",
824 "vim_message": "Error while deleting: {}".format(e
),
827 return "FAILED", ro_vim_item_update
830 "task={} {} del-flavor={} {}".format(
832 ro_task
["target_id"],
834 ro_vim_item_update_ok
.get("vim_message", ""),
838 return "DONE", ro_vim_item_update_ok
840 def new(self
, ro_task
, task_index
, task_depends
):
841 task
= ro_task
["tasks"][task_index
]
842 task_id
= task
["task_id"]
845 target_vim
= self
.my_vims
[ro_task
["target_id"]]
850 if task
.get("find_params", {}).get("vim_flavor_id"):
851 vim_flavor_id
= task
["find_params"]["vim_flavor_id"]
852 elif task
.get("find_params", {}).get("flavor_data"):
854 flavor_data
= task
["find_params"]["flavor_data"]
855 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
856 except vimconn
.VimConnNotFoundException
as flavor_not_found_msg
:
858 f
"VimConnNotFoundException occured: {flavor_not_found_msg}"
861 if not vim_flavor_id
and task
.get("params"):
863 flavor_data
= task
["params"]["flavor_data"]
864 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
867 ro_vim_item_update
= {
868 "vim_id": vim_flavor_id
,
869 "vim_status": "ACTIVE",
871 "created_items": created_items
,
876 "task={} {} new-flavor={} created={}".format(
877 task_id
, ro_task
["target_id"], vim_flavor_id
, created
881 return "DONE", ro_vim_item_update
882 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
884 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
886 ro_vim_item_update
= {
887 "vim_status": "VIM_ERROR",
889 "vim_message": str(e
),
892 return "FAILED", ro_vim_item_update
895 class VimInteractionAffinityGroup(VimInteractionBase
):
896 def delete(self
, ro_task
, task_index
):
897 task
= ro_task
["tasks"][task_index
]
898 task_id
= task
["task_id"]
899 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
900 ro_vim_item_update_ok
= {
901 "vim_status": "DELETED",
903 "vim_message": "DELETED",
908 if affinity_group_vim_id
:
909 target_vim
= self
.my_vims
[ro_task
["target_id"]]
910 target_vim
.delete_affinity_group(affinity_group_vim_id
)
911 except vimconn
.VimConnNotFoundException
:
912 ro_vim_item_update_ok
["vim_message"] = "already deleted"
913 except vimconn
.VimConnException
as e
:
915 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
916 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
919 ro_vim_item_update
= {
920 "vim_status": "VIM_ERROR",
921 "vim_message": "Error while deleting: {}".format(e
),
924 return "FAILED", ro_vim_item_update
927 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
929 ro_task
["target_id"],
930 affinity_group_vim_id
,
931 ro_vim_item_update_ok
.get("vim_message", ""),
935 return "DONE", ro_vim_item_update_ok
937 def new(self
, ro_task
, task_index
, task_depends
):
938 task
= ro_task
["tasks"][task_index
]
939 task_id
= task
["task_id"]
942 target_vim
= self
.my_vims
[ro_task
["target_id"]]
945 affinity_group_vim_id
= None
946 affinity_group_data
= None
947 param_affinity_group_id
= ""
949 if task
.get("params"):
950 affinity_group_data
= task
["params"].get("affinity_group_data")
952 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
954 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
955 "vim-affinity-group-id"
957 affinity_group_vim_id
= target_vim
.get_affinity_group(
958 param_affinity_group_id
960 except vimconn
.VimConnNotFoundException
:
962 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
963 "could not be found at VIM. Creating a new one.".format(
964 task_id
, ro_task
["target_id"], param_affinity_group_id
968 if not affinity_group_vim_id
and affinity_group_data
:
969 affinity_group_vim_id
= target_vim
.new_affinity_group(
974 ro_vim_item_update
= {
975 "vim_id": affinity_group_vim_id
,
976 "vim_status": "ACTIVE",
978 "created_items": created_items
,
983 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
984 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
988 return "DONE", ro_vim_item_update
989 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
991 "task={} vim={} new-affinity-or-anti-affinity-group:"
992 " {}".format(task_id
, ro_task
["target_id"], e
)
994 ro_vim_item_update
= {
995 "vim_status": "VIM_ERROR",
997 "vim_message": str(e
),
1000 return "FAILED", ro_vim_item_update
1003 class VimInteractionUpdateVdu(VimInteractionBase
):
1004 def exec(self
, ro_task
, task_index
, task_depends
):
1005 task
= ro_task
["tasks"][task_index
]
1006 task_id
= task
["task_id"]
1007 db_task_update
= {"retries": 0}
1010 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1014 if task
.get("params"):
1015 vim_vm_id
= task
["params"].get("vim_vm_id")
1016 action
= task
["params"].get("action")
1017 context
= {action
: action
}
1018 target_vim
.action_vminstance(vim_vm_id
, context
)
1020 ro_vim_item_update
= {
1021 "vim_id": vim_vm_id
,
1022 "vim_status": "ACTIVE",
1024 "created_items": created_items
,
1025 "vim_details": None,
1026 "vim_message": None,
1029 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1031 return "DONE", ro_vim_item_update
, db_task_update
1032 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1034 "task={} vim={} VM Migration:"
1035 " {}".format(task_id
, ro_task
["target_id"], e
)
1037 ro_vim_item_update
= {
1038 "vim_status": "VIM_ERROR",
1040 "vim_message": str(e
),
1043 return "FAILED", ro_vim_item_update
, db_task_update
1046 class VimInteractionSdnNet(VimInteractionBase
):
1048 def _match_pci(port_pci
, mapping
):
1050 Check if port_pci matches with mapping.
1051 The mapping can have brackets to indicate that several chars are accepted. e.g
1052 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1053 :param port_pci: text
1054 :param mapping: text, can contain brackets to indicate several chars are available
1055 :return: True if matches, False otherwise
1057 if not port_pci
or not mapping
:
1059 if port_pci
== mapping
:
1065 bracket_start
= mapping
.find("[", mapping_index
)
1067 if bracket_start
== -1:
1070 bracket_end
= mapping
.find("]", bracket_start
)
1071 if bracket_end
== -1:
1074 length
= bracket_start
- mapping_index
1077 and port_pci
[pci_index
: pci_index
+ length
]
1078 != mapping
[mapping_index
:bracket_start
]
1083 port_pci
[pci_index
+ length
]
1084 not in mapping
[bracket_start
+ 1 : bracket_end
]
1088 pci_index
+= length
+ 1
1089 mapping_index
= bracket_end
+ 1
1091 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
1096 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
1098 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1099 :param vim_account_id:
1104 for vld
in vlds_to_connect
:
1105 table
, _
, db_id
= vld
.partition(":")
1106 db_id
, _
, vld
= db_id
.partition(":")
1107 _
, _
, vld_id
= vld
.partition(".")
1109 if table
== "vnfrs":
1110 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
1111 iface_key
= "vnf-vld-id"
1112 else: # table == "nsrs"
1113 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
1114 iface_key
= "ns-vld-id"
1116 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
1118 for db_vnfr
in db_vnfrs
:
1119 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
1120 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
1121 if interface
.get(iface_key
) == vld_id
and interface
.get(
1123 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1125 interface_
= interface
.copy()
1126 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1127 db_vnfr
["_id"], vdu_index
, iface_index
1130 if vdur
.get("status") == "ERROR":
1131 interface_
["status"] = "ERROR"
1133 interfaces
.append(interface_
)
1137 def refresh(self
, ro_task
):
1138 # look for task create
1139 task_create_index
, _
= next(
1141 for i_t
in enumerate(ro_task
["tasks"])
1143 and i_t
[1]["action"] == "CREATE"
1144 and i_t
[1]["status"] != "FINISHED"
1147 return self
.new(ro_task
, task_create_index
, None)
1149 def new(self
, ro_task
, task_index
, task_depends
):
1150 task
= ro_task
["tasks"][task_index
]
1151 task_id
= task
["task_id"]
1152 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1154 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1156 created_items
= ro_task
["vim_info"].get("created_items")
1157 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1158 new_connected_ports
= []
1159 last_update
= ro_task
["vim_info"].get("last_update", 0)
1160 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1162 created
= ro_task
["vim_info"].get("created", False)
1167 params
= task
["params"]
1168 vlds_to_connect
= params
.get("vlds", [])
1169 associated_vim
= params
.get("target_vim")
1170 # external additional ports
1171 additional_ports
= params
.get("sdn-ports") or ()
1172 _
, _
, vim_account_id
= (
1174 if associated_vim
is None
1175 else associated_vim
.partition(":")
1179 # get associated VIM
1180 if associated_vim
not in self
.db_vims
:
1181 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1182 "vim_accounts", {"_id": vim_account_id
}
1185 db_vim
= self
.db_vims
[associated_vim
]
1187 # look for ports to connect
1188 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1192 pending_ports
= error_ports
= 0
1194 sdn_need_update
= False
1197 vlan_used
= port
.get("vlan") or vlan_used
1199 # TODO. Do not connect if already done
1200 if not port
.get("compute_node") or not port
.get("pci"):
1201 if port
.get("status") == "ERROR":
1208 compute_node_mappings
= next(
1211 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1212 if c
and c
["compute_node"] == port
["compute_node"]
1217 if compute_node_mappings
:
1218 # process port_mapping pci of type 0000:af:1[01].[1357]
1222 for p
in compute_node_mappings
["ports"]
1223 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1229 if not db_vim
["config"].get("mapping_not_needed"):
1231 "Port mapping not found for compute_node={} pci={}".format(
1232 port
["compute_node"], port
["pci"]
1239 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1241 "service_endpoint_id": pmap
.get("service_endpoint_id")
1242 or service_endpoint_id
,
1243 "service_endpoint_encapsulation_type": "dot1q"
1244 if port
["type"] == "SR-IOV"
1246 "service_endpoint_encapsulation_info": {
1247 "vlan": port
.get("vlan"),
1248 "mac": port
.get("mac-address"),
1249 "device_id": pmap
.get("device_id") or port
["compute_node"],
1250 "device_interface_id": pmap
.get("device_interface_id")
1252 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1253 "switch_port": pmap
.get("switch_port"),
1254 "service_mapping_info": pmap
.get("service_mapping_info"),
1259 # if port["modified_at"] > last_update:
1260 # sdn_need_update = True
1261 new_connected_ports
.append(port
["id"]) # TODO
1262 sdn_ports
.append(new_port
)
1266 "{} interfaces have not been created as VDU is on ERROR status".format(
1271 # connect external ports
1272 for index
, additional_port
in enumerate(additional_ports
):
1273 additional_port_id
= additional_port
.get(
1274 "service_endpoint_id"
1275 ) or "external-{}".format(index
)
1278 "service_endpoint_id": additional_port_id
,
1279 "service_endpoint_encapsulation_type": additional_port
.get(
1280 "service_endpoint_encapsulation_type", "dot1q"
1282 "service_endpoint_encapsulation_info": {
1283 "vlan": additional_port
.get("vlan") or vlan_used
,
1284 "mac": additional_port
.get("mac_address"),
1285 "device_id": additional_port
.get("device_id"),
1286 "device_interface_id": additional_port
.get(
1287 "device_interface_id"
1289 "switch_dpid": additional_port
.get("switch_dpid")
1290 or additional_port
.get("switch_id"),
1291 "switch_port": additional_port
.get("switch_port"),
1292 "service_mapping_info": additional_port
.get(
1293 "service_mapping_info"
1298 new_connected_ports
.append(additional_port_id
)
1301 # if there are more ports to connect or they have been modified, call create/update
1303 sdn_status
= "ERROR"
1304 sdn_info
= "; ".join(error_list
)
1305 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1306 last_update
= time
.time()
1309 if len(sdn_ports
) < 2:
1310 sdn_status
= "ACTIVE"
1312 if not pending_ports
:
1314 "task={} {} new-sdn-net done, less than 2 ports".format(
1315 task_id
, ro_task
["target_id"]
1319 net_type
= params
.get("type") or "ELAN"
1323 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1326 "task={} {} new-sdn-net={} created={}".format(
1327 task_id
, ro_task
["target_id"], sdn_net_id
, created
1331 created_items
= target_vim
.edit_connectivity_service(
1332 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1336 "task={} {} update-sdn-net={} created={}".format(
1337 task_id
, ro_task
["target_id"], sdn_net_id
, created
1341 connected_ports
= new_connected_ports
1343 wim_status_dict
= target_vim
.get_connectivity_service_status(
1344 sdn_net_id
, conn_info
=created_items
1346 sdn_status
= wim_status_dict
["sdn_status"]
1348 if wim_status_dict
.get("sdn_info"):
1349 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1351 if wim_status_dict
.get("error_msg"):
1352 sdn_info
= wim_status_dict
.get("error_msg") or ""
1355 if sdn_status
!= "ERROR":
1356 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1357 len(ports
) - pending_ports
, len(ports
)
1360 if sdn_status
== "ACTIVE":
1361 sdn_status
= "BUILD"
1363 ro_vim_item_update
= {
1364 "vim_id": sdn_net_id
,
1365 "vim_status": sdn_status
,
1367 "created_items": created_items
,
1368 "connected_ports": connected_ports
,
1369 "vim_details": sdn_info
,
1370 "vim_message": None,
1371 "last_update": last_update
,
1374 return sdn_status
, ro_vim_item_update
1375 except Exception as e
:
1377 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1378 exc_info
=not isinstance(
1379 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1382 ro_vim_item_update
= {
1383 "vim_status": "VIM_ERROR",
1385 "vim_message": str(e
),
1388 return "FAILED", ro_vim_item_update
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service referenced by ro_task["vim_info"].

    :param ro_task: ro_task record; "vim_info", "target_id", "_id" and "tasks" are read
    :param task_index: index inside ro_task["tasks"] of the task being processed
    :return: ("DONE", update) when the service was deleted, there was nothing to delete or the
        connector answered NOT_FOUND; ("FAILED", update) on any other error
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    sdn_vim_id = ro_task["vim_info"].get("vim_id")
    outcome = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "vim_id": None,
    }

    try:
        # nothing is sent to the connector when no service id was ever stored
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, ro_task["vim_info"].get("created_items")
            )
    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            # the traceback is only logged for exceptions that are not connector errors
            connector_error = isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not connector_error,
            )
            failure = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", failure

        outcome["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            outcome.get("vim_message", ""),
        )
    )

    return "DONE", outcome
class VimInteractionMigration(VimInteractionBase):
    """Interaction that asks the VIM connector to migrate a VM instance."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM named in the task params and collect its refreshed VIM information.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task being processed
        :param task_depends: not used by this interaction
        :return: ("DONE" | "FAILED", ro_vim_item_update, db_task_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                migrate_host = params.get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated.
                    # The stored info for this target may be absent; the migration has
                    # already happened, so fall back to "no previous interfaces"
                    # instead of failing on None.
                    vdu_old_vim_info = (
                        params["vdu_vim_info"].get(ro_task["target_id"]) or {}
                    )

                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        for old_iface in vdu_old_vim_info.get("interfaces") or ():
                            # first refreshed interface with the same VIM id, None if it
                            # is no longer reported
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionResize(VimInteractionBase):
    """Interaction that asks the VIM connector to resize a VM to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM named in the task params to the flavor described by "flavor_dict".

        The flavor is looked up at the VIM first and created when the lookup fails.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task being processed
        :param task_depends: not used by this interaction
        :return: ("DONE" | "FAILED", ro_vim_item_update, db_task_update)
        """
        current = ro_task["tasks"][task_index]
        task_id = current["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            params = current.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                flavor_dict = params.get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(
                        flavor_dict
                    )
                except Exception as lookup_error:
                    self.logger.info(
                        "Cannot find any flavor matching %s.", str(lookup_error)
                    )
                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as creation_error:
                        self.logger.error(
                            "Error creating flavor at VIM %s.", str(creation_error)
                        )

                if target_flavor_uuid is not None:
                    resized = target_vim.resize_instance(vim_vm_id, target_flavor_uuid)

                    if resized:
                        # Refresh VM to get new vim_info
                        refreshed_vim_info = target_vim.refresh_vms_status([vim_vm_id])[
                            vim_vm_id
                        ]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            refreshed_ok = refreshed_vim_info and refreshed_vim_info.get(
                "status"
            ) not in ("ERROR", "VIM_ERROR")
            if refreshed_ok:
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize: {}".format(task_id, ro_task["target_id"], e)
            )
            failure = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", failure, db_task_update
class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        # the whole configuration is kept; every property reads config["period"] on access
        self.conf = config

    @property
    def active(self):
        """Refresh period for active items: the configured value when it is >= 60 or -1, else 60."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        configured = self.conf["period"]["refresh_active"]
        return configured if configured >= 60 or configured == -1 else 60

    @property
    def build(self):
        """Configured "refresh_build" period."""
        return self.conf["period"]["refresh_build"]

    @property
    def image(self):
        """Configured "refresh_image" period."""
        return self.conf["period"]["refresh_image"]

    @property
    def error(self):
        """Configured "refresh_error" period."""
        return self.conf["period"]["refresh_error"]

    @property
    def queue_size(self):
        """Configured "queue_size" value."""
        return self.conf["period"]["queue_size"]
1626 class NsWorker(threading
.Thread
):
def __init__(self, worker_index, config, plugins, db):
    """
    Build the worker thread state; nothing is loaded from the VIMs or the database here.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # every interaction object shares the same db handle, connector/db caches and logger
    interaction_classes = {
        "net": VimInteractionNet,
        "shared-volumes": VimInteractionSharedVolume,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: interaction(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction in interaction_classes.items()
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
def insert_task(self, task):
    """
    Enqueue a task for this worker without blocking.

    :param task: item to put in self.task_queue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
def terminate(self):
    """Queue the "exit" task through insert_task."""
    exit_task = "exit"
    self.insert_task(exit_task)
def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet.

    :param task: task dictionary; its "status" is read and possibly overwritten
    :return: True if the task was "SCHEDULED" and is now "SUPERSEDED", False otherwise
    """
    # The "with" statement already releases the lock on every exit path. Releasing it by
    # hand inside the block made the context manager release an unlocked lock.
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        return False
def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
    """
    Process vim config, creating vim configuration files as ca_cert

    When the config carries "ca_cert_content", the content is written to
    <work_dir>/<target_id>:<worker_index>/ca_cert, the "ca_cert_content" key is removed and
    "ca_cert" is set to the path of the written file.

    :param target_id: vim/sdn/wim + id
    :param db_vim: Vim dictionary obtained from database
    :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
    :raises NsWorkerException: on any error creating the folder or writing the file
    """
    if not db_vim.get("config"):
        return

    # kept outside the try so the error message can always name the path reached so far
    file_name = ""
    work_dir = "/app/osm_ro/certs"

    try:
        if db_vim["config"].get("ca_cert_content"):
            file_name = f"{work_dir}/{target_id}:{self.worker_index}"
            # exist_ok replaces the separate "is it a directory" check
            makedirs(file_name, exist_ok=True)

            file_name = file_name + "/ca_cert"

            # explicit encoding so the result does not depend on the process locale
            with open(file_name, "w", encoding="utf-8") as f:
                f.write(db_vim["config"]["ca_cert_content"])

            del db_vim["config"]["ca_cert_content"]
            db_vim["config"]["ca_cert"] = file_name
    except Exception as e:
        raise NsWorkerException(
            "Error writing to file '{}': {}".format(file_name, e)
        ) from e
def _load_plugin(self, name, type="vim"):
    """
    Return the connector class registered under name, loading it from entry points if needed.

    :param name: plugin name used as key of self.plugins and as entry point name
    :param type: "vim" or "sdn"; selects the entry point group "osm_ro<type>.plugins"
    :return: the plugin stored at self.plugins[name]
    :raises NsWorkerException: if loading fails or no entry point provides the plugin
    """
    registry = self.plugins

    # the dummy connectors are always made available
    if "rovim_dummy" not in registry:
        registry["rovim_dummy"] = VimDummyConnector

    if "rosdn_dummy" not in registry:
        registry["rosdn_dummy"] = SdnDummyConnector

    if name not in registry:
        try:
            group = "osm_ro{}.plugins".format(type)
            for ep in entry_points(group=group, name=name):
                registry[name] = ep.load()
        except Exception as e:
            raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

        if name and name not in registry:
            raise NsWorkerException(
                "Plugin 'osm_{n}' has not been installed".format(n=name)
            )

    return registry[name]
def _unload_vim(self, target_id):
    """
    Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None. Errors are logged, not raised
    """
    try:
        self.db_vims.pop(target_id, None)
        self.my_vims.pop(target_id, None)

        try:
            self.vim_targets.remove(target_id)
        except ValueError:
            # it was not registered as a target; nothing to remove
            pass

        self.logger.info("Unloaded {}".format(target_id))
    except Exception as e:
        self.logger.error("Cannot unload {}: {}".format(target_id, e))
1777 def _check_vim(self
, target_id
):
1779 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1780 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1783 target
, _
, _id
= target_id
.partition(":")
1789 loaded
= target_id
in self
.vim_targets
1800 step
= "Getting {} from db".format(target_id
)
1801 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1803 for op_index
, operation
in enumerate(
1804 db_vim
["_admin"].get("operations", ())
1806 if operation
["operationState"] != "PROCESSING":
1809 locked_at
= operation
.get("locked_at")
1811 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1812 # some other thread is doing this operation
1816 op_text
= "_admin.operations.{}.".format(op_index
)
1818 if not self
.db
.set_one(
1822 op_text
+ "operationState": "PROCESSING",
1823 op_text
+ "locked_at": locked_at
,
1826 op_text
+ "locked_at": now
,
1827 "admin.current_operation": op_index
,
1829 fail_on_empty
=False,
1833 unset_dict
[op_text
+ "locked_at"] = None
1834 unset_dict
["current_operation"] = None
1835 step
= "Loading " + target_id
1836 error_text
= self
._load
_vim
(target_id
)
1839 step
= "Checking connectivity"
1842 self
.my_vims
[target_id
].check_vim_connectivity()
1844 self
.my_vims
[target_id
].check_credentials()
1846 update_dict
["_admin.operationalState"] = "ENABLED"
1847 update_dict
["_admin.detailed-status"] = ""
1848 unset_dict
[op_text
+ "detailed-status"] = None
1849 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1853 except Exception as e
:
1854 error_text
= "{}: {}".format(step
, e
)
1855 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1858 if update_dict
or unset_dict
:
1860 update_dict
[op_text
+ "operationState"] = "FAILED"
1861 update_dict
[op_text
+ "detailed-status"] = error_text
1862 unset_dict
.pop(op_text
+ "detailed-status", None)
1863 update_dict
["_admin.operationalState"] = "ERROR"
1864 update_dict
["_admin.detailed-status"] = error_text
1867 update_dict
[op_text
+ "statusEnteredTime"] = now
1871 q_filter
={"_id": _id
},
1872 update_dict
=update_dict
,
1874 fail_on_empty
=False,
1878 self
._unload
_vim
(target_id
)
def _reload_vim(self, target_id):
    """
    Reload a target that is in self.vim_targets, or drop its cached database information.

    :param target_id: key used in self.vim_targets and self.db_vims
    :return: None
    """
    if target_id not in self.vim_targets:
        # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
        # just remove it to force load again next time it is needed
        self.db_vims.pop(target_id, None)
        return

    self._load_vim(target_id)
1888 def _load_vim(self
, target_id
):
1890 Load or reload a vim_account, sdn_controller or wim_account.
1891 Read content from database, load the plugin if not loaded.
1892 In case of error loading the plugin, it loads a failing VIM_connector
1893 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1894 :param target_id: Contains type:_id; where type can be 'vim', ...
1895 :return: None if ok, descriptive text if error
1897 target
, _
, _id
= target_id
.partition(":")
1907 step
= "Getting {}={} from db".format(target
, _id
)
1910 # TODO process for wim, sdnc, ...
1911 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1913 # if deep_get(vim, "config", "sdn-controller"):
1914 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1915 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1917 step
= "Decrypting password"
1918 schema_version
= vim
.get("schema_version")
1919 self
.db
.encrypt_decrypt_fields(
1922 fields
=("password", "secret"),
1923 schema_version
=schema_version
,
1926 self
._process
_vim
_config
(target_id
, vim
)
1929 plugin_name
= "rovim_" + vim
["vim_type"]
1930 step
= "Loading plugin '{}'".format(plugin_name
)
1931 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1932 step
= "Loading {}'".format(target_id
)
1933 self
.my_vims
[target_id
] = vim_module_conn(
1936 tenant_id
=vim
.get("vim_tenant_id"),
1937 tenant_name
=vim
.get("vim_tenant_name"),
1940 user
=vim
["vim_user"],
1941 passwd
=vim
["vim_password"],
1942 config
=vim
.get("config") or {},
1946 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1947 step
= "Loading plugin '{}'".format(plugin_name
)
1948 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1949 step
= "Loading {}'".format(target_id
)
1951 wim_config
= wim
.pop("config", {}) or {}
1952 wim
["uuid"] = wim
["_id"]
1953 if "url" in wim
and "wim_url" not in wim
:
1954 wim
["wim_url"] = wim
["url"]
1955 elif "url" not in wim
and "wim_url" in wim
:
1956 wim
["url"] = wim
["wim_url"]
1959 wim_config
["dpid"] = wim
.pop("dpid")
1961 if wim
.get("switch_id"):
1962 wim_config
["switch_id"] = wim
.pop("switch_id")
1964 # wim, wim_account, config
1965 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1966 self
.db_vims
[target_id
] = vim
1967 self
.error_status
= None
1970 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1972 except Exception as e
:
1974 "Cannot load {} plugin={}: {} {}".format(
1975 target_id
, plugin_name
, step
, e
1979 self
.db_vims
[target_id
] = vim
or {}
1980 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1981 error_status
= "{} Error: {}".format(step
, e
)
1985 if target_id
not in self
.vim_targets
:
1986 self
.vim_targets
.append(target_id
)
1988 def _get_db_task(self
):
1990 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1995 if not self
.time_last_task_processed
:
1996 self
.time_last_task_processed
= now
2001 # Log RO tasks only when loglevel is DEBUG
2002 if self.logger.getEffectiveLevel() == logging.DEBUG:
2009 + str(self.task_locked_time)
2011 + "time_last_task_processed="
2012 + str(self.time_last_task_processed)
2018 locked
= self
.db
.set_one(
2021 "target_id": self
.vim_targets
,
2022 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2023 "locked_at.lt": now
- self
.task_locked_time
,
2024 "to_check_at.lt": self
.time_last_task_processed
,
2025 "to_check_at.gt": -1,
2027 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
2028 fail_on_empty
=False,
2033 ro_task
= self
.db
.get_one(
2036 "target_id": self
.vim_targets
,
2037 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2043 if self
.time_last_task_processed
== now
:
2044 self
.time_last_task_processed
= None
2047 self
.time_last_task_processed
= now
2048 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2050 except DbException
as e
:
2051 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
2052 except Exception as e
:
2053 self
.logger
.critical(
2054 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to be done or superseded

    :param ro_task: ro_task record; "tasks" and "vim_info" are read, sibling task status may be modified
    :param task_index: index inside ro_task["tasks"] of the DELETE task
    :param task_depends: not used here
    :param db_update: dictionary that receives "tasks.<index>.status" changes of sibling tasks
    :return: (None, None) for an already FAILED task, ("SUPERSEDED", None) when nothing has to be
        deleted, the result of the item delete() otherwise, or ("FAILED", error dict) on exception
    """
    own_task = ro_task["tasks"][task_index]
    task_id = own_task["task_id"]
    needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
        "created_items", False
    )

    self.logger.debug("Needed delete: {}".format(needed_delete))
    if own_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for index, sibling in enumerate(ro_task["tasks"]):
            if index == task_index or not sibling:
                continue  # own task

            if (
                own_task["target_record"] == sibling["target_record"]
                and sibling["action"] == "CREATE"
            ):
                # set to finished
                sibling["status"] = "FINISHED"
                db_update["tasks.{}.status".format(index)] = "FINISHED"
            elif sibling["action"] == "CREATE" and sibling["status"] not in (
                "FINISHED",
                "SUPERSEDED",
            ):
                # another record still uses the element, so it must not be removed
                needed_delete = False

        if not needed_delete:
            return "SUPERSEDED", None

        self.logger.debug(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        return self.item2class[own_task["item"]].delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(task_id, e),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to create something at VIM

    :param ro_task: ro_task record; "tasks" is read
    :param task_index: index inside ro_task["tasks"] of the CREATE task
    :param task_depends: passed through to the item new() method
    :param db_update: not used here
    :return: (None, None) unless the task is SCHEDULED; (status, "COPY_VIM_INFO") when another
        CREATE task already progressed; otherwise the result of the item new(), or
        ("FAILED", error dict) if new() raises
    """
    own_task = ro_task["tasks"][task_index]
    task_id = own_task["task_id"]

    # FAILED tasks are not retried and any status other than SCHEDULED has nothing to create
    if own_task["status"] != "SCHEDULED":
        return None, None  # TODO need to be retry??

    # check if already created by another task
    not_yet_created = ("SCHEDULED", "FINISHED", "SUPERSEDED")
    for index, sibling in enumerate(ro_task["tasks"]):
        if index == task_index or not sibling:
            continue  # own task

        if sibling["action"] == "CREATE" and sibling["status"] not in not_yet_created:
            return sibling["status"], "COPY_VIM_INFO"

    try:
        creator = self.item2class[own_task["item"]]
        task_status, ro_vim_item_update = creator.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: optional ro_task searched first when task_id is a plain task id
    :param target_id: target used in the database filter for formats 1 and 2; overwritten by
        the prefix of task_id when format 1 is used
    :return: database ro_task plus index of task
    :raises NsWorkerException: when no matching task is found
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of "tasks" can be empty (deleted task), skip them
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of "tasks" can be empty (deleted task), skip them
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index

    raise NsWorkerException("Cannot get depending task {}".format(task_id))
2205 def update_vm_refresh(self
, ro_task
):
2206 """Enables the VM status updates if self.refresh_config.active parameter
2207 is not -1 and then updates the DB accordingly
2211 self
.logger
.debug("Checking if VM status update config")
2212 next_refresh
= time
.time()
2213 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2215 if next_refresh
!= -1:
2216 db_ro_task_update
= {}
2218 next_check_at
= now
+ (24 * 60 * 60)
2219 next_check_at
= min(next_check_at
, next_refresh
)
2220 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2221 db_ro_task_update
["to_check_at"] = next_check_at
2224 "Finding tasks which to be updated to enable VM status updates"
2226 refresh_tasks
= self
.db
.get_list(
2229 "tasks.status": "DONE",
2230 "to_check_at.lt": 0,
2233 self
.logger
.debug("Updating tasks to change the to_check_at status")
2234 for task
in refresh_tasks
:
2241 update_dict
=db_ro_task_update
,
2245 except Exception as e
:
2246 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
def _get_next_refresh(self, ro_task: dict, next_refresh: float):
    """Decide the next_refresh according to vim type and refresh config period.

    Args:
        ro_task (dict):         ro_task details
        next_refresh (float):   next refresh time as epoch format

    Returns:
        next_refresh (float)    -1 if vm updates are disabled or vim type is openstack,
                                otherwise the given time plus the active refresh period.
    """
    vim_type = self.db_vims[ro_task["target_id"]]["vim_type"]

    if self.refresh_config.active == -1 or vim_type == "openstack":
        return -1

    return next_refresh + self.refresh_config.active
2265 def _process_pending_tasks(self
, ro_task
):
2266 ro_task_id
= ro_task
["_id"]
2269 next_check_at
= now
+ (24 * 60 * 60)
2270 db_ro_task_update
= {}
2272 def _update_refresh(new_status
):
2273 # compute next_refresh
2275 nonlocal next_check_at
2276 nonlocal db_ro_task_update
2279 next_refresh
= time
.time()
2281 if task
["item"] in ("image", "flavor"):
2282 next_refresh
+= self
.refresh_config
.image
2283 elif new_status
== "BUILD":
2284 next_refresh
+= self
.refresh_config
.build
2285 elif new_status
== "DONE":
2286 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2288 next_refresh
+= self
.refresh_config
.error
2290 next_check_at
= min(next_check_at
, next_refresh
)
2291 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2292 ro_task
["vim_info"]["refresh_at"] = next_refresh
2296 # Log RO tasks only when loglevel is DEBUG
2297 if self.logger.getEffectiveLevel() == logging.DEBUG:
2298 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2300 # Check if vim status refresh is enabled again
2301 self
.update_vm_refresh(ro_task
)
2302 # 0: get task_status_create
2304 task_status_create
= None
2308 for t
in ro_task
["tasks"]
2310 and t
["action"] == "CREATE"
2311 and t
["status"] in ("BUILD", "DONE")
2317 task_status_create
= task_create
["status"]
2319 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2320 for task_action
in ("DELETE", "CREATE", "EXEC"):
2321 db_vim_update
= None
2324 for task_index
, task
in enumerate(ro_task
["tasks"]):
2326 continue # task deleted
2329 target_update
= None
2333 task_action
in ("DELETE", "EXEC")
2334 and task
["status"] not in ("SCHEDULED", "BUILD")
2336 or task
["action"] != task_action
2338 task_action
== "CREATE"
2339 and task
["status"] in ("FINISHED", "SUPERSEDED")
2344 task_path
= "tasks.{}.status".format(task_index
)
2346 db_vim_info_update
= None
2347 dependency_ro_task
= {}
2349 if task
["status"] == "SCHEDULED":
2350 # check if tasks that this depends on have been completed
2351 dependency_not_completed
= False
2353 for dependency_task_id
in task
.get("depends_on") or ():
2356 dependency_task_index
,
2357 ) = self
._get
_dependency
(
2358 dependency_task_id
, target_id
=ro_task
["target_id"]
2360 dependency_task
= dependency_ro_task
["tasks"][
2361 dependency_task_index
2364 "dependency_ro_task={} dependency_task_index={}".format(
2365 dependency_ro_task
, dependency_task_index
2369 if dependency_task
["status"] == "SCHEDULED":
2370 dependency_not_completed
= True
2371 next_check_at
= min(
2372 next_check_at
, dependency_ro_task
["to_check_at"]
2374 # must allow dependent task to be processed first
2375 # to do this set time after last_task_processed
2376 next_check_at
= max(
2377 self
.time_last_task_processed
, next_check_at
2380 elif dependency_task
["status"] == "FAILED":
2381 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2384 dependency_task
["action"],
2385 dependency_task
["item"],
2387 dependency_ro_task
["vim_info"].get(
2392 "task={} {}".format(task
["task_id"], error_text
)
2394 raise NsWorkerException(error_text
)
2396 task_depends
[dependency_task_id
] = dependency_ro_task
[
2400 "TASK-{}".format(dependency_task_id
)
2401 ] = dependency_ro_task
["vim_info"]["vim_id"]
2403 if dependency_not_completed
:
2404 self
.logger
.warning(
2405 "DEPENDENCY NOT COMPLETED {}".format(
2406 dependency_ro_task
["vim_info"]["vim_id"]
2409 # TODO set at vim_info.vim_details that it is waiting
2412 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2413 # the task of renew this locking. It will update database locket_at periodically
2415 lock_object
= LockRenew
.add_lock_object(
2416 "ro_tasks", ro_task
, self
2418 if task
["action"] == "DELETE":
2422 ) = self
._delete
_task
(
2423 ro_task
, task_index
, task_depends
, db_ro_task_update
2426 "FINISHED" if new_status
== "DONE" else new_status
2428 # ^with FINISHED instead of DONE it will not be refreshing
2430 if new_status
in ("FINISHED", "SUPERSEDED"):
2431 target_update
= "DELETE"
2432 elif task
["action"] == "EXEC":
2437 ) = self
.item2class
[task
["item"]].exec(
2438 ro_task
, task_index
, task_depends
2441 "FINISHED" if new_status
== "DONE" else new_status
2443 # ^with FINISHED instead of DONE it will not be refreshing
2446 # load into database the modified db_task_update "retries" and "next_retry"
2447 if db_task_update
.get("retries"):
2449 "tasks.{}.retries".format(task_index
)
2450 ] = db_task_update
["retries"]
2452 next_check_at
= time
.time() + db_task_update
.get(
2455 target_update
= None
2456 elif task
["action"] == "CREATE":
2457 if task
["status"] == "SCHEDULED":
2458 if task_status_create
:
2459 new_status
= task_status_create
2460 target_update
= "COPY_VIM_INFO"
2462 new_status
, db_vim_info_update
= self
.item2class
[
2464 ].new(ro_task
, task_index
, task_depends
)
2465 _update_refresh(new_status
)
2467 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2468 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2472 ) = self
.item2class
[
2475 _update_refresh(new_status
)
2477 # The refresh is updated to avoid set the value of "refresh_at" to
2478 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2479 # because it can happen that in this case the task is never processed
2480 _update_refresh(task
["status"])
2482 except Exception as e
:
2483 new_status
= "FAILED"
2484 db_vim_info_update
= {
2485 "vim_status": "VIM_ERROR",
2486 "vim_message": str(e
),
2490 e
, (NsWorkerException
, vimconn
.VimConnException
)
2493 "Unexpected exception at _delete_task task={}: {}".format(
2500 if db_vim_info_update
:
2501 db_vim_update
= db_vim_info_update
.copy()
2502 db_ro_task_update
.update(
2505 for k
, v
in db_vim_info_update
.items()
2508 ro_task
["vim_info"].update(db_vim_info_update
)
2511 if task_action
== "CREATE":
2512 task_status_create
= new_status
2513 db_ro_task_update
[task_path
] = new_status
2515 if target_update
or db_vim_update
:
2516 if target_update
== "DELETE":
2517 self
._update
_target
(task
, None)
2518 elif target_update
== "COPY_VIM_INFO":
2519 self
._update
_target
(task
, ro_task
["vim_info"])
2521 self
._update
_target
(task
, db_vim_update
)
2523 except Exception as e
:
2525 isinstance(e
, DbException
)
2526 and e
.http_code
== HTTPStatus
.NOT_FOUND
2528 # if the vnfrs or nsrs has been removed from database, this task must be removed
2530 "marking to delete task={}".format(task
["task_id"])
2532 self
.tasks_to_delete
.append(task
)
2535 "Unexpected exception at _update_target task={}: {}".format(
2541 locked_at
= ro_task
["locked_at"]
2545 lock_object
["locked_at"],
2546 lock_object
["locked_at"] + self
.task_locked_time
,
2548 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
2549 # contain exactly locked_at + self.task_locked_time
2550 LockRenew
.remove_lock_object(lock_object
)
2553 "_id": ro_task
["_id"],
2554 "to_check_at": ro_task
["to_check_at"],
2555 "locked_at": locked_at
,
2557 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2558 # outside this task (by ro_nbi) do not update it
2559 db_ro_task_update
["locked_by"] = None
2560 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
2561 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2562 db_ro_task_update
["modified_at"] = now
2563 db_ro_task_update
["to_check_at"] = next_check_at
2566 # Log RO tasks only when loglevel is DEBUG
2567 if self.logger.getEffectiveLevel() == logging.DEBUG:
2568 db_ro_task_update_log = db_ro_task_update.copy()
2569 db_ro_task_update_log["_id"] = q_filter["_id"]
2570 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2573 if not self
.db
.set_one(
2575 update_dict
=db_ro_task_update
,
2577 fail_on_empty
=False,
2579 del db_ro_task_update
["to_check_at"]
2580 del q_filter
["to_check_at"]
2582 # Log RO tasks only when loglevel is DEBUG
2583 if self.logger.getEffectiveLevel() == logging.DEBUG:
2586 db_ro_task_update_log,
2589 "SET_TASK " + str(q_filter),
2595 update_dict
=db_ro_task_update
,
2598 except DbException
as e
:
2600 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2602 except Exception as e
:
2604 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2607 def _update_target(self
, task
, ro_vim_item_update
):
2608 table
, _
, temp
= task
["target_record"].partition(":")
2609 _id
, _
, path_vim_status
= temp
.partition(":")
2610 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2611 path_item
= path_item
[: path_item
.rfind(".")]
2612 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2613 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2615 if ro_vim_item_update
:
2617 path_vim_status
+ "." + k
: v
2618 for k
, v
in ro_vim_item_update
.items()
2627 "interfaces_backup",
2631 if path_vim_status
.startswith("vdur."):
2632 # for backward compatibility, add vdur.name apart from vdur.vim_name
2633 if ro_vim_item_update
.get("vim_name"):
2634 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2636 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2637 if ro_vim_item_update
.get("vim_id"):
2638 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2640 # update general status
2641 if ro_vim_item_update
.get("vim_status"):
2642 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2646 if ro_vim_item_update
.get("interfaces"):
2647 path_interfaces
= path_item
+ ".interfaces"
2649 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2653 path_interfaces
+ ".{}.".format(i
) + k
: v
2654 for k
, v
in iface
.items()
2655 if k
in ("vlan", "compute_node", "pci")
2659 # put ip_address and mac_address with ip-address and mac-address
2660 if iface
.get("ip_address"):
2662 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2663 ] = iface
["ip_address"]
2665 if iface
.get("mac_address"):
2667 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2668 ] = iface
["mac_address"]
2670 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2671 update_dict
["ip-address"] = iface
.get("ip_address").split(
2675 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2676 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2680 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2682 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2683 if ro_vim_item_update
.get("interfaces"):
2684 search_key
= path_vim_status
+ ".interfaces"
2685 if update_dict
.get(search_key
):
2686 interfaces_backup_update
= {
2687 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
2692 q_filter
={"_id": _id
},
2693 update_dict
=interfaces_backup_update
,
2697 update_dict
= {path_item
+ ".status": "DELETED"}
2700 q_filter
={"_id": _id
},
2701 update_dict
=update_dict
,
2702 unset
={path_vim_status
: None},
2705 def _process_delete_db_tasks(self
):
2707 Delete task from database because vnfrs or nsrs or both have been deleted
2708 :return: None. Uses and modify self.tasks_to_delete
2710 while self
.tasks_to_delete
:
2711 task
= self
.tasks_to_delete
[0]
2712 vnfrs_deleted
= None
2713 nsr_id
= task
["nsr_id"]
2715 if task
["target_record"].startswith("vnfrs:"):
2716 # check if nsrs is present
2717 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2718 vnfrs_deleted
= task
["target_record"].split(":")[1]
2721 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2722 except Exception as e
:
2724 "Error deleting task={}: {}".format(task
["task_id"], e
)
2726 self
.tasks_to_delete
.pop(0)
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Static method because it is called from osm_ng_ro.ns

    Each ro_task is deleted, or updated setting to None the affected tasks, only if its
    "modified_at" has not changed since it was read. If somebody has changed a ro_task meanwhile,
    the whole operation is repeated.
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception is fails
    """
    retries = 5  # NOTE(review): maximum number of attempts - confirm the value

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue

                if vnfrs_deleted:
                    # target_record is "<table>:<_id>:<path>". The _id is compared as a whole,
                    # because a prefix match would also select a vnfr whose id starts with it
                    table, _, remainder = task["target_record"].partition(":")
                    affected = (
                        table == "vnfrs"
                        and remainder.partition(":")[0] == vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            not_changed_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }

            if to_delete_ro_task:
                if not db.del_one(
                    "ro_tasks",
                    q_filter=not_changed_filter,
                    fail_on_empty=False,
                ):
                    conflict = True
            elif db_update:
                db_update["modified_at"] = now

                if not db.set_one(
                    "ro_tasks",
                    q_filter=not_changed_filter,
                    update_dict=db_update,
                    fail_on_empty=False,
                ):
                    conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2789 self
.logger
.info("Starting")
2791 # step 1: get commands from queue
2793 if self
.vim_targets
:
2794 task
= self
.task_queue
.get(block
=False)
2797 self
.logger
.debug("enters in idle state")
2799 task
= self
.task_queue
.get(block
=True)
2802 if task
[0] == "terminate":
2804 elif task
[0] == "load_vim":
2805 self
.logger
.info("order to load vim {}".format(task
[1]))
2806 self
._load
_vim
(task
[1])
2807 elif task
[0] == "unload_vim":
2808 self
.logger
.info("order to unload vim {}".format(task
[1]))
2809 self
._unload
_vim
(task
[1])
2810 elif task
[0] == "reload_vim":
2811 self
._reload
_vim
(task
[1])
2812 elif task
[0] == "check_vim":
2813 self
.logger
.info("order to check vim {}".format(task
[1]))
2814 self
._check
_vim
(task
[1])
2816 except Exception as e
:
2817 if isinstance(e
, queue
.Empty
):
2820 self
.logger
.critical(
2821 "Error processing task: {}".format(e
), exc_info
=True
2824 # step 2: process pending_tasks, delete not needed tasks
2826 if self
.tasks_to_delete
:
2827 self
._process
_delete
_db
_tasks
()
2830 # Log RO tasks only when loglevel is DEBUG
2831 if self.logger.getEffectiveLevel() == logging.DEBUG:
2832 _ = self._get_db_all_tasks()
2834 ro_task
= self
._get
_db
_task
()
2836 self
.logger
.debug("Task to process: {}".format(ro_task
))
2838 self
._process
_pending
_tasks
(ro_task
)
2842 except Exception as e
:
2843 self
.logger
.critical(
2844 "Unexpected exception at run: " + str(e
), exc_info
=True
2847 self
.logger
.info("Finishing")