1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
30 from os
import makedirs
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
43 from osm_ro_plugin
import vimconn
44 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
45 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
    :param target_dict: dictionary to be read
    :param args: list of keys to read from target_dict
    :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
    :return: The wanted value if exists, None or default otherwise
    """
    for key in args:
        # Stop as soon as the current level is not a dict or lacks the key.
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")
        target_dict = target_dict[key]

    return target_dict
class NsWorkerException(Exception):
    """Base error raised by the worker while processing a task."""

    pass
class FailingConnector:
    """Stand-in connector whose methods all raise the stored error message."""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by every exception raised by this connector
        """
        self.error_msg = error_msg

        # Replace every public VIM connector method by a Mock raising VimConnException.
        for method in dir(vimconn.VimConnector):
            if method[0] != "_":
                setattr(
                    self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
                )

        # Same for every public SDN connector method, raising SdnConnectorError.
        for method in dir(sdnconn.SdnConnectorBase):
            if method[0] != "_":
                setattr(
                    self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
                )
class NsWorkerExceptionNotFound(NsWorkerException):
    """Worker error raised when a requested item cannot be found."""

    pass
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
    It implements methods that does nothing and return ok"""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handler (subclasses call e.g. db.decrypt, db.get_list, db.get_one)
        :param my_vims: dictionary of connectors indexed by target_id
        :param db_vims: dictionary of VIM database records indexed by target_id
        :param logger: logger used by the subclasses
        """
        self.db = db
        self.logger = logger
        self.my_vims = my_vims
        self.db_vims = db_vims

    def new(self, ro_task, task_index, task_depends):
        """Default creation: nothing is done, an empty update is returned"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """skip calling VIM to get image, flavor status. Assumes ok"""
        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
            return "FAILED", {}

        return "DONE", {}

    def delete(self, ro_task, task_index):
        """skip calling VIM to delete image. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Default action: nothing is done. Returns (status, vim item update, task update)"""
        return "DONE", None, None
class VimInteractionNet(VimInteractionBase):
    """Creates, finds, refreshes and deletes networks at the target VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing network (task 'find_params') or create a new one (task 'params').

        :return: ("BUILD", update dict) on success, ("FAILED", update dict) on error
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status"""
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # Only the fields that differ from the stored vim_info are reported.
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network and its created items at the VIM.

        :return: ("DONE", update dict) on success or if already deleted, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionVdu(VimInteractionBase):
    """Creates, refreshes and deletes VM instances and injects ssh keys into them."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM, replacing 'TASK-' references by the ids stored in task_depends.

        :return: ("BUILD", update dict) on success, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            params = task["params"]
            # Work on a copy so the stored task parameters are not modified.
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created or "
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM and its created items at the VIM.

        :return: ("DONE", update dict) on success or if already deleted, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status"""
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # Best effort: a parsing problem is logged and the refresh continues.
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                vim_interfaces.append(iface)

        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject the ssh key into the VM, asking for a retry while attempts remain.

        :return: (status, vim item update, task update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionImage(VimInteractionBase):
    """Looks up images at the target VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Find the single image matching the task 'find_params' filter.

        :return: ("DONE", update dict) on success, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_image_id = ""
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(
                    task["find_params"].get("filter_dict", {})
                )

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionSharedVolume(VimInteractionBase):
    """Creates and deletes shared volumes at the target VIM."""

    def delete(self, ro_task, task_index):
        """Delete the shared volume unless its created item is flagged with 'keep'.

        :return: ("DONE", update dict) on success, kept or already deleted; ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }
        # A missing entry for this volume is treated as "not kept" instead of
        # failing with AttributeError on None.
        if created_items and (created_items.get(shared_volume_vim_id) or {}).get(
            "keep"
        ):
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok
        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Create a shared volume from the task 'params', if any.

        :return: ("DONE", update dict) on success, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionFlavor(VimInteractionBase):
    """Finds, creates and deletes flavors at the target VIM."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM.

        :return: ("DONE", update dict) on success or if already deleted, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Use the flavor given or found through 'find_params'; otherwise create it from 'params'.

        :return: ("DONE", update dict) on success, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            vim_flavor_id = None

            if task.get("find_params", {}).get("vim_flavor_id"):
                vim_flavor_id = task["find_params"]["vim_flavor_id"]
            elif task.get("find_params", {}).get("flavor_data"):
                try:
                    flavor_data = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # Not found is not fatal: the flavor may be created below.
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionAffinityGroup(VimInteractionBase):
    """Finds, creates and deletes affinity or anti-affinity groups at the target VIM."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group at the VIM.

        :return: ("DONE", update dict) on success or if already deleted, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group given by 'vim-affinity-group-id' if it exists, else create one.

        :return: ("DONE", update dict) on success, ("FAILED", update dict) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    # NOTE(review): assumes get_affinity_group returns a dict
                    # holding the group "id" - confirm against the connector.
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionUpdateVdu(VimInteractionBase):
    """Runs an action on an existing VM through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """Call action_vminstance with the 'vim_vm_id' and 'action' taken from the task params.

        :return: (status, vim item update, task update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Bound up front so the update below works when the task has no params.
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                action = task["params"].get("action")
                context = {action: action}
                target_vim.action_vminstance(vim_vm_id, context)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1048 class VimInteractionSdnNet(VimInteractionBase
):
@staticmethod
def _match_pci(port_pci, mapping):
    """
    Check if port_pci matches with mapping.
    The mapping can have brackets to indicate that several chars are accepted. e.g
    pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
    :param port_pci: text
    :param mapping: text, can contain brackets to indicate several chars are available
    :return: True if matches, False otherwise
    """
    if not port_pci or not mapping:
        return False
    if port_pci == mapping:
        return True

    mapping_index = 0
    pci_index = 0
    while True:
        bracket_start = mapping.find("[", mapping_index)

        if bracket_start == -1:
            break

        bracket_end = mapping.find("]", bracket_start)
        if bracket_end == -1:
            break

        # Literal text before the bracket must match exactly.
        length = bracket_start - mapping_index
        if (
            length
            and port_pci[pci_index : pci_index + length]
            != mapping[mapping_index:bracket_start]
        ):
            return False

        # The next pci char must be one of the chars listed inside the brackets.
        # A pci that is too short is a mismatch, not an IndexError.
        char_index = pci_index + length
        if (
            char_index >= len(port_pci)
            or port_pci[char_index] not in mapping[bracket_start + 1 : bracket_end]
        ):
            return False

        pci_index += length + 1
        mapping_index = bracket_end + 1

    # Whatever remains after the last bracket must match literally.
    if port_pci[pci_index:] != mapping[mapping_index:]:
        return False

    return True
def _get_interfaces(self, vlds_to_connect, vim_account_id):
    """
    Collect the SR-IOV and PCI-PASSTHROUGH interfaces of the vnfrs connected to the given vlds
    :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
    :param vim_account_id: value used to filter the vnfrs by "vim-account-id"
    :return: list of interface copies, each with "id" set to
        vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index> and
        "status" set to "ERROR" when its vdur status is "ERROR"
    """
    interfaces = []

    for vld in vlds_to_connect:
        # Split "<table>:<db_id>:vld.<vld_id>" into its three parts.
        table, _, db_id = vld.partition(":")
        db_id, _, vld = db_id.partition(":")
        _, _, vld_id = vld.partition(".")

        if table == "vnfrs":
            q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
            iface_key = "vnf-vld-id"
        else:  # table == "nsrs"
            q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
            iface_key = "ns-vld-id"

        db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

        for db_vnfr in db_vnfrs:
            for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                # A vdur without interfaces contributes nothing.
                for iface_index, interface in enumerate(
                    vdur.get("interfaces") or ()
                ):
                    if interface.get(iface_key) == vld_id and interface.get(
                        "type"
                    ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                        # only SR-IOV or PCI-PASSTHROUGH
                        interface_ = interface.copy()
                        interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                            db_vnfr["_id"], vdu_index, iface_index
                        )

                        if vdur.get("status") == "ERROR":
                            interface_["status"] = "ERROR"

                        interfaces.append(interface_)

    return interfaces
def refresh(self, ro_task):
    """Refresh by running new() again on the pending CREATE task

    :param ro_task: ro_task whose "tasks" list holds the CREATE task
    :return: whatever self.new returns
    """
    # look for task create: first non-empty task with action CREATE not yet FINISHED
    task_create_index, _ = next(
        i_t
        for i_t in enumerate(ro_task["tasks"])
        if i_t[1]
        and i_t[1]["action"] == "CREATE"
        and i_t[1]["status"] != "FINISHED"
    )

    return self.new(ro_task, task_create_index, None)
1151 def new(self
, ro_task
, task_index
, task_depends
):
1152 task
= ro_task
["tasks"][task_index
]
1153 task_id
= task
["task_id"]
1154 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1156 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1158 created_items
= ro_task
["vim_info"].get("created_items")
1159 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1160 new_connected_ports
= []
1161 last_update
= ro_task
["vim_info"].get("last_update", 0)
1162 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1164 created
= ro_task
["vim_info"].get("created", False)
1169 params
= task
["params"]
1170 vlds_to_connect
= params
.get("vlds", [])
1171 associated_vim
= params
.get("target_vim")
1172 # external additional ports
1173 additional_ports
= params
.get("sdn-ports") or ()
1174 _
, _
, vim_account_id
= (
1176 if associated_vim
is None
1177 else associated_vim
.partition(":")
1181 # get associated VIM
1182 if associated_vim
not in self
.db_vims
:
1183 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1184 "vim_accounts", {"_id": vim_account_id
}
1187 db_vim
= self
.db_vims
[associated_vim
]
1189 # look for ports to connect
1190 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1194 pending_ports
= error_ports
= 0
1196 sdn_need_update
= False
1199 vlan_used
= port
.get("vlan") or vlan_used
1201 # TODO. Do not connect if already done
1202 if not port
.get("compute_node") or not port
.get("pci"):
1203 if port
.get("status") == "ERROR":
1210 compute_node_mappings
= next(
1213 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1214 if c
and c
["compute_node"] == port
["compute_node"]
1219 if compute_node_mappings
:
1220 # process port_mapping pci of type 0000:af:1[01].[1357]
1224 for p
in compute_node_mappings
["ports"]
1225 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1231 if not db_vim
["config"].get("mapping_not_needed"):
1233 "Port mapping not found for compute_node={} pci={}".format(
1234 port
["compute_node"], port
["pci"]
1241 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1243 "service_endpoint_id": pmap
.get("service_endpoint_id")
1244 or service_endpoint_id
,
1245 "service_endpoint_encapsulation_type": "dot1q"
1246 if port
["type"] == "SR-IOV"
1248 "service_endpoint_encapsulation_info": {
1249 "vlan": port
.get("vlan"),
1250 "mac": port
.get("mac-address"),
1251 "device_id": pmap
.get("device_id") or port
["compute_node"],
1252 "device_interface_id": pmap
.get("device_interface_id")
1254 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1255 "switch_port": pmap
.get("switch_port"),
1256 "service_mapping_info": pmap
.get("service_mapping_info"),
1261 # if port["modified_at"] > last_update:
1262 # sdn_need_update = True
1263 new_connected_ports
.append(port
["id"]) # TODO
1264 sdn_ports
.append(new_port
)
1268 "{} interfaces have not been created as VDU is on ERROR status".format(
1273 # connect external ports
1274 for index
, additional_port
in enumerate(additional_ports
):
1275 additional_port_id
= additional_port
.get(
1276 "service_endpoint_id"
1277 ) or "external-{}".format(index
)
1280 "service_endpoint_id": additional_port_id
,
1281 "service_endpoint_encapsulation_type": additional_port
.get(
1282 "service_endpoint_encapsulation_type", "dot1q"
1284 "service_endpoint_encapsulation_info": {
1285 "vlan": additional_port
.get("vlan") or vlan_used
,
1286 "mac": additional_port
.get("mac_address"),
1287 "device_id": additional_port
.get("device_id"),
1288 "device_interface_id": additional_port
.get(
1289 "device_interface_id"
1291 "switch_dpid": additional_port
.get("switch_dpid")
1292 or additional_port
.get("switch_id"),
1293 "switch_port": additional_port
.get("switch_port"),
1294 "service_mapping_info": additional_port
.get(
1295 "service_mapping_info"
1300 new_connected_ports
.append(additional_port_id
)
1303 # if there are more ports to connect or they have been modified, call create/update
1305 sdn_status
= "ERROR"
1306 sdn_info
= "; ".join(error_list
)
1307 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1308 last_update
= time
.time()
1311 if len(sdn_ports
) < 2:
1312 sdn_status
= "ACTIVE"
1314 if not pending_ports
:
1316 "task={} {} new-sdn-net done, less than 2 ports".format(
1317 task_id
, ro_task
["target_id"]
1321 net_type
= params
.get("type") or "ELAN"
1325 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1328 "task={} {} new-sdn-net={} created={}".format(
1329 task_id
, ro_task
["target_id"], sdn_net_id
, created
1333 created_items
= target_vim
.edit_connectivity_service(
1334 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1338 "task={} {} update-sdn-net={} created={}".format(
1339 task_id
, ro_task
["target_id"], sdn_net_id
, created
1343 connected_ports
= new_connected_ports
1345 wim_status_dict
= target_vim
.get_connectivity_service_status(
1346 sdn_net_id
, conn_info
=created_items
1348 sdn_status
= wim_status_dict
["sdn_status"]
1350 if wim_status_dict
.get("sdn_info"):
1351 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1353 if wim_status_dict
.get("error_msg"):
1354 sdn_info
= wim_status_dict
.get("error_msg") or ""
1357 if sdn_status
!= "ERROR":
1358 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1359 len(ports
) - pending_ports
, len(ports
)
1362 if sdn_status
== "ACTIVE":
1363 sdn_status
= "BUILD"
1365 ro_vim_item_update
= {
1366 "vim_id": sdn_net_id
,
1367 "vim_status": sdn_status
,
1369 "created_items": created_items
,
1370 "connected_ports": connected_ports
,
1371 "vim_details": sdn_info
,
1372 "vim_message": None,
1373 "last_update": last_update
,
1376 return sdn_status
, ro_vim_item_update
1377 except Exception as e
:
1379 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1380 exc_info
=not isinstance(
1381 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1384 ro_vim_item_update
= {
1385 "vim_status": "VIM_ERROR",
1387 "vim_message": str(e
),
1390 return "FAILED", ro_vim_item_update
1392 def delete(self
, ro_task
, task_index
):
1393 task
= ro_task
["tasks"][task_index
]
1394 task_id
= task
["task_id"]
1395 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1396 ro_vim_item_update_ok
= {
1397 "vim_status": "DELETED",
1399 "vim_message": "DELETED",
1405 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1406 target_vim
.delete_connectivity_service(
1407 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1410 except Exception as e
:
1412 isinstance(e
, sdnconn
.SdnConnectorError
)
1413 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1415 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1418 "ro_task={} vim={} del-sdn-net={}: {}".format(
1419 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1421 exc_info
=not isinstance(
1422 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1425 ro_vim_item_update
= {
1426 "vim_status": "VIM_ERROR",
1427 "vim_message": "Error while deleting: {}".format(e
),
1430 return "FAILED", ro_vim_item_update
1433 "task={} {} del-sdn-net={} {}".format(
1435 ro_task
["target_id"],
1437 ro_vim_item_update_ok
.get("vim_message", ""),
1441 return "DONE", ro_vim_item_update_ok
1444 class VimInteractionMigration(VimInteractionBase
):
1445 def exec(self
, ro_task
, task_index
, task_depends
):
1446 task
= ro_task
["tasks"][task_index
]
1447 task_id
= task
["task_id"]
1448 db_task_update
= {"retries": 0}
1449 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1453 refreshed_vim_info
= {}
1457 if task
.get("params"):
1458 vim_vm_id
= task
["params"].get("vim_vm_id")
1459 migrate_host
= task
["params"].get("migrate_host")
1460 _
, migrated_compute_node
= target_vim
.migrate_instance(
1461 vim_vm_id
, migrate_host
1464 if migrated_compute_node
:
1465 # When VM is migrated, vdu["vim_info"] needs to be updated
1466 vdu_old_vim_info
= task
["params"]["vdu_vim_info"].get(
1467 ro_task
["target_id"]
1470 # Refresh VM to get new vim_info
1471 vm_to_refresh_list
= [vim_vm_id
]
1472 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1473 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1475 if refreshed_vim_info
.get("interfaces"):
1476 for old_iface
in vdu_old_vim_info
.get("interfaces"):
1480 for iface
in refreshed_vim_info
["interfaces"]
1481 if old_iface
["vim_interface_id"]
1482 == iface
["vim_interface_id"]
1486 vim_interfaces
.append(iface
)
1488 ro_vim_item_update
= {
1489 "vim_id": vim_vm_id
,
1490 "vim_status": "ACTIVE",
1492 "created_items": created_items
,
1493 "vim_details": None,
1494 "vim_message": None,
1497 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1501 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1504 ro_vim_item_update
["interfaces"] = vim_interfaces
1507 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1510 return "DONE", ro_vim_item_update
, db_task_update
1512 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1514 "task={} vim={} VM Migration:"
1515 " {}".format(task_id
, ro_task
["target_id"], e
)
1517 ro_vim_item_update
= {
1518 "vim_status": "VIM_ERROR",
1520 "vim_message": str(e
),
1523 return "FAILED", ro_vim_item_update
, db_task_update
1526 class VimInteractionResize(VimInteractionBase
):
1527 def exec(self
, ro_task
, task_index
, task_depends
):
1528 task
= ro_task
["tasks"][task_index
]
1529 task_id
= task
["task_id"]
1530 db_task_update
= {"retries": 0}
1532 target_flavor_uuid
= None
1534 refreshed_vim_info
= {}
1535 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1538 params
= task
["params"]
1539 params_copy
= deepcopy(params
)
1540 target_flavor_uuid
= task_depends
[params_copy
["flavor_id"]]
1542 if task
.get("params"):
1543 self
.logger
.info("vim_vm_id %s", vim_vm_id
)
1545 if target_flavor_uuid
is not None:
1546 resized_status
= target_vim
.resize_instance(
1547 vim_vm_id
, target_flavor_uuid
1551 # Refresh VM to get new vim_info
1552 vm_to_refresh_list
= [vim_vm_id
]
1553 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1554 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1556 ro_vim_item_update
= {
1557 "vim_id": vim_vm_id
,
1558 "vim_status": "ACTIVE",
1560 "created_items": created_items
,
1561 "vim_details": None,
1562 "vim_message": None,
1565 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1569 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1572 "task={} {} resize done".format(task_id
, ro_task
["target_id"])
1574 return "DONE", ro_vim_item_update
, db_task_update
1575 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1577 "task={} vim={} Resize:" " {}".format(task_id
, ro_task
["target_id"], e
)
1579 ro_vim_item_update
= {
1580 "vim_status": "VIM_ERROR",
1582 "vim_message": str(e
),
1585 return "FAILED", ro_vim_item_update
, db_task_update
1588 class ConfigValidate
:
1589 def __init__(self
, config
: Dict
):
1594 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1596 self
.conf
["period"]["refresh_active"] >= 60
1597 or self
.conf
["period"]["refresh_active"] == -1
1599 return self
.conf
["period"]["refresh_active"]
1605 return self
.conf
["period"]["refresh_build"]
1609 return self
.conf
["period"]["refresh_image"]
1613 return self
.conf
["period"]["refresh_error"]
1616 def queue_size(self
):
1617 return self
.conf
["period"]["queue_size"]
1620 class NsWorker(threading
.Thread
):
1621 def __init__(self
, worker_index
, config
, plugins
, db
):
1623 :param worker_index: thread index
1624 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1625 :param plugins: global shared dict with the loaded plugins
1626 :param db: database class instance to use
1628 threading
.Thread
.__init
__(self
)
1629 self
.config
= config
1630 self
.plugins
= plugins
1631 self
.plugin_name
= "unknown"
1632 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1633 self
.worker_index
= worker_index
1634 # refresh periods for created items
1635 self
.refresh_config
= ConfigValidate(config
)
1636 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1637 # targetvim: vimplugin class
1639 # targetvim: vim information from database
1642 self
.vim_targets
= []
1643 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1646 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1647 "shared-volumes": VimInteractionSharedVolume(
1648 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1650 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1651 "image": VimInteractionImage(
1652 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1654 "flavor": VimInteractionFlavor(
1655 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1657 "sdn_net": VimInteractionSdnNet(
1658 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1660 "update": VimInteractionUpdateVdu(
1661 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1663 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1664 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1666 "migrate": VimInteractionMigration(
1667 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1669 "verticalscale": VimInteractionResize(
1670 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1673 self
.time_last_task_processed
= None
1674 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1675 self
.tasks_to_delete
= []
1676 # it is idle when there are not vim_targets associated
1678 self
.task_locked_time
= config
["global"]["task_locked_time"]
def insert_task(self, task):
    """
    Place a task on this worker's queue without blocking.
    :param task: item to be appended to self.task_queue
    :return: None
    :raises NsWorkerException: if the queue has no free slot
    """
    try:
        # second positional argument is 'block'; False makes a full queue fail at once
        self.task_queue.put(task, False)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")
    else:
        return None
def terminate(self):
    """
    Queue the literal task "exit" by means of insert_task.
    NOTE(review): presumably the worker loop treats "exit" as its stop signal; the consumer
    of the queue is not visible from here, confirm there.
    :return: None
    """
    self.insert_task("exit")
def del_task(self, task):
    """
    Mark a task as superseded when it has not been started yet.
    :param task: task dictionary. Its "status" key is read and, if "SCHEDULED", overwritten
    :return: True if the task was "SCHEDULED" and has been changed to "SUPERSEDED", False otherwise
    """
    # The 'with' statement already releases task_lock when the block is left. Releasing it by
    # hand inside the block makes that implicit release act on an unlocked lock, which raises
    # RuntimeError instead of returning False, so no explicit release is done here.
    with self.task_lock:
        if task["status"] != "SCHEDULED":  # task["status"] == "processing"
            return False

        task["status"] = "SUPERSEDED"
        return True
1699 def _process_vim_config(self
, target_id
: str, db_vim
: dict) -> None:
1701 Process vim config, creating vim configuration files as ca_cert
1702 :param target_id: vim/sdn/wim + id
1703 :param db_vim: Vim dictionary obtained from database
1704 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1706 if not db_vim
.get("config"):
1710 work_dir
= "/app/osm_ro/certs"
1713 if db_vim
["config"].get("ca_cert_content"):
1714 file_name
= f
"{work_dir}/{target_id}:{self.worker_index}"
1716 if not path
.isdir(file_name
):
1719 file_name
= file_name
+ "/ca_cert"
1721 with
open(file_name
, "w") as f
:
1722 f
.write(db_vim
["config"]["ca_cert_content"])
1723 del db_vim
["config"]["ca_cert_content"]
1724 db_vim
["config"]["ca_cert"] = file_name
1725 except Exception as e
:
1726 raise NsWorkerException(
1727 "Error writing to file '{}': {}".format(file_name
, e
)
1730 def _load_plugin(self
, name
, type="vim"):
1731 # type can be vim or sdn
1732 if "rovim_dummy" not in self
.plugins
:
1733 self
.plugins
["rovim_dummy"] = VimDummyConnector
1735 if "rosdn_dummy" not in self
.plugins
:
1736 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1738 if name
in self
.plugins
:
1739 return self
.plugins
[name
]
1742 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1743 self
.plugins
[name
] = ep
.load()
1744 except Exception as e
:
1745 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1747 if name
and name
not in self
.plugins
:
1748 raise NsWorkerException(
1749 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1752 return self
.plugins
[name
]
1754 def _unload_vim(self
, target_id
):
1756 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1757 :param target_id: Contains type:_id; where type can be 'vim', ...
1761 self
.db_vims
.pop(target_id
, None)
1762 self
.my_vims
.pop(target_id
, None)
1764 if target_id
in self
.vim_targets
:
1765 self
.vim_targets
.remove(target_id
)
1767 self
.logger
.info("Unloaded {}".format(target_id
))
1768 except Exception as e
:
1769 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1771 def _check_vim(self
, target_id
):
1773 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1774 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1777 target
, _
, _id
= target_id
.partition(":")
1783 loaded
= target_id
in self
.vim_targets
1794 step
= "Getting {} from db".format(target_id
)
1795 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1797 for op_index
, operation
in enumerate(
1798 db_vim
["_admin"].get("operations", ())
1800 if operation
["operationState"] != "PROCESSING":
1803 locked_at
= operation
.get("locked_at")
1805 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1806 # some other thread is doing this operation
1810 op_text
= "_admin.operations.{}.".format(op_index
)
1812 if not self
.db
.set_one(
1816 op_text
+ "operationState": "PROCESSING",
1817 op_text
+ "locked_at": locked_at
,
1820 op_text
+ "locked_at": now
,
1821 "admin.current_operation": op_index
,
1823 fail_on_empty
=False,
1827 unset_dict
[op_text
+ "locked_at"] = None
1828 unset_dict
["current_operation"] = None
1829 step
= "Loading " + target_id
1830 error_text
= self
._load
_vim
(target_id
)
1833 step
= "Checking connectivity"
1836 self
.my_vims
[target_id
].check_vim_connectivity()
1838 self
.my_vims
[target_id
].check_credentials()
1840 update_dict
["_admin.operationalState"] = "ENABLED"
1841 update_dict
["_admin.detailed-status"] = ""
1842 unset_dict
[op_text
+ "detailed-status"] = None
1843 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1847 except Exception as e
:
1848 error_text
= "{}: {}".format(step
, e
)
1849 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1852 if update_dict
or unset_dict
:
1854 update_dict
[op_text
+ "operationState"] = "FAILED"
1855 update_dict
[op_text
+ "detailed-status"] = error_text
1856 unset_dict
.pop(op_text
+ "detailed-status", None)
1857 update_dict
["_admin.operationalState"] = "ERROR"
1858 update_dict
["_admin.detailed-status"] = error_text
1861 update_dict
[op_text
+ "statusEnteredTime"] = now
1865 q_filter
={"_id": _id
},
1866 update_dict
=update_dict
,
1868 fail_on_empty
=False,
1872 self
._unload
_vim
(target_id
)
1874 def _reload_vim(self
, target_id
):
1875 if target_id
in self
.vim_targets
:
1876 self
._load
_vim
(target_id
)
1878 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1879 # just remove it to force load again next time it is needed
1880 self
.db_vims
.pop(target_id
, None)
1882 def _load_vim(self
, target_id
):
1884 Load or reload a vim_account, sdn_controller or wim_account.
1885 Read content from database, load the plugin if not loaded.
1886 In case of error loading the plugin, it loads a failing VIM_connector
1887 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1888 :param target_id: Contains type:_id; where type can be 'vim', ...
1889 :return: None if ok, descriptive text if error
1891 target
, _
, _id
= target_id
.partition(":")
1901 step
= "Getting {}={} from db".format(target
, _id
)
1904 # TODO process for wim, sdnc, ...
1905 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1907 # if deep_get(vim, "config", "sdn-controller"):
1908 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1909 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1911 step
= "Decrypting password"
1912 schema_version
= vim
.get("schema_version")
1913 self
.db
.encrypt_decrypt_fields(
1916 fields
=("password", "secret"),
1917 schema_version
=schema_version
,
1920 self
._process
_vim
_config
(target_id
, vim
)
1923 plugin_name
= "rovim_" + vim
["vim_type"]
1924 step
= "Loading plugin '{}'".format(plugin_name
)
1925 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1926 step
= "Loading {}'".format(target_id
)
1927 self
.my_vims
[target_id
] = vim_module_conn(
1930 tenant_id
=vim
.get("vim_tenant_id"),
1931 tenant_name
=vim
.get("vim_tenant_name"),
1934 user
=vim
["vim_user"],
1935 passwd
=vim
["vim_password"],
1936 config
=vim
.get("config") or {},
1940 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1941 step
= "Loading plugin '{}'".format(plugin_name
)
1942 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1943 step
= "Loading {}'".format(target_id
)
1945 wim_config
= wim
.pop("config", {}) or {}
1946 wim
["uuid"] = wim
["_id"]
1947 if "url" in wim
and "wim_url" not in wim
:
1948 wim
["wim_url"] = wim
["url"]
1949 elif "url" not in wim
and "wim_url" in wim
:
1950 wim
["url"] = wim
["wim_url"]
1953 wim_config
["dpid"] = wim
.pop("dpid")
1955 if wim
.get("switch_id"):
1956 wim_config
["switch_id"] = wim
.pop("switch_id")
1958 # wim, wim_account, config
1959 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1960 self
.db_vims
[target_id
] = vim
1961 self
.error_status
= None
1964 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1966 except Exception as e
:
1968 "Cannot load {} plugin={}: {} {}".format(
1969 target_id
, plugin_name
, step
, e
1973 self
.db_vims
[target_id
] = vim
or {}
1974 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1975 error_status
= "{} Error: {}".format(step
, e
)
1979 if target_id
not in self
.vim_targets
:
1980 self
.vim_targets
.append(target_id
)
1982 def _get_db_task(self
):
1984 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1989 if not self
.time_last_task_processed
:
1990 self
.time_last_task_processed
= now
1995 # Log RO tasks only when loglevel is DEBUG
1996 if self.logger.getEffectiveLevel() == logging.DEBUG:
2003 + str(self.task_locked_time)
2005 + "time_last_task_processed="
2006 + str(self.time_last_task_processed)
2012 locked
= self
.db
.set_one(
2015 "target_id": self
.vim_targets
,
2016 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2017 "locked_at.lt": now
- self
.task_locked_time
,
2018 "to_check_at.lt": self
.time_last_task_processed
,
2019 "to_check_at.gt": -1,
2021 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
2022 fail_on_empty
=False,
2027 ro_task
= self
.db
.get_one(
2030 "target_id": self
.vim_targets
,
2031 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2037 if self
.time_last_task_processed
== now
:
2038 self
.time_last_task_processed
= None
2041 self
.time_last_task_processed
= now
2042 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2044 except DbException
as e
:
2045 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
2046 except Exception as e
:
2047 self
.logger
.critical(
2048 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
2053 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2055 Determine if this task need to be done or superseded
2058 my_task
= ro_task
["tasks"][task_index
]
2059 task_id
= my_task
["task_id"]
2060 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
2061 "created_items", False
2064 self
.logger
.debug("Needed delete: {}".format(needed_delete
))
2065 if my_task
["status"] == "FAILED":
2066 return None, None # TODO need to be retry??
2069 for index
, task
in enumerate(ro_task
["tasks"]):
2070 if index
== task_index
or not task
:
2074 my_task
["target_record"] == task
["target_record"]
2075 and task
["action"] == "CREATE"
2078 db_update
["tasks.{}.status".format(index
)] = task
[
2081 elif task
["action"] == "CREATE" and task
["status"] not in (
2085 needed_delete
= False
2089 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
2091 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
2093 return "SUPERSEDED", None
2094 except Exception as e
:
2095 if not isinstance(e
, NsWorkerException
):
2096 self
.logger
.critical(
2097 "Unexpected exception at _delete_task task={}: {}".format(
2103 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2105 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2107 Determine if this task need to create something at VIM
2110 my_task
= ro_task
["tasks"][task_index
]
2111 task_id
= my_task
["task_id"]
2113 if my_task
["status"] == "FAILED":
2114 return None, None # TODO need to be retry??
2115 elif my_task
["status"] == "SCHEDULED":
2116 # check if already created by another task
2117 for index
, task
in enumerate(ro_task
["tasks"]):
2118 if index
== task_index
or not task
:
2121 if task
["action"] == "CREATE" and task
["status"] not in (
2126 return task
["status"], "COPY_VIM_INFO"
2129 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2130 ro_task
, task_index
, task_depends
2132 # TODO update other CREATE tasks
2133 except Exception as e
:
2134 if not isinstance(e
, NsWorkerException
):
2136 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2139 task_status
= "FAILED"
2140 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2141 # TODO update ro_vim_item_update
2143 return task_status
, ro_vim_item_update
2147 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2149 Look for dependency task
2150 :param task_id: Can be one of
2151 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2152 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2153 3. task.task_id: "<action_id>:number"
2156 :return: database ro_task plus index of task
2159 task_id
.startswith("vim:")
2160 or task_id
.startswith("sdn:")
2161 or task_id
.startswith("wim:")
2163 target_id
, _
, task_id
= task_id
.partition(" ")
2165 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2166 ro_task_dependency
= self
.db
.get_one(
2168 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2169 fail_on_empty
=False,
2172 if ro_task_dependency
:
2173 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2174 if task
["target_record_id"] == task_id
:
2175 return ro_task_dependency
, task_index
2179 for task_index
, task
in enumerate(ro_task
["tasks"]):
2180 if task
and task
["task_id"] == task_id
:
2181 return ro_task
, task_index
2183 ro_task_dependency
= self
.db
.get_one(
2186 "tasks.ANYINDEX.task_id": task_id
,
2187 "tasks.ANYINDEX.target_record.ne": None,
2189 fail_on_empty
=False,
2192 self
.logger
.debug("ro_task_dependency={}".format(ro_task_dependency
))
2193 if ro_task_dependency
:
2194 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2195 if task
["task_id"] == task_id
:
2196 return ro_task_dependency
, task_index
2197 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
2199 def update_vm_refresh(self
, ro_task
):
2200 """Enables the VM status updates if self.refresh_config.active parameter
2201 is not -1 and then updates the DB accordingly
2205 self
.logger
.debug("Checking if VM status update config")
2206 next_refresh
= time
.time()
2207 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2209 if next_refresh
!= -1:
2210 db_ro_task_update
= {}
2212 next_check_at
= now
+ (24 * 60 * 60)
2213 next_check_at
= min(next_check_at
, next_refresh
)
2214 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2215 db_ro_task_update
["to_check_at"] = next_check_at
2218 "Finding tasks which to be updated to enable VM status updates"
2220 refresh_tasks
= self
.db
.get_list(
2223 "tasks.status": "DONE",
2224 "to_check_at.lt": 0,
2227 self
.logger
.debug("Updating tasks to change the to_check_at status")
2228 for task
in refresh_tasks
:
2235 update_dict
=db_ro_task_update
,
2239 except Exception as e
:
2240 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2242 def _get_next_refresh(self
, ro_task
: dict, next_refresh
: float):
2243 """Decide the next_refresh according to vim type and refresh config period.
2245 ro_task (dict): ro_task details
2246 next_refresh (float): next refresh time as epoch format
2249 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2251 target_vim
= ro_task
["target_id"]
2252 vim_type
= self
.db_vims
[target_vim
]["vim_type"]
2253 if self
.refresh_config
.active
== -1 or vim_type
== "openstack":
2256 next_refresh
+= self
.refresh_config
.active
2259 def _process_pending_tasks(self
, ro_task
):
2260 ro_task_id
= ro_task
["_id"]
2263 next_check_at
= now
+ (24 * 60 * 60)
2264 db_ro_task_update
= {}
2266 def _update_refresh(new_status
):
2267 # compute next_refresh
2269 nonlocal next_check_at
2270 nonlocal db_ro_task_update
2273 next_refresh
= time
.time()
2275 if task
["item"] in ("image", "flavor"):
2276 next_refresh
+= self
.refresh_config
.image
2277 elif new_status
== "BUILD":
2278 next_refresh
+= self
.refresh_config
.build
2279 elif new_status
== "DONE":
2280 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2282 next_refresh
+= self
.refresh_config
.error
2284 next_check_at
= min(next_check_at
, next_refresh
)
2285 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2286 ro_task
["vim_info"]["refresh_at"] = next_refresh
2290 # Log RO tasks only when loglevel is DEBUG
2291 if self.logger.getEffectiveLevel() == logging.DEBUG:
2292 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2294 # Check if vim status refresh is enabled again
2295 self
.update_vm_refresh(ro_task
)
2296 # 0: get task_status_create
2298 task_status_create
= None
2302 for t
in ro_task
["tasks"]
2304 and t
["action"] == "CREATE"
2305 and t
["status"] in ("BUILD", "DONE")
2311 task_status_create
= task_create
["status"]
2313 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2314 for task_action
in ("DELETE", "CREATE", "EXEC"):
2315 db_vim_update
= None
2318 for task_index
, task
in enumerate(ro_task
["tasks"]):
2320 continue # task deleted
2323 target_update
= None
2327 task_action
in ("DELETE", "EXEC")
2328 and task
["status"] not in ("SCHEDULED", "BUILD")
2330 or task
["action"] != task_action
2332 task_action
== "CREATE"
2333 and task
["status"] in ("FINISHED", "SUPERSEDED")
2338 task_path
= "tasks.{}.status".format(task_index
)
2340 db_vim_info_update
= None
2341 dependency_ro_task
= {}
2343 if task
["status"] == "SCHEDULED":
2344 # check if tasks that this depends on have been completed
2345 dependency_not_completed
= False
2347 for dependency_task_id
in task
.get("depends_on") or ():
2350 dependency_task_index
,
2351 ) = self
._get
_dependency
(
2352 dependency_task_id
, target_id
=ro_task
["target_id"]
2354 dependency_task
= dependency_ro_task
["tasks"][
2355 dependency_task_index
2358 "dependency_ro_task={} dependency_task_index={}".format(
2359 dependency_ro_task
, dependency_task_index
2363 if dependency_task
["status"] == "SCHEDULED":
2364 dependency_not_completed
= True
2365 next_check_at
= min(
2366 next_check_at
, dependency_ro_task
["to_check_at"]
2368 # must allow dependent task to be processed first
2369 # to do this set time after last_task_processed
2370 next_check_at
= max(
2371 self
.time_last_task_processed
, next_check_at
2374 elif dependency_task
["status"] == "FAILED":
2375 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2378 dependency_task
["action"],
2379 dependency_task
["item"],
2381 dependency_ro_task
["vim_info"].get(
2386 "task={} {}".format(task
["task_id"], error_text
)
2388 raise NsWorkerException(error_text
)
2390 task_depends
[dependency_task_id
] = dependency_ro_task
[
2394 "TASK-{}".format(dependency_task_id
)
2395 ] = dependency_ro_task
["vim_info"]["vim_id"]
2397 if dependency_not_completed
:
2398 self
.logger
.warning(
2399 "DEPENDENCY NOT COMPLETED {}".format(
2400 dependency_ro_task
["vim_info"]["vim_id"]
2403 # TODO set at vim_info.vim_details that it is waiting
2406 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2407 # the task of renew this locking. It will update database locket_at periodically
2409 lock_object
= LockRenew
.add_lock_object(
2410 "ro_tasks", ro_task
, self
2412 if task
["action"] == "DELETE":
2416 ) = self
._delete
_task
(
2417 ro_task
, task_index
, task_depends
, db_ro_task_update
2420 "FINISHED" if new_status
== "DONE" else new_status
2422 # ^with FINISHED instead of DONE it will not be refreshing
2424 if new_status
in ("FINISHED", "SUPERSEDED"):
2425 target_update
= "DELETE"
2426 elif task
["action"] == "EXEC":
2431 ) = self
.item2class
[task
["item"]].exec(
2432 ro_task
, task_index
, task_depends
2435 "FINISHED" if new_status
== "DONE" else new_status
2437 # ^with FINISHED instead of DONE it will not be refreshing
2440 # load into database the modified db_task_update "retries" and "next_retry"
2441 if db_task_update
.get("retries"):
2443 "tasks.{}.retries".format(task_index
)
2444 ] = db_task_update
["retries"]
2446 next_check_at
= time
.time() + db_task_update
.get(
2449 target_update
= None
2450 elif task
["action"] == "CREATE":
2451 if task
["status"] == "SCHEDULED":
2452 if task_status_create
:
2453 new_status
= task_status_create
2454 target_update
= "COPY_VIM_INFO"
2456 new_status
, db_vim_info_update
= self
.item2class
[
2458 ].new(ro_task
, task_index
, task_depends
)
2459 _update_refresh(new_status
)
2461 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2462 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2466 ) = self
.item2class
[
2469 _update_refresh(new_status
)
2471 # The refresh is updated to avoid set the value of "refresh_at" to
2472 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2473 # because it can happen that in this case the task is never processed
2474 _update_refresh(task
["status"])
2476 except Exception as e
:
2477 new_status
= "FAILED"
2478 db_vim_info_update
= {
2479 "vim_status": "VIM_ERROR",
2480 "vim_message": str(e
),
2484 e
, (NsWorkerException
, vimconn
.VimConnException
)
2487 "Unexpected exception at _delete_task task={}: {}".format(
2494 if db_vim_info_update
:
2495 db_vim_update
= db_vim_info_update
.copy()
2496 db_ro_task_update
.update(
2499 for k
, v
in db_vim_info_update
.items()
2502 ro_task
["vim_info"].update(db_vim_info_update
)
2505 if task_action
== "CREATE":
2506 task_status_create
= new_status
2507 db_ro_task_update
[task_path
] = new_status
2509 if target_update
or db_vim_update
:
2510 if target_update
== "DELETE":
2511 self
._update
_target
(task
, None)
2512 elif target_update
== "COPY_VIM_INFO":
2513 self
._update
_target
(task
, ro_task
["vim_info"])
2515 self
._update
_target
(task
, db_vim_update
)
2517 except Exception as e
:
2519 isinstance(e
, DbException
)
2520 and e
.http_code
== HTTPStatus
.NOT_FOUND
2522 # if the vnfrs or nsrs has been removed from database, this task must be removed
2524 "marking to delete task={}".format(task
["task_id"])
2526 self
.tasks_to_delete
.append(task
)
2529 "Unexpected exception at _update_target task={}: {}".format(
2535 locked_at
= ro_task
["locked_at"]
2539 lock_object
["locked_at"],
2540 lock_object
["locked_at"] + self
.task_locked_time
,
2542 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
2543 # contain exactly locked_at + self.task_locked_time
2544 LockRenew
.remove_lock_object(lock_object
)
2547 "_id": ro_task
["_id"],
2548 "to_check_at": ro_task
["to_check_at"],
2549 "locked_at": locked_at
,
2551 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2552 # outside this task (by ro_nbi) do not update it
2553 db_ro_task_update
["locked_by"] = None
2554 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
2555 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2556 db_ro_task_update
["modified_at"] = now
2557 db_ro_task_update
["to_check_at"] = next_check_at
2560 # Log RO tasks only when loglevel is DEBUG
2561 if self.logger.getEffectiveLevel() == logging.DEBUG:
2562 db_ro_task_update_log = db_ro_task_update.copy()
2563 db_ro_task_update_log["_id"] = q_filter["_id"]
2564 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2567 if not self
.db
.set_one(
2569 update_dict
=db_ro_task_update
,
2571 fail_on_empty
=False,
2573 del db_ro_task_update
["to_check_at"]
2574 del q_filter
["to_check_at"]
2576 # Log RO tasks only when loglevel is DEBUG
2577 if self.logger.getEffectiveLevel() == logging.DEBUG:
2580 db_ro_task_update_log,
2583 "SET_TASK " + str(q_filter),
2589 update_dict
=db_ro_task_update
,
2592 except DbException
as e
:
2594 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2596 except Exception as e
:
2598 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
def _update_target(self, task, ro_vim_item_update):
    """
    Write the VIM information of a task into the database record that the task targets.

    task["target_record"] is read as "<table>:<_id>:<path_vim_status>", e.g.
    "vnfrs:<vnfr _id>:vdur.10.vim_info.vim:id".
    :param task: task dictionary; only its "target_record" entry is read here
    :param ro_vim_item_update: dictionary with the VIM information to store. When it is empty or None the
        item status is set to "DELETED" and the vim information is unset from the record
    :return: None. Exceptions raised by self.db.set_one are not caught here
    """

    def parent_path(path):
        # remove the last dot separated component of the path
        return path[: path.rfind(".")]

    table, _, remainder = task["target_record"].partition(":")
    _id, _, path_vim_status = remainder.partition(":")
    # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
    # path_item: dot separated list targeting record information, e.g. "vdur.10"
    path_item = parent_path(parent_path(path_vim_status))

    # Nothing to copy: mark the item as deleted and drop its vim information
    if not ro_vim_item_update:
        self.db.set_one(
            table,
            q_filter={"_id": _id},
            update_dict={path_item + ".status": "DELETED"},
            unset={path_vim_status: None},
        )
        return

    # Only these entries of the VIM information are copied under path_vim_status
    copied_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
        "interfaces_backup",
    )
    update_dict = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            update_dict[path_vim_status + "." + key] = value

    if path_vim_status.startswith("vdur."):
        # (suffix at item level, source key at vim information):
        #   ".name" and ".vim-id" are kept for backward compatibility apart from vim_name and vim_id;
        #   ".status" is the general status of the item
        item_level_fields = (
            (".name", "vim_name"),
            (".vim-id", "vim_id"),
            (".status", "vim_status"),
        )
        for suffix, source_key in item_level_fields:
            if ro_vim_item_update.get(source_key):
                update_dict[path_item + suffix] = ro_vim_item_update[source_key]

    interfaces = ro_vim_item_update.get("interfaces")
    if interfaces:
        path_interfaces = path_item + ".interfaces"

        for index, iface in enumerate(interfaces):
            if not iface:
                continue

            iface_prefix = path_interfaces + ".{}.".format(index)
            for key, value in iface.items():
                if key in ("vlan", "compute_node", "pci"):
                    update_dict[iface_prefix + key] = value

            # put ip_address and mac_address with ip-address and mac-address
            ip_address = iface.get("ip_address")
            mac_address = iface.get("mac_address")
            if ip_address:
                update_dict[iface_prefix + "ip-address"] = ip_address

            if mac_address:
                update_dict[iface_prefix + "mac-address"] = mac_address

            # management interfaces: only the first ';' separated address is stored
            if iface.get("mgmt_vnf_interface") and ip_address:
                update_dict["ip-address"] = ip_address.split(";")[0]

            if iface.get("mgmt_vdu_interface") and ip_address:
                update_dict[path_item + ".ip-address"] = ip_address.split(";")[0]

    self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)

    # If interfaces exists, it backups VDU interfaces in the DB for healing operations
    if ro_vim_item_update.get("interfaces"):
        stored_interfaces = update_dict.get(path_vim_status + ".interfaces")
        if stored_interfaces:
            self.db.set_one(
                table,
                q_filter={"_id": _id},
                update_dict={path_vim_status + ".interfaces_backup": stored_interfaces},
            )
def _process_delete_db_tasks(self):
    """
    Delete tasks from database because their vnfrs or nsrs or both have been deleted.
    The entries of self.tasks_to_delete are processed one by one, starting by the first one, and every
    entry is removed from the list once processed, even if its deletion fails (the error is only logged)
    :return: None. Uses and modifies self.tasks_to_delete
    """
    while self.tasks_to_delete:
        pending = self.tasks_to_delete[0]
        nsr_id = pending["nsr_id"]

        # The deletion is limited to a single vnfr only when the task targets a vnfrs record and the nsrs
        # record is still present. Otherwise all the tasks of the nsr are affected
        vnfrs_deleted = None
        if pending["target_record"].startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": nsr_id}, fail_on_empty=False
        ):
            vnfrs_deleted = pending["target_record"].split(":")[1]

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            self.logger.error(
                "Error deleting task={}: {}".format(pending["task_id"], e)
            )

        self.tasks_to_delete.pop(0)
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from table "ro_tasks" the tasks affected by the deletion of a nsr or a vnfr.
    It does not use 'self' (static method) because it is called from osm_ng_ro.ns.
    A ro_task is deleted when all its tasks are empty or affected; otherwise only the affected tasks are
    set to None. Every write is filtered by the "modified_at" read before, so that a ro_task changed
    meanwhile by somebody else is not written; in that case the whole operation is retried
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception if fails: NsWorkerException when the retries are exceeded
    """
    retries = 5
    # NOTE(review): tasks are matched by prefix of "target_record"; presumably vnfr ids have a fixed
    # length so one id can never be the prefix of another - confirm
    vnfr_prefix = "vnfrs:" + vnfrs_deleted if vnfrs_deleted else None

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        all_written = True

        for ro_task in ro_tasks:
            db_update = {}
            removable = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue

                if vnfr_prefix:
                    affected = task["target_record"].startswith(vnfr_prefix)
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    removable = False

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            if removable:
                written = db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                )
            elif db_update:
                db_update["modified_at"] = now
                written = db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=db_update,
                    fail_on_empty=False,
                )
            else:
                # nothing to do for this ro_task
                continue

            if not written:
                all_written = False

        if all_written:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
