1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
30 from os
import makedirs
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
43 from osm_ro_plugin
import vimconn
44 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
45 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
52 def deep_get(target_dict
, *args
, **kwargs
):
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
62 if not isinstance(target_dict
, dict) or key
not in target_dict
:
63 return kwargs
.get("default")
64 target_dict
= target_dict
[key
]
68 class NsWorkerException(Exception):
72 class FailingConnector
:
73 def __init__(self
, error_msg
):
74 self
.error_msg
= error_msg
76 for method
in dir(vimconn
.VimConnector
):
79 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
82 for method
in dir(sdnconn
.SdnConnectorBase
):
85 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
89 class NsWorkerExceptionNotFound(NsWorkerException
):
93 class VimInteractionBase
:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self
, db
, my_vims
, db_vims
, logger
):
100 self
.my_vims
= my_vims
101 self
.db_vims
= db_vims
103 def new(self
, ro_task
, task_index
, task_depends
):
106 def refresh(self
, ro_task
):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
113 def delete(self
, ro_task
, task_index
):
114 """skip calling VIM to delete image. Assumes ok"""
117 def exec(self
, ro_task
, task_index
, task_depends
):
118 return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
219 "task={} {} new-net={} created={}".format(
220 task_id
, ro_task
["target_id"], vim_net_id
, created
224 return "BUILD", ro_vim_item_update
225 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
227 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
229 ro_vim_item_update
= {
230 "vim_status": "VIM_ERROR",
232 "vim_message": str(e
),
235 return "FAILED", ro_vim_item_update
237 def refresh(self
, ro_task
):
238 """Call VIM to get network status"""
239 ro_task_id
= ro_task
["_id"]
240 target_vim
= self
.my_vims
[ro_task
["target_id"]]
241 vim_id
= ro_task
["vim_info"]["vim_id"]
242 net_to_refresh_list
= [vim_id
]
245 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
246 vim_info
= vim_dict
[vim_id
]
248 if vim_info
["status"] == "ACTIVE":
250 elif vim_info
["status"] == "BUILD":
251 task_status
= "BUILD"
253 task_status
= "FAILED"
254 except vimconn
.VimConnException
as e
:
255 # Mark all tasks at VIM_ERROR status
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id
, ro_task
["target_id"], vim_id
, e
261 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
262 task_status
= "FAILED"
264 ro_vim_item_update
= {}
265 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
266 ro_vim_item_update
["vim_status"] = vim_info
["status"]
268 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
269 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
271 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
273 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
274 elif vim_info
["status"] == "DELETED":
275 ro_vim_item_update
["vim_id"] = None
276 ro_vim_item_update
["vim_message"] = "Deleted externally"
278 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
279 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
281 if ro_vim_item_update
:
283 "ro_task={} {} get-net={}: status={} {}".format(
285 ro_task
["target_id"],
287 ro_vim_item_update
.get("vim_status"),
288 ro_vim_item_update
.get("vim_message")
289 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
294 return task_status
, ro_vim_item_update
296 def delete(self
, ro_task
, task_index
):
297 task
= ro_task
["tasks"][task_index
]
298 task_id
= task
["task_id"]
299 net_vim_id
= ro_task
["vim_info"]["vim_id"]
300 ro_vim_item_update_ok
= {
301 "vim_status": "DELETED",
303 "vim_message": "DELETED",
308 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
309 target_vim
= self
.my_vims
[ro_task
["target_id"]]
310 target_vim
.delete_network(
311 net_vim_id
, ro_task
["vim_info"]["created_items"]
313 except vimconn
.VimConnNotFoundException
:
314 ro_vim_item_update_ok
["vim_message"] = "already deleted"
315 except vimconn
.VimConnException
as e
:
317 "ro_task={} vim={} del-net={}: {}".format(
318 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
321 ro_vim_item_update
= {
322 "vim_status": "VIM_ERROR",
323 "vim_message": "Error while deleting: {}".format(e
),
326 return "FAILED", ro_vim_item_update
329 "task={} {} del-net={} {}".format(
331 ro_task
["target_id"],
333 ro_vim_item_update_ok
.get("vim_message", ""),
337 return "DONE", ro_vim_item_update_ok
340 class VimInteractionVdu(VimInteractionBase
):
341 max_retries_inject_ssh_key
= 20 # 20 times
342 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
344 def new(self
, ro_task
, task_index
, task_depends
):
345 task
= ro_task
["tasks"][task_index
]
346 task_id
= task
["task_id"]
349 target_vim
= self
.my_vims
[ro_task
["target_id"]]
353 params
= task
["params"]
354 params_copy
= deepcopy(params
)
355 net_list
= params_copy
["net_list"]
358 # change task_id into network_id
359 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
360 network_id
= task_depends
[net
["net_id"]]
363 raise NsWorkerException(
364 "Cannot create VM because depends on a network not created or found "
365 "for {}".format(net
["net_id"])
368 net
["net_id"] = network_id
370 if params_copy
["image_id"].startswith("TASK-"):
371 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
373 if params_copy
["flavor_id"].startswith("TASK-"):
374 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
376 affinity_group_list
= params_copy
["affinity_group_list"]
377 for affinity_group
in affinity_group_list
:
378 # change task_id into affinity_group_id
379 if "affinity_group_id" in affinity_group
and affinity_group
[
381 ].startswith("TASK-"):
382 affinity_group_id
= task_depends
[
383 affinity_group
["affinity_group_id"]
386 if not affinity_group_id
:
387 raise NsWorkerException(
388 "found for {}".format(affinity_group
["affinity_group_id"])
391 affinity_group
["affinity_group_id"] = affinity_group_id
393 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
394 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
396 # add to created items previous_created_volumes (healing)
397 if task
.get("previous_created_volumes"):
398 for k
, v
in task
["previous_created_volumes"].items():
401 ro_vim_item_update
= {
403 "vim_status": "BUILD",
405 "created_items": created_items
,
408 "interfaces_vim_ids": interfaces
,
410 "interfaces_backup": [],
413 "task={} {} new-vm={} created={}".format(
414 task_id
, ro_task
["target_id"], vim_vm_id
, created
418 return "BUILD", ro_vim_item_update
419 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
420 self
.logger
.debug(traceback
.format_exc())
422 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
424 ro_vim_item_update
= {
425 "vim_status": "VIM_ERROR",
427 "vim_message": str(e
),
430 return "FAILED", ro_vim_item_update
432 def delete(self
, ro_task
, task_index
):
433 task
= ro_task
["tasks"][task_index
]
434 task_id
= task
["task_id"]
435 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
436 ro_vim_item_update_ok
= {
437 "vim_status": "DELETED",
439 "vim_message": "DELETED",
445 "delete_vminstance: vm_vim_id={} created_items={}".format(
446 vm_vim_id
, ro_task
["vim_info"]["created_items"]
449 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
450 target_vim
= self
.my_vims
[ro_task
["target_id"]]
451 target_vim
.delete_vminstance(
453 ro_task
["vim_info"]["created_items"],
454 ro_task
["vim_info"].get("volumes_to_hold", []),
456 except vimconn
.VimConnNotFoundException
:
457 ro_vim_item_update_ok
["vim_message"] = "already deleted"
458 except vimconn
.VimConnException
as e
:
460 "ro_task={} vim={} del-vm={}: {}".format(
461 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
464 ro_vim_item_update
= {
465 "vim_status": "VIM_ERROR",
466 "vim_message": "Error while deleting: {}".format(e
),
469 return "FAILED", ro_vim_item_update
472 "task={} {} del-vm={} {}".format(
474 ro_task
["target_id"],
476 ro_vim_item_update_ok
.get("vim_message", ""),
480 return "DONE", ro_vim_item_update_ok
482 def refresh(self
, ro_task
):
483 """Call VIM to get vm status"""
484 ro_task_id
= ro_task
["_id"]
485 target_vim
= self
.my_vims
[ro_task
["target_id"]]
486 vim_id
= ro_task
["vim_info"]["vim_id"]
491 vm_to_refresh_list
= [vim_id
]
493 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
494 vim_info
= vim_dict
[vim_id
]
496 if vim_info
["status"] == "ACTIVE":
498 elif vim_info
["status"] == "BUILD":
499 task_status
= "BUILD"
501 task_status
= "FAILED"
503 # try to load and parse vim_information
505 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
506 if vim_info_info
.get("name"):
507 vim_info
["name"] = vim_info_info
["name"]
508 except Exception as vim_info_error
:
509 self
.logger
.exception(
510 f
"{vim_info_error} occured while getting the vim_info from yaml"
512 except vimconn
.VimConnException
as e
:
513 # Mark all tasks at VIM_ERROR status
515 "ro_task={} vim={} get-vm={}: {}".format(
516 ro_task_id
, ro_task
["target_id"], vim_id
, e
519 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
520 task_status
= "FAILED"
522 ro_vim_item_update
= {}
524 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
526 if vim_info
.get("interfaces"):
527 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
531 for iface
in vim_info
["interfaces"]
532 if vim_iface_id
== iface
["vim_interface_id"]
537 # iface.pop("vim_info", None)
538 vim_interfaces
.append(iface
)
542 for t
in ro_task
["tasks"]
543 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
545 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
546 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
550 mgmt_vdu_iface
= task_create
.get(
551 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
554 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
556 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
557 ro_vim_item_update
["interfaces"] = vim_interfaces
559 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
560 ro_vim_item_update
["vim_status"] = vim_info
["status"]
562 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
563 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
565 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
566 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
567 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
568 elif vim_info
["status"] == "DELETED":
569 ro_vim_item_update
["vim_id"] = None
570 ro_vim_item_update
["vim_message"] = "Deleted externally"
572 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
573 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
575 if ro_vim_item_update
:
577 "ro_task={} {} get-vm={}: status={} {}".format(
579 ro_task
["target_id"],
581 ro_vim_item_update
.get("vim_status"),
582 ro_vim_item_update
.get("vim_message")
583 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
588 return task_status
, ro_vim_item_update
590 def exec(self
, ro_task
, task_index
, task_depends
):
591 task
= ro_task
["tasks"][task_index
]
592 task_id
= task
["task_id"]
593 target_vim
= self
.my_vims
[ro_task
["target_id"]]
594 db_task_update
= {"retries": 0}
595 retries
= task
.get("retries", 0)
598 params
= task
["params"]
599 params_copy
= deepcopy(params
)
600 params_copy
["ro_key"] = self
.db
.decrypt(
601 params_copy
.pop("private_key"),
602 params_copy
.pop("schema_version"),
603 params_copy
.pop("salt"),
605 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
606 target_vim
.inject_user_key(**params_copy
)
608 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
615 ) # params_copy["key"]
616 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
619 self
.logger
.debug(traceback
.format_exc())
620 if retries
< self
.max_retries_inject_ssh_key
:
626 "next_retry": self
.time_retries_inject_ssh_key
,
631 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
633 ro_vim_item_update
= {"vim_message": str(e
)}
635 return "FAILED", ro_vim_item_update
, db_task_update
638 class VimInteractionImage(VimInteractionBase
):
639 def new(self
, ro_task
, task_index
, task_depends
):
640 task
= ro_task
["tasks"][task_index
]
641 task_id
= task
["task_id"]
644 target_vim
= self
.my_vims
[ro_task
["target_id"]]
648 if task
.get("find_params"):
649 vim_images
= target_vim
.get_image_list(**task
["find_params"])
652 raise NsWorkerExceptionNotFound(
653 "Image not found with this criteria: '{}'".format(
657 elif len(vim_images
) > 1:
658 raise NsWorkerException(
659 "More than one image found with this criteria: '{}'".format(
664 vim_image_id
= vim_images
[0]["id"]
666 ro_vim_item_update
= {
667 "vim_id": vim_image_id
,
668 "vim_status": "DONE",
670 "created_items": created_items
,
675 "task={} {} new-image={} created={}".format(
676 task_id
, ro_task
["target_id"], vim_image_id
, created
680 return "DONE", ro_vim_item_update
681 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
683 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
685 ro_vim_item_update
= {
686 "vim_status": "VIM_ERROR",
688 "vim_message": str(e
),
691 return "FAILED", ro_vim_item_update
694 class VimInteractionFlavor(VimInteractionBase
):
695 def delete(self
, ro_task
, task_index
):
696 task
= ro_task
["tasks"][task_index
]
697 task_id
= task
["task_id"]
698 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
699 ro_vim_item_update_ok
= {
700 "vim_status": "DELETED",
702 "vim_message": "DELETED",
708 target_vim
= self
.my_vims
[ro_task
["target_id"]]
709 target_vim
.delete_flavor(flavor_vim_id
)
710 except vimconn
.VimConnNotFoundException
:
711 ro_vim_item_update_ok
["vim_message"] = "already deleted"
712 except vimconn
.VimConnException
as e
:
714 "ro_task={} vim={} del-flavor={}: {}".format(
715 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
718 ro_vim_item_update
= {
719 "vim_status": "VIM_ERROR",
720 "vim_message": "Error while deleting: {}".format(e
),
723 return "FAILED", ro_vim_item_update
726 "task={} {} del-flavor={} {}".format(
728 ro_task
["target_id"],
730 ro_vim_item_update_ok
.get("vim_message", ""),
734 return "DONE", ro_vim_item_update_ok
736 def new(self
, ro_task
, task_index
, task_depends
):
737 task
= ro_task
["tasks"][task_index
]
738 task_id
= task
["task_id"]
741 target_vim
= self
.my_vims
[ro_task
["target_id"]]
747 if task
.get("find_params"):
749 flavor_data
= task
["find_params"]["flavor_data"]
750 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
751 except vimconn
.VimConnNotFoundException
as flavor_not_found_msg
:
753 f
"VimConnNotFoundException occured: {flavor_not_found_msg}"
756 if not vim_flavor_id
and task
.get("params"):
758 flavor_data
= task
["params"]["flavor_data"]
759 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
762 ro_vim_item_update
= {
763 "vim_id": vim_flavor_id
,
764 "vim_status": "DONE",
766 "created_items": created_items
,
771 "task={} {} new-flavor={} created={}".format(
772 task_id
, ro_task
["target_id"], vim_flavor_id
, created
776 return "DONE", ro_vim_item_update
777 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
779 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
781 ro_vim_item_update
= {
782 "vim_status": "VIM_ERROR",
784 "vim_message": str(e
),
787 return "FAILED", ro_vim_item_update
790 class VimInteractionAffinityGroup(VimInteractionBase
):
791 def delete(self
, ro_task
, task_index
):
792 task
= ro_task
["tasks"][task_index
]
793 task_id
= task
["task_id"]
794 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
795 ro_vim_item_update_ok
= {
796 "vim_status": "DELETED",
798 "vim_message": "DELETED",
803 if affinity_group_vim_id
:
804 target_vim
= self
.my_vims
[ro_task
["target_id"]]
805 target_vim
.delete_affinity_group(affinity_group_vim_id
)
806 except vimconn
.VimConnNotFoundException
:
807 ro_vim_item_update_ok
["vim_message"] = "already deleted"
808 except vimconn
.VimConnException
as e
:
810 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
811 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
814 ro_vim_item_update
= {
815 "vim_status": "VIM_ERROR",
816 "vim_message": "Error while deleting: {}".format(e
),
819 return "FAILED", ro_vim_item_update
822 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
824 ro_task
["target_id"],
825 affinity_group_vim_id
,
826 ro_vim_item_update_ok
.get("vim_message", ""),
830 return "DONE", ro_vim_item_update_ok
832 def new(self
, ro_task
, task_index
, task_depends
):
833 task
= ro_task
["tasks"][task_index
]
834 task_id
= task
["task_id"]
837 target_vim
= self
.my_vims
[ro_task
["target_id"]]
840 affinity_group_vim_id
= None
841 affinity_group_data
= None
843 if task
.get("params"):
844 affinity_group_data
= task
["params"].get("affinity_group_data")
846 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
848 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
849 "vim-affinity-group-id"
851 affinity_group_vim_id
= target_vim
.get_affinity_group(
852 param_affinity_group_id
854 except vimconn
.VimConnNotFoundException
:
856 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
857 "could not be found at VIM. Creating a new one.".format(
858 task_id
, ro_task
["target_id"], param_affinity_group_id
862 if not affinity_group_vim_id
and affinity_group_data
:
863 affinity_group_vim_id
= target_vim
.new_affinity_group(
868 ro_vim_item_update
= {
869 "vim_id": affinity_group_vim_id
,
870 "vim_status": "DONE",
872 "created_items": created_items
,
877 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
878 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
882 return "DONE", ro_vim_item_update
883 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
885 "task={} vim={} new-affinity-or-anti-affinity-group:"
886 " {}".format(task_id
, ro_task
["target_id"], e
)
888 ro_vim_item_update
= {
889 "vim_status": "VIM_ERROR",
891 "vim_message": str(e
),
894 return "FAILED", ro_vim_item_update
897 class VimInteractionUpdateVdu(VimInteractionBase
):
898 def exec(self
, ro_task
, task_index
, task_depends
):
899 task
= ro_task
["tasks"][task_index
]
900 task_id
= task
["task_id"]
901 db_task_update
= {"retries": 0}
904 target_vim
= self
.my_vims
[ro_task
["target_id"]]
907 if task
.get("params"):
908 vim_vm_id
= task
["params"].get("vim_vm_id")
909 action
= task
["params"].get("action")
910 context
= {action
: action
}
911 target_vim
.action_vminstance(vim_vm_id
, context
)
913 ro_vim_item_update
= {
915 "vim_status": "DONE",
917 "created_items": created_items
,
922 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
924 return "DONE", ro_vim_item_update
, db_task_update
925 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
927 "task={} vim={} VM Migration:"
928 " {}".format(task_id
, ro_task
["target_id"], e
)
930 ro_vim_item_update
= {
931 "vim_status": "VIM_ERROR",
933 "vim_message": str(e
),
936 return "FAILED", ro_vim_item_update
, db_task_update
939 class VimInteractionSdnNet(VimInteractionBase
):
941 def _match_pci(port_pci
, mapping
):
943 Check if port_pci matches with mapping
944 mapping can have brackets to indicate that several chars are accepted. e.g
945 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
946 :param port_pci: text
947 :param mapping: text, can contain brackets to indicate several chars are available
948 :return: True if matches, False otherwise
950 if not port_pci
or not mapping
:
952 if port_pci
== mapping
:
958 bracket_start
= mapping
.find("[", mapping_index
)
960 if bracket_start
== -1:
963 bracket_end
= mapping
.find("]", bracket_start
)
964 if bracket_end
== -1:
967 length
= bracket_start
- mapping_index
970 and port_pci
[pci_index
: pci_index
+ length
]
971 != mapping
[mapping_index
:bracket_start
]
976 port_pci
[pci_index
+ length
]
977 not in mapping
[bracket_start
+ 1 : bracket_end
]
981 pci_index
+= length
+ 1
982 mapping_index
= bracket_end
+ 1
984 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
989 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
991 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
992 :param vim_account_id:
997 for vld
in vlds_to_connect
:
998 table
, _
, db_id
= vld
.partition(":")
999 db_id
, _
, vld
= db_id
.partition(":")
1000 _
, _
, vld_id
= vld
.partition(".")
1002 if table
== "vnfrs":
1003 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
1004 iface_key
= "vnf-vld-id"
1005 else: # table == "nsrs"
1006 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
1007 iface_key
= "ns-vld-id"
1009 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
1011 for db_vnfr
in db_vnfrs
:
1012 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
1013 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
1014 if interface
.get(iface_key
) == vld_id
and interface
.get(
1016 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1018 interface_
= interface
.copy()
1019 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1020 db_vnfr
["_id"], vdu_index
, iface_index
1023 if vdur
.get("status") == "ERROR":
1024 interface_
["status"] = "ERROR"
1026 interfaces
.append(interface_
)
1030 def refresh(self
, ro_task
):
1031 # look for task create
1032 task_create_index
, _
= next(
1034 for i_t
in enumerate(ro_task
["tasks"])
1036 and i_t
[1]["action"] == "CREATE"
1037 and i_t
[1]["status"] != "FINISHED"
1040 return self
.new(ro_task
, task_create_index
, None)
1042 def new(self
, ro_task
, task_index
, task_depends
):
1043 task
= ro_task
["tasks"][task_index
]
1044 task_id
= task
["task_id"]
1045 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1047 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1049 created_items
= ro_task
["vim_info"].get("created_items")
1050 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1051 new_connected_ports
= []
1052 last_update
= ro_task
["vim_info"].get("last_update", 0)
1053 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1055 created
= ro_task
["vim_info"].get("created", False)
1059 params
= task
["params"]
1060 vlds_to_connect
= params
.get("vlds", [])
1061 associated_vim
= params
.get("target_vim")
1062 # external additional ports
1063 additional_ports
= params
.get("sdn-ports") or ()
1064 _
, _
, vim_account_id
= (
1066 if associated_vim
is None
1067 else associated_vim
.partition(":")
1071 # get associated VIM
1072 if associated_vim
not in self
.db_vims
:
1073 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1074 "vim_accounts", {"_id": vim_account_id
}
1077 db_vim
= self
.db_vims
[associated_vim
]
1079 # look for ports to connect
1080 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1084 pending_ports
= error_ports
= 0
1086 sdn_need_update
= False
1089 vlan_used
= port
.get("vlan") or vlan_used
1091 # TODO. Do not connect if already done
1092 if not port
.get("compute_node") or not port
.get("pci"):
1093 if port
.get("status") == "ERROR":
1100 compute_node_mappings
= next(
1103 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1104 if c
and c
["compute_node"] == port
["compute_node"]
1109 if compute_node_mappings
:
1110 # process port_mapping pci of type 0000:af:1[01].[1357]
1114 for p
in compute_node_mappings
["ports"]
1115 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1121 if not db_vim
["config"].get("mapping_not_needed"):
1123 "Port mapping not found for compute_node={} pci={}".format(
1124 port
["compute_node"], port
["pci"]
1131 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1133 "service_endpoint_id": pmap
.get("service_endpoint_id")
1134 or service_endpoint_id
,
1135 "service_endpoint_encapsulation_type": "dot1q"
1136 if port
["type"] == "SR-IOV"
1138 "service_endpoint_encapsulation_info": {
1139 "vlan": port
.get("vlan"),
1140 "mac": port
.get("mac-address"),
1141 "device_id": pmap
.get("device_id") or port
["compute_node"],
1142 "device_interface_id": pmap
.get("device_interface_id")
1144 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1145 "switch_port": pmap
.get("switch_port"),
1146 "service_mapping_info": pmap
.get("service_mapping_info"),
1151 # if port["modified_at"] > last_update:
1152 # sdn_need_update = True
1153 new_connected_ports
.append(port
["id"]) # TODO
1154 sdn_ports
.append(new_port
)
1158 "{} interfaces have not been created as VDU is on ERROR status".format(
1163 # connect external ports
1164 for index
, additional_port
in enumerate(additional_ports
):
1165 additional_port_id
= additional_port
.get(
1166 "service_endpoint_id"
1167 ) or "external-{}".format(index
)
1170 "service_endpoint_id": additional_port_id
,
1171 "service_endpoint_encapsulation_type": additional_port
.get(
1172 "service_endpoint_encapsulation_type", "dot1q"
1174 "service_endpoint_encapsulation_info": {
1175 "vlan": additional_port
.get("vlan") or vlan_used
,
1176 "mac": additional_port
.get("mac_address"),
1177 "device_id": additional_port
.get("device_id"),
1178 "device_interface_id": additional_port
.get(
1179 "device_interface_id"
1181 "switch_dpid": additional_port
.get("switch_dpid")
1182 or additional_port
.get("switch_id"),
1183 "switch_port": additional_port
.get("switch_port"),
1184 "service_mapping_info": additional_port
.get(
1185 "service_mapping_info"
1190 new_connected_ports
.append(additional_port_id
)
1193 # if there are more ports to connect or they have been modified, call create/update
1195 sdn_status
= "ERROR"
1196 sdn_info
= "; ".join(error_list
)
1197 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1198 last_update
= time
.time()
1201 if len(sdn_ports
) < 2:
1202 sdn_status
= "ACTIVE"
1204 if not pending_ports
:
1206 "task={} {} new-sdn-net done, less than 2 ports".format(
1207 task_id
, ro_task
["target_id"]
1211 net_type
= params
.get("type") or "ELAN"
1215 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1218 "task={} {} new-sdn-net={} created={}".format(
1219 task_id
, ro_task
["target_id"], sdn_net_id
, created
1223 created_items
= target_vim
.edit_connectivity_service(
1224 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1228 "task={} {} update-sdn-net={} created={}".format(
1229 task_id
, ro_task
["target_id"], sdn_net_id
, created
1233 connected_ports
= new_connected_ports
1235 wim_status_dict
= target_vim
.get_connectivity_service_status(
1236 sdn_net_id
, conn_info
=created_items
1238 sdn_status
= wim_status_dict
["sdn_status"]
1240 if wim_status_dict
.get("sdn_info"):
1241 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1243 if wim_status_dict
.get("error_msg"):
1244 sdn_info
= wim_status_dict
.get("error_msg") or ""
1247 if sdn_status
!= "ERROR":
1248 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1249 len(ports
) - pending_ports
, len(ports
)
1252 if sdn_status
== "ACTIVE":
1253 sdn_status
= "BUILD"
1255 ro_vim_item_update
= {
1256 "vim_id": sdn_net_id
,
1257 "vim_status": sdn_status
,
1259 "created_items": created_items
,
1260 "connected_ports": connected_ports
,
1261 "vim_details": sdn_info
,
1262 "vim_message": None,
1263 "last_update": last_update
,
1266 return sdn_status
, ro_vim_item_update
1267 except Exception as e
:
1269 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1270 exc_info
=not isinstance(
1271 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1274 ro_vim_item_update
= {
1275 "vim_status": "VIM_ERROR",
1277 "vim_message": str(e
),
1280 return "FAILED", ro_vim_item_update
1282 def delete(self
, ro_task
, task_index
):
1283 task
= ro_task
["tasks"][task_index
]
1284 task_id
= task
["task_id"]
1285 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1286 ro_vim_item_update_ok
= {
1287 "vim_status": "DELETED",
1289 "vim_message": "DELETED",
1295 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1296 target_vim
.delete_connectivity_service(
1297 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1300 except Exception as e
:
1302 isinstance(e
, sdnconn
.SdnConnectorError
)
1303 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1305 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1308 "ro_task={} vim={} del-sdn-net={}: {}".format(
1309 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1311 exc_info
=not isinstance(
1312 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1315 ro_vim_item_update
= {
1316 "vim_status": "VIM_ERROR",
1317 "vim_message": "Error while deleting: {}".format(e
),
1320 return "FAILED", ro_vim_item_update
1323 "task={} {} del-sdn-net={} {}".format(
1325 ro_task
["target_id"],
1327 ro_vim_item_update_ok
.get("vim_message", ""),
1331 return "DONE", ro_vim_item_update_ok
1334 class VimInteractionMigration(VimInteractionBase
):
1335 def exec(self
, ro_task
, task_index
, task_depends
):
1336 task
= ro_task
["tasks"][task_index
]
1337 task_id
= task
["task_id"]
1338 db_task_update
= {"retries": 0}
1339 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1343 refreshed_vim_info
= {}
1346 if task
.get("params"):
1347 vim_vm_id
= task
["params"].get("vim_vm_id")
1348 migrate_host
= task
["params"].get("migrate_host")
1349 _
, migrated_compute_node
= target_vim
.migrate_instance(
1350 vim_vm_id
, migrate_host
1353 if migrated_compute_node
:
1354 # When VM is migrated, vdu["vim_info"] needs to be updated
1355 vdu_old_vim_info
= task
["params"]["vdu_vim_info"].get(
1356 ro_task
["target_id"]
1359 # Refresh VM to get new vim_info
1360 vm_to_refresh_list
= [vim_vm_id
]
1361 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1362 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1364 if refreshed_vim_info
.get("interfaces"):
1365 for old_iface
in vdu_old_vim_info
.get("interfaces"):
1369 for iface
in refreshed_vim_info
["interfaces"]
1370 if old_iface
["vim_interface_id"]
1371 == iface
["vim_interface_id"]
1375 vim_interfaces
.append(iface
)
1377 ro_vim_item_update
= {
1378 "vim_id": vim_vm_id
,
1379 "vim_status": "ACTIVE",
1381 "created_items": created_items
,
1382 "vim_details": None,
1383 "vim_message": None,
1386 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1390 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1393 ro_vim_item_update
["interfaces"] = vim_interfaces
1396 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1399 return "DONE", ro_vim_item_update
, db_task_update
1401 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1403 "task={} vim={} VM Migration:"
1404 " {}".format(task_id
, ro_task
["target_id"], e
)
1406 ro_vim_item_update
= {
1407 "vim_status": "VIM_ERROR",
1409 "vim_message": str(e
),
1412 return "FAILED", ro_vim_item_update
, db_task_update
1415 class VimInteractionResize(VimInteractionBase
):
1416 def exec(self
, ro_task
, task_index
, task_depends
):
1417 task
= ro_task
["tasks"][task_index
]
1418 task_id
= task
["task_id"]
1419 db_task_update
= {"retries": 0}
1421 target_flavor_uuid
= None
1423 refreshed_vim_info
= {}
1424 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1427 if task
.get("params"):
1428 vim_vm_id
= task
["params"].get("vim_vm_id")
1429 flavor_dict
= task
["params"].get("flavor_dict")
1430 self
.logger
.info("flavor_dict %s", flavor_dict
)
1433 target_flavor_uuid
= target_vim
.get_flavor_id_from_data(flavor_dict
)
1434 except Exception as e
:
1435 self
.logger
.info("Cannot find any flavor matching %s.", str(e
))
1437 target_flavor_uuid
= target_vim
.new_flavor(flavor_dict
)
1438 except Exception as e
:
1439 self
.logger
.error("Error creating flavor at VIM %s.", str(e
))
1441 if target_flavor_uuid
is not None:
1442 resized_status
= target_vim
.resize_instance(
1443 vim_vm_id
, target_flavor_uuid
1447 # Refresh VM to get new vim_info
1448 vm_to_refresh_list
= [vim_vm_id
]
1449 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1450 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1452 ro_vim_item_update
= {
1453 "vim_id": vim_vm_id
,
1454 "vim_status": "DONE",
1456 "created_items": created_items
,
1457 "vim_details": None,
1458 "vim_message": None,
1461 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1465 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1468 "task={} {} resize done".format(task_id
, ro_task
["target_id"])
1470 return "DONE", ro_vim_item_update
, db_task_update
1471 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1473 "task={} vim={} Resize:" " {}".format(task_id
, ro_task
["target_id"], e
)
1475 ro_vim_item_update
= {
1476 "vim_status": "VIM_ERROR",
1478 "vim_message": str(e
),
1481 return "FAILED", ro_vim_item_update
, db_task_update
1484 class ConfigValidate
:
1485 def __init__(self
, config
: Dict
):
1490 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1492 self
.conf
["period"]["refresh_active"] >= 60
1493 or self
.conf
["period"]["refresh_active"] == -1
1495 return self
.conf
["period"]["refresh_active"]
1501 return self
.conf
["period"]["refresh_build"]
1505 return self
.conf
["period"]["refresh_image"]
1509 return self
.conf
["period"]["refresh_error"]
1512 def queue_size(self
):
1513 return self
.conf
["period"]["queue_size"]
1516 class NsWorker(threading
.Thread
):
1517 def __init__(self
, worker_index
, config
, plugins
, db
):
1519 :param worker_index: thread index
1520 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1521 :param plugins: global shared dict with the loaded plugins
1522 :param db: database class instance to use
1524 threading
.Thread
.__init
__(self
)
1525 self
.config
= config
1526 self
.plugins
= plugins
1527 self
.plugin_name
= "unknown"
1528 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1529 self
.worker_index
= worker_index
1530 # refresh periods for created items
1531 self
.refresh_config
= ConfigValidate(config
)
1532 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1533 # targetvim: vimplugin class
1535 # targetvim: vim information from database
1538 self
.vim_targets
= []
1539 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1542 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1543 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1544 "image": VimInteractionImage(
1545 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1547 "flavor": VimInteractionFlavor(
1548 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1550 "sdn_net": VimInteractionSdnNet(
1551 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1553 "update": VimInteractionUpdateVdu(
1554 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1556 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1557 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1559 "migrate": VimInteractionMigration(
1560 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1562 "verticalscale": VimInteractionResize(
1563 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1566 self
.time_last_task_processed
= None
1567 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1568 self
.tasks_to_delete
= []
1569 # it is idle when there are not vim_targets associated
1571 self
.task_locked_time
= config
["global"]["task_locked_time"]
1573 def insert_task(self
, task
):
1575 self
.task_queue
.put(task
, False)
1578 raise NsWorkerException("timeout inserting a task")
1580 def terminate(self
):
1581 self
.insert_task("exit")
1583 def del_task(self
, task
):
1584 with self
.task_lock
:
1585 if task
["status"] == "SCHEDULED":
1586 task
["status"] = "SUPERSEDED"
1588 else: # task["status"] == "processing"
1589 self
.task_lock
.release()
1592 def _process_vim_config(self
, target_id
: str, db_vim
: dict) -> None:
1594 Process vim config, creating vim configuration files as ca_cert
1595 :param target_id: vim/sdn/wim + id
1596 :param db_vim: Vim dictionary obtained from database
1597 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1599 if not db_vim
.get("config"):
1603 work_dir
= "/app/osm_ro/certs"
1606 if db_vim
["config"].get("ca_cert_content"):
1607 file_name
= f
"{work_dir}/{target_id}:{self.worker_index}"
1609 if not path
.isdir(file_name
):
1612 file_name
= file_name
+ "/ca_cert"
1614 with
open(file_name
, "w") as f
:
1615 f
.write(db_vim
["config"]["ca_cert_content"])
1616 del db_vim
["config"]["ca_cert_content"]
1617 db_vim
["config"]["ca_cert"] = file_name
1618 except Exception as e
:
1619 raise NsWorkerException(
1620 "Error writing to file '{}': {}".format(file_name
, e
)
1623 def _load_plugin(self
, name
, type="vim"):
1624 # type can be vim or sdn
1625 if "rovim_dummy" not in self
.plugins
:
1626 self
.plugins
["rovim_dummy"] = VimDummyConnector
1628 if "rosdn_dummy" not in self
.plugins
:
1629 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1631 if name
in self
.plugins
:
1632 return self
.plugins
[name
]
1635 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1636 self
.plugins
[name
] = ep
.load()
1637 except Exception as e
:
1638 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1640 if name
and name
not in self
.plugins
:
1641 raise NsWorkerException(
1642 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1645 return self
.plugins
[name
]
1647 def _unload_vim(self
, target_id
):
1649 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1650 :param target_id: Contains type:_id; where type can be 'vim', ...
1654 self
.db_vims
.pop(target_id
, None)
1655 self
.my_vims
.pop(target_id
, None)
1657 if target_id
in self
.vim_targets
:
1658 self
.vim_targets
.remove(target_id
)
1660 self
.logger
.info("Unloaded {}".format(target_id
))
1661 except Exception as e
:
1662 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1664 def _check_vim(self
, target_id
):
1666 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1667 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1670 target
, _
, _id
= target_id
.partition(":")
1676 loaded
= target_id
in self
.vim_targets
1686 step
= "Getting {} from db".format(target_id
)
1687 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1689 for op_index
, operation
in enumerate(
1690 db_vim
["_admin"].get("operations", ())
1692 if operation
["operationState"] != "PROCESSING":
1695 locked_at
= operation
.get("locked_at")
1697 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1698 # some other thread is doing this operation
1702 op_text
= "_admin.operations.{}.".format(op_index
)
1704 if not self
.db
.set_one(
1708 op_text
+ "operationState": "PROCESSING",
1709 op_text
+ "locked_at": locked_at
,
1712 op_text
+ "locked_at": now
,
1713 "admin.current_operation": op_index
,
1715 fail_on_empty
=False,
1719 unset_dict
[op_text
+ "locked_at"] = None
1720 unset_dict
["current_operation"] = None
1721 step
= "Loading " + target_id
1722 error_text
= self
._load
_vim
(target_id
)
1725 step
= "Checking connectivity"
1728 self
.my_vims
[target_id
].check_vim_connectivity()
1730 self
.my_vims
[target_id
].check_credentials()
1732 update_dict
["_admin.operationalState"] = "ENABLED"
1733 update_dict
["_admin.detailed-status"] = ""
1734 unset_dict
[op_text
+ "detailed-status"] = None
1735 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1739 except Exception as e
:
1740 error_text
= "{}: {}".format(step
, e
)
1741 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1744 if update_dict
or unset_dict
:
1746 update_dict
[op_text
+ "operationState"] = "FAILED"
1747 update_dict
[op_text
+ "detailed-status"] = error_text
1748 unset_dict
.pop(op_text
+ "detailed-status", None)
1749 update_dict
["_admin.operationalState"] = "ERROR"
1750 update_dict
["_admin.detailed-status"] = error_text
1753 update_dict
[op_text
+ "statusEnteredTime"] = now
1757 q_filter
={"_id": _id
},
1758 update_dict
=update_dict
,
1760 fail_on_empty
=False,
1764 self
._unload
_vim
(target_id
)
1766 def _reload_vim(self
, target_id
):
1767 if target_id
in self
.vim_targets
:
1768 self
._load
_vim
(target_id
)
1770 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1771 # just remove it to force load again next time it is needed
1772 self
.db_vims
.pop(target_id
, None)
1774 def _load_vim(self
, target_id
):
1776 Load or reload a vim_account, sdn_controller or wim_account.
1777 Read content from database, load the plugin if not loaded.
1778 In case of error loading the plugin, it load a failing VIM_connector
1779 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1780 :param target_id: Contains type:_id; where type can be 'vim', ...
1781 :return: None if ok, descriptive text if error
1783 target
, _
, _id
= target_id
.partition(":")
1793 step
= "Getting {}={} from db".format(target
, _id
)
1796 # TODO process for wim, sdnc, ...
1797 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1799 # if deep_get(vim, "config", "sdn-controller"):
1800 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1801 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1803 step
= "Decrypting password"
1804 schema_version
= vim
.get("schema_version")
1805 self
.db
.encrypt_decrypt_fields(
1808 fields
=("password", "secret"),
1809 schema_version
=schema_version
,
1812 self
._process
_vim
_config
(target_id
, vim
)
1815 plugin_name
= "rovim_" + vim
["vim_type"]
1816 step
= "Loading plugin '{}'".format(plugin_name
)
1817 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1818 step
= "Loading {}'".format(target_id
)
1819 self
.my_vims
[target_id
] = vim_module_conn(
1822 tenant_id
=vim
.get("vim_tenant_id"),
1823 tenant_name
=vim
.get("vim_tenant_name"),
1826 user
=vim
["vim_user"],
1827 passwd
=vim
["vim_password"],
1828 config
=vim
.get("config") or {},
1832 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1833 step
= "Loading plugin '{}'".format(plugin_name
)
1834 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1835 step
= "Loading {}'".format(target_id
)
1837 wim_config
= wim
.pop("config", {}) or {}
1838 wim
["uuid"] = wim
["_id"]
1839 if "url" in wim
and "wim_url" not in wim
:
1840 wim
["wim_url"] = wim
["url"]
1841 elif "url" not in wim
and "wim_url" in wim
:
1842 wim
["url"] = wim
["wim_url"]
1845 wim_config
["dpid"] = wim
.pop("dpid")
1847 if wim
.get("switch_id"):
1848 wim_config
["switch_id"] = wim
.pop("switch_id")
1850 # wim, wim_account, config
1851 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1852 self
.db_vims
[target_id
] = vim
1853 self
.error_status
= None
1856 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1858 except Exception as e
:
1860 "Cannot load {} plugin={}: {} {}".format(
1861 target_id
, plugin_name
, step
, e
1865 self
.db_vims
[target_id
] = vim
or {}
1866 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1867 error_status
= "{} Error: {}".format(step
, e
)
1871 if target_id
not in self
.vim_targets
:
1872 self
.vim_targets
.append(target_id
)
1874 def _get_db_task(self
):
1876 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1881 if not self
.time_last_task_processed
:
1882 self
.time_last_task_processed
= now
1887 # Log RO tasks only when loglevel is DEBUG
1888 if self.logger.getEffectiveLevel() == logging.DEBUG:
1895 + str(self.task_locked_time)
1897 + "time_last_task_processed="
1898 + str(self.time_last_task_processed)
1904 locked
= self
.db
.set_one(
1907 "target_id": self
.vim_targets
,
1908 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1909 "locked_at.lt": now
- self
.task_locked_time
,
1910 "to_check_at.lt": self
.time_last_task_processed
,
1911 "to_check_at.gt": -1,
1913 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
1914 fail_on_empty
=False,
1919 ro_task
= self
.db
.get_one(
1922 "target_id": self
.vim_targets
,
1923 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1929 if self
.time_last_task_processed
== now
:
1930 self
.time_last_task_processed
= None
1933 self
.time_last_task_processed
= now
1934 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1936 except DbException
as e
:
1937 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
1938 except Exception as e
:
1939 self
.logger
.critical(
1940 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
1945 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1947 Determine if this task need to be done or superseded
1950 my_task
= ro_task
["tasks"][task_index
]
1951 task_id
= my_task
["task_id"]
1952 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
1953 "created_items", False
1956 self
.logger
.debug("Needed delete: {}".format(needed_delete
))
1957 if my_task
["status"] == "FAILED":
1958 return None, None # TODO need to be retry??
1961 for index
, task
in enumerate(ro_task
["tasks"]):
1962 if index
== task_index
or not task
:
1966 my_task
["target_record"] == task
["target_record"]
1967 and task
["action"] == "CREATE"
1970 db_update
["tasks.{}.status".format(index
)] = task
[
1973 elif task
["action"] == "CREATE" and task
["status"] not in (
1977 needed_delete
= False
1981 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
1983 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
1985 return "SUPERSEDED", None
1986 except Exception as e
:
1987 if not isinstance(e
, NsWorkerException
):
1988 self
.logger
.critical(
1989 "Unexpected exception at _delete_task task={}: {}".format(
1995 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
1997 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1999 Determine if this task need to create something at VIM
2002 my_task
= ro_task
["tasks"][task_index
]
2003 task_id
= my_task
["task_id"]
2006 if my_task
["status"] == "FAILED":
2007 return None, None # TODO need to be retry??
2008 elif my_task
["status"] == "SCHEDULED":
2009 # check if already created by another task
2010 for index
, task
in enumerate(ro_task
["tasks"]):
2011 if index
== task_index
or not task
:
2014 if task
["action"] == "CREATE" and task
["status"] not in (
2019 return task
["status"], "COPY_VIM_INFO"
2022 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2023 ro_task
, task_index
, task_depends
2025 # TODO update other CREATE tasks
2026 except Exception as e
:
2027 if not isinstance(e
, NsWorkerException
):
2029 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2032 task_status
= "FAILED"
2033 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2034 # TODO update ro_vim_item_update
2036 return task_status
, ro_vim_item_update
2040 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2042 Look for dependency task
2043 :param task_id: Can be one of
2044 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2045 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2046 3. task.task_id: "<action_id>:number"
2049 :return: database ro_task plus index of task
2052 task_id
.startswith("vim:")
2053 or task_id
.startswith("sdn:")
2054 or task_id
.startswith("wim:")
2056 target_id
, _
, task_id
= task_id
.partition(" ")
2058 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2059 ro_task_dependency
= self
.db
.get_one(
2061 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2062 fail_on_empty
=False,
2065 if ro_task_dependency
:
2066 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2067 if task
["target_record_id"] == task_id
:
2068 return ro_task_dependency
, task_index
2072 for task_index
, task
in enumerate(ro_task
["tasks"]):
2073 if task
and task
["task_id"] == task_id
:
2074 return ro_task
, task_index
2076 ro_task_dependency
= self
.db
.get_one(
2079 "tasks.ANYINDEX.task_id": task_id
,
2080 "tasks.ANYINDEX.target_record.ne": None,
2082 fail_on_empty
=False,
2085 self
.logger
.debug("ro_task_dependency={}".format(ro_task_dependency
))
2086 if ro_task_dependency
:
2087 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2088 if task
["task_id"] == task_id
:
2089 return ro_task_dependency
, task_index
2090 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
2092 def update_vm_refresh(self
):
2093 """Enables the VM status updates if self.refresh_config.active parameter
2094 is not -1 and than updates the DB accordingly
2098 self
.logger
.debug("Checking if VM status update config")
2099 next_refresh
= time
.time()
2100 if self
.refresh_config
.active
== -1:
2103 next_refresh
+= self
.refresh_config
.active
2105 if next_refresh
!= -1:
2106 db_ro_task_update
= {}
2108 next_check_at
= now
+ (24 * 60 * 60)
2109 next_check_at
= min(next_check_at
, next_refresh
)
2110 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2111 db_ro_task_update
["to_check_at"] = next_check_at
2114 "Finding tasks which to be updated to enable VM status updates"
2116 refresh_tasks
= self
.db
.get_list(
2119 "tasks.status": "DONE",
2120 "to_check_at.lt": 0,
2123 self
.logger
.debug("Updating tasks to change the to_check_at status")
2124 for task
in refresh_tasks
:
2131 update_dict
=db_ro_task_update
,
2135 except Exception as e
:
2136 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2138 def _process_pending_tasks(self
, ro_task
):
2139 ro_task_id
= ro_task
["_id"]
2142 next_check_at
= now
+ (24 * 60 * 60)
2143 db_ro_task_update
= {}
2145 def _update_refresh(new_status
):
2146 # compute next_refresh
2148 nonlocal next_check_at
2149 nonlocal db_ro_task_update
2152 next_refresh
= time
.time()
2154 if task
["item"] in ("image", "flavor"):
2155 next_refresh
+= self
.refresh_config
.image
2156 elif new_status
== "BUILD":
2157 next_refresh
+= self
.refresh_config
.build
2158 elif new_status
== "DONE":
2159 if self
.refresh_config
.active
== -1:
2162 next_refresh
+= self
.refresh_config
.active
2164 next_refresh
+= self
.refresh_config
.error
2166 next_check_at
= min(next_check_at
, next_refresh
)
2167 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2168 ro_task
["vim_info"]["refresh_at"] = next_refresh
2172 # Log RO tasks only when loglevel is DEBUG
2173 if self.logger.getEffectiveLevel() == logging.DEBUG:
2174 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2176 # Check if vim status refresh is enabled again
2177 self
.update_vm_refresh()
2178 # 0: get task_status_create
2180 task_status_create
= None
2184 for t
in ro_task
["tasks"]
2186 and t
["action"] == "CREATE"
2187 and t
["status"] in ("BUILD", "DONE")
2193 task_status_create
= task_create
["status"]
2195 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2196 for task_action
in ("DELETE", "CREATE", "EXEC"):
2197 db_vim_update
= None
2200 for task_index
, task
in enumerate(ro_task
["tasks"]):
2202 continue # task deleted
2205 target_update
= None
2209 task_action
in ("DELETE", "EXEC")
2210 and task
["status"] not in ("SCHEDULED", "BUILD")
2212 or task
["action"] != task_action
2214 task_action
== "CREATE"
2215 and task
["status"] in ("FINISHED", "SUPERSEDED")
2220 task_path
= "tasks.{}.status".format(task_index
)
2222 db_vim_info_update
= None
2224 if task
["status"] == "SCHEDULED":
2225 # check if tasks that this depends on have been completed
2226 dependency_not_completed
= False
2228 for dependency_task_id
in task
.get("depends_on") or ():
2231 dependency_task_index
,
2232 ) = self
._get
_dependency
(
2233 dependency_task_id
, target_id
=ro_task
["target_id"]
2235 dependency_task
= dependency_ro_task
["tasks"][
2236 dependency_task_index
2239 "dependency_ro_task={} dependency_task_index={}".format(
2240 dependency_ro_task
, dependency_task_index
2244 if dependency_task
["status"] == "SCHEDULED":
2245 dependency_not_completed
= True
2246 next_check_at
= min(
2247 next_check_at
, dependency_ro_task
["to_check_at"]
2249 # must allow dependent task to be processed first
2250 # to do this set time after last_task_processed
2251 next_check_at
= max(
2252 self
.time_last_task_processed
, next_check_at
2255 elif dependency_task
["status"] == "FAILED":
2256 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2259 dependency_task
["action"],
2260 dependency_task
["item"],
2262 dependency_ro_task
["vim_info"].get(
2267 "task={} {}".format(task
["task_id"], error_text
)
2269 raise NsWorkerException(error_text
)
2271 task_depends
[dependency_task_id
] = dependency_ro_task
[
2275 "TASK-{}".format(dependency_task_id
)
2276 ] = dependency_ro_task
["vim_info"]["vim_id"]
2278 if dependency_not_completed
:
2279 self
.logger
.warning(
2280 "DEPENDENCY NOT COMPLETED {}".format(
2281 dependency_ro_task
["vim_info"]["vim_id"]
2284 # TODO set at vim_info.vim_details that it is waiting
2287 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2288 # the task of renew this locking. It will update database locket_at periodically
2290 lock_object
= LockRenew
.add_lock_object(
2291 "ro_tasks", ro_task
, self
2294 if task
["action"] == "DELETE":
2298 ) = self
._delete
_task
(
2299 ro_task
, task_index
, task_depends
, db_ro_task_update
2302 "FINISHED" if new_status
== "DONE" else new_status
2304 # ^with FINISHED instead of DONE it will not be refreshing
2306 if new_status
in ("FINISHED", "SUPERSEDED"):
2307 target_update
= "DELETE"
2308 elif task
["action"] == "EXEC":
2313 ) = self
.item2class
[task
["item"]].exec(
2314 ro_task
, task_index
, task_depends
2317 "FINISHED" if new_status
== "DONE" else new_status
2319 # ^with FINISHED instead of DONE it will not be refreshing
2322 # load into database the modified db_task_update "retries" and "next_retry"
2323 if db_task_update
.get("retries"):
2325 "tasks.{}.retries".format(task_index
)
2326 ] = db_task_update
["retries"]
2328 next_check_at
= time
.time() + db_task_update
.get(
2331 target_update
= None
2332 elif task
["action"] == "CREATE":
2333 if task
["status"] == "SCHEDULED":
2334 if task_status_create
:
2335 new_status
= task_status_create
2336 target_update
= "COPY_VIM_INFO"
2338 new_status
, db_vim_info_update
= self
.item2class
[
2340 ].new(ro_task
, task_index
, task_depends
)
2341 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2342 _update_refresh(new_status
)
2344 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2345 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2349 ) = self
.item2class
[
2352 _update_refresh(new_status
)
2354 # The refresh is updated to avoid set the value of "refresh_at" to
2355 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2356 # because it can happen that in this case the task is never processed
2357 _update_refresh(task
["status"])
2359 except Exception as e
:
2360 new_status
= "FAILED"
2361 db_vim_info_update
= {
2362 "vim_status": "VIM_ERROR",
2363 "vim_message": str(e
),
2367 e
, (NsWorkerException
, vimconn
.VimConnException
)
2370 "Unexpected exception at _delete_task task={}: {}".format(
2377 if db_vim_info_update
:
2378 db_vim_update
= db_vim_info_update
.copy()
2379 db_ro_task_update
.update(
2382 for k
, v
in db_vim_info_update
.items()
2385 ro_task
["vim_info"].update(db_vim_info_update
)
2388 if task_action
== "CREATE":
2389 task_status_create
= new_status
2390 db_ro_task_update
[task_path
] = new_status
2392 if target_update
or db_vim_update
:
2393 if target_update
== "DELETE":
2394 self
._update
_target
(task
, None)
2395 elif target_update
== "COPY_VIM_INFO":
2396 self
._update
_target
(task
, ro_task
["vim_info"])
2398 self
._update
_target
(task
, db_vim_update
)
2400 except Exception as e
:
2402 isinstance(e
, DbException
)
2403 and e
.http_code
== HTTPStatus
.NOT_FOUND
2405 # if the vnfrs or nsrs has been removed from database, this task must be removed
2407 "marking to delete task={}".format(task
["task_id"])
2409 self
.tasks_to_delete
.append(task
)
2412 "Unexpected exception at _update_target task={}: {}".format(
2418 locked_at
= ro_task
["locked_at"]
2422 lock_object
["locked_at"],
2423 lock_object
["locked_at"] + self
.task_locked_time
,
2425 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2426 # contain exactly locked_at + self.task_locked_time
2427 LockRenew
.remove_lock_object(lock_object
)
2430 "_id": ro_task
["_id"],
2431 "to_check_at": ro_task
["to_check_at"],
2432 "locked_at": locked_at
,
2434 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2435 # outside this task (by ro_nbi) do not update it
2436 db_ro_task_update
["locked_by"] = None
2437 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2438 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2439 db_ro_task_update
["modified_at"] = now
2440 db_ro_task_update
["to_check_at"] = next_check_at
2443 # Log RO tasks only when loglevel is DEBUG
2444 if self.logger.getEffectiveLevel() == logging.DEBUG:
2445 db_ro_task_update_log = db_ro_task_update.copy()
2446 db_ro_task_update_log["_id"] = q_filter["_id"]
2447 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2450 if not self
.db
.set_one(
2452 update_dict
=db_ro_task_update
,
2454 fail_on_empty
=False,
2456 del db_ro_task_update
["to_check_at"]
2457 del q_filter
["to_check_at"]
2459 # Log RO tasks only when loglevel is DEBUG
2460 if self.logger.getEffectiveLevel() == logging.DEBUG:
2463 db_ro_task_update_log,
2466 "SET_TASK " + str(q_filter),
2472 update_dict
=db_ro_task_update
,
2475 except DbException
as e
:
2477 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2479 except Exception as e
:
2481 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2484 def _update_target(self
, task
, ro_vim_item_update
):
2485 table
, _
, temp
= task
["target_record"].partition(":")
2486 _id
, _
, path_vim_status
= temp
.partition(":")
2487 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2488 path_item
= path_item
[: path_item
.rfind(".")]
2489 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2490 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2492 if ro_vim_item_update
:
2494 path_vim_status
+ "." + k
: v
2495 for k
, v
in ro_vim_item_update
.items()
2504 "interfaces_backup",
2508 if path_vim_status
.startswith("vdur."):
2509 # for backward compatibility, add vdur.name apart from vdur.vim_name
2510 if ro_vim_item_update
.get("vim_name"):
2511 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2513 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2514 if ro_vim_item_update
.get("vim_id"):
2515 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2517 # update general status
2518 if ro_vim_item_update
.get("vim_status"):
2519 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2523 if ro_vim_item_update
.get("interfaces"):
2524 path_interfaces
= path_item
+ ".interfaces"
2526 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2530 path_interfaces
+ ".{}.".format(i
) + k
: v
2531 for k
, v
in iface
.items()
2532 if k
in ("vlan", "compute_node", "pci")
2536 # put ip_address and mac_address with ip-address and mac-address
2537 if iface
.get("ip_address"):
2539 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2540 ] = iface
["ip_address"]
2542 if iface
.get("mac_address"):
2544 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2545 ] = iface
["mac_address"]
2547 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2548 update_dict
["ip-address"] = iface
.get("ip_address").split(
2552 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2553 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2557 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2559 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2560 if ro_vim_item_update
.get("interfaces"):
2561 search_key
= path_vim_status
+ ".interfaces"
2562 if update_dict
.get(search_key
):
2563 interfaces_backup_update
= {
2564 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
2569 q_filter
={"_id": _id
},
2570 update_dict
=interfaces_backup_update
,
2574 update_dict
= {path_item
+ ".status": "DELETED"}
2577 q_filter
={"_id": _id
},
2578 update_dict
=update_dict
,
2579 unset
={path_vim_status
: None},
2582 def _process_delete_db_tasks(self
):
2584 Delete task from database because vnfrs or nsrs or both have been deleted
2585 :return: None. Uses and modify self.tasks_to_delete
2587 while self
.tasks_to_delete
:
2588 task
= self
.tasks_to_delete
[0]
2589 vnfrs_deleted
= None
2590 nsr_id
= task
["nsr_id"]
2592 if task
["target_record"].startswith("vnfrs:"):
2593 # check if nsrs is present
2594 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2595 vnfrs_deleted
= task
["target_record"].split(":")[1]
2598 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2599 except Exception as e
:
2601 "Error deleting task={}: {}".format(task
["task_id"], e
)
2603 self
.tasks_to_delete
.pop(0)
2606 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
2608 Static method because it is called from osm_ng_ro.ns
2609 :param db: instance of database to use
2610 :param nsr_id: affected nsrs id
2611 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2612 :return: None, exception is fails
2615 for retry
in range(retries
):
2616 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
2620 for ro_task
in ro_tasks
:
2622 to_delete_ro_task
= True
2624 for index
, task
in enumerate(ro_task
["tasks"]):
2627 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
2629 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
2631 db_update
["tasks.{}".format(index
)] = None
2633 # used by other nsr, ro_task cannot be deleted
2634 to_delete_ro_task
= False
2636 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2637 if to_delete_ro_task
:
2641 "_id": ro_task
["_id"],
2642 "modified_at": ro_task
["modified_at"],
2644 fail_on_empty
=False,
2648 db_update
["modified_at"] = now
2652 "_id": ro_task
["_id"],
2653 "modified_at": ro_task
["modified_at"],
2655 update_dict
=db_update
,
2656 fail_on_empty
=False,
2662 raise NsWorkerException("Exceeded {} retries".format(retries
))
2666 self
.logger
.info("Starting")
2668 # step 1: get commands from queue
2670 if self
.vim_targets
:
2671 task
= self
.task_queue
.get(block
=False)
2674 self
.logger
.debug("enters in idle state")
2676 task
= self
.task_queue
.get(block
=True)
2679 if task
[0] == "terminate":
2681 elif task
[0] == "load_vim":
2682 self
.logger
.info("order to load vim {}".format(task
[1]))
2683 self
._load
_vim
(task
[1])
2684 elif task
[0] == "unload_vim":
2685 self
.logger
.info("order to unload vim {}".format(task
[1]))
2686 self
._unload
_vim
(task
[1])
2687 elif task
[0] == "reload_vim":
2688 self
._reload
_vim
(task
[1])
2689 elif task
[0] == "check_vim":
2690 self
.logger
.info("order to check vim {}".format(task
[1]))
2691 self
._check
_vim
(task
[1])
2693 except Exception as e
:
2694 if isinstance(e
, queue
.Empty
):
2697 self
.logger
.critical(
2698 "Error processing task: {}".format(e
), exc_info
=True
2701 # step 2: process pending_tasks, delete not needed tasks
2703 if self
.tasks_to_delete
:
2704 self
._process
_delete
_db
_tasks
()
2707 # Log RO tasks only when loglevel is DEBUG
2708 if self.logger.getEffectiveLevel() == logging.DEBUG:
2709 _ = self._get_db_all_tasks()
2711 ro_task
= self
._get
_db
_task
()
2713 self
.logger
.debug("Task to process: {}".format(ro_task
))
2715 self
._process
_pending
_tasks
(ro_task
)
2719 except Exception as e
:
2720 self
.logger
.critical(
2721 "Unexpected exception at run: " + str(e
), exc_info
=True
2724 self
.logger
.info("Finishing")