def run(self):
    """
    Main loop of the worker. Every iteration has two steps:
      1. get one command from self.task_queue (without blocking when self.vim_targets is not empty,
         blocking otherwise) and execute it. The command "terminate" finishes the loop. After any other
         command a new iteration starts, so that step 2 is only reached when the queue is empty or
         the command processing has failed
      2. delete the tasks stored at self.tasks_to_delete and process one pending ro_task obtained with
         self._get_db_task(). It sleeps 5 seconds when there is nothing to process
    Exceptions are logged and never propagated
    :return: None
    """
    # load database
    self.logger.info("Starting")

    # (command, name of the method that attends it, log message or None). Compared in this order
    vim_orders = (
        ("load_vim", "_load_vim", "order to load vim {}"),
        ("unload_vim", "_unload_vim", "order to unload vim {}"),
        ("reload_vim", "_reload_vim", None),
        ("check_vim", "_check_vim", "order to check vim {}"),
    )

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            if task[0] == "terminate":
                break

            for order, method_name, message in vim_orders:
                if task[0] == order:
                    if message:
                        self.logger.info(message.format(task[1]))

                    getattr(self, method_name)(task[1])
                    break

            # unknown commands are silently ignored
            continue
        except queue.Empty:
            # no commands at queue, go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled debugging code, kept for reference:
            #   # Log RO tasks only when loglevel is DEBUG
            #   if self.logger.getEffectiveLevel() == logging.DEBUG:
            #       _ = self._get_db_all_tasks()

            ro_task = self._get_db_task()

            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to process, wait before looking for new tasks
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")