1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in the table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target that indicates where to store the results.
"""
from copy import deepcopy
from http import HTTPStatus
from shutil import rmtree
import time
import traceback
from typing import Dict
from unittest.mock import Mock

from importlib_metadata import entry_points
from osm_common.dbbase import DbException
from osm_ng_ro.vim_admin import LockRenew
from osm_ro_plugin import sdnconn, vimconn
from osm_ro_plugin.sdn_dummy import SdnDummyConnector
from osm_ro_plugin.vim_dummy import VimDummyConnector
import yaml
# Module metadata.
__author__ = "Alfonso Tierno"
__date__ = "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict by descending through the nested keys given in args.

    Example: with target_dict={a: {b: 5}}, keys (a, b) return 5; keys (a, b, c) and
    keys (f, h) both return the default.

    :param target_dict: dictionary to be read
    :param args: sequence of keys to follow, outermost first
    :param kwargs: may contain default=value, returned when a key is not present in
        the nested dictionary (or an intermediate value is not a dict)
    :return: the wanted value if it exists; otherwise the default, or None if no
        default was given
    """
    for key in args:
        # Stop as soon as the path cannot be followed any further.
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")
        target_dict = target_dict[key]

    return target_dict
class NsWorkerException(Exception):
    """Base error raised by the worker while processing a task."""
class FailingConnector:
    """Stand-in connector whose public methods always raise the stored error.

    Every public attribute name found on vimconn.VimConnector and on
    sdnconn.SdnConnectorBase is replaced by a Mock that raises the matching
    connector exception carrying error_msg.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exceptions raised by every method
        """
        self.error_msg = error_msg

        for method in dir(vimconn.VimConnector):
            # skip private and dunder names
            if method[0] != "_":
                setattr(
                    self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
                )

        for method in dir(sdnconn.SdnConnectorBase):
            # names shared with VimConnector end up raising the SDN error (set last)
            if method[0] != "_":
                setattr(
                    self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
                )
class NsWorkerExceptionNotFound(NsWorkerException):
    """Worker error raised when a requested element cannot be found."""
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    It implements methods that do nothing and return ok"""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handle (subclasses call e.g. decrypt/get_list/get_one on it)
        :param my_vims: dict of connector objects indexed by target_id
        :param db_vims: dict of VIM database records indexed by target_id
        :param logger: logger used by the subclasses
        """
        self.db = db
        self.logger = logger
        self.my_vims = my_vims
        self.db_vims = db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing; report the item as being built with no changes to store."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """skip calling VIM to get image, flavor status. Assumes ok"""
        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
            return "FAILED", {}

        return "DONE", {}

    def delete(self, ro_task, task_index):
        """skip calling VIM to delete image. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing; returns (task_status, ro_vim_item_update, db_task_update)."""
        return "DONE", None, None
class VimInteractionNet(VimInteractionBase):
    """Create/find, refresh and delete networks at a VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing network (task "find_params") or create one (task "params").

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", update_dict) on success, ("FAILED", update_dict) on
            VimConnException / NsWorkerException
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status

        :param ro_task: ro_task record; "_id", "target_id" and "vim_info" are read
        :return: (task_status, update_dict) where update_dict only contains the
            vim_info fields that differ from what ro_task already stores
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network (and its created items) at the VIM.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update_dict) when deleted or already absent,
            ("FAILED", update_dict) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionVdu(VimInteractionBase):
    """Create, refresh and delete VM instances at a VIM, and inject ssh keys."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM instance, resolving "TASK-" references through task_depends.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dict mapping "TASK-..." references to the resolved VIM ids
        :return: ("BUILD", update_dict) on success, ("FAILED", update_dict) on
            VimConnException / NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params = task["params"]
            # work on a copy so the stored task params keep their TASK- references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM instance (and its created items) at the VIM.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update_dict) when deleted or already absent,
            ("FAILED", update_dict) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :param ro_task: ro_task record; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no vim_id; otherwise (task_status,
            update_dict) where update_dict only contains the fields that changed
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # best effort: a missing/unparseable vim_info must not fail the refresh
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                # NOTE(review): a non-matching id appends None, keeping positions
                # aligned with interfaces_vim_ids -- confirm this is intended
                vim_interfaces.append(iface)

        # the pending CREATE task carries the management interface indexes
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an ssh key into the VM, asking for a later retry while it fails.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task_status, ro_vim_item_update, db_task_update); "BUILD" is
            returned with a "next_retry" delay until max_retries_inject_ssh_key
            attempts have failed, then "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # replace the encrypted key material by the decrypted key
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionImage(VimInteractionBase):
    """Look up images at a VIM."""

    def new(self, ro_task, task_index, task_depends):
        """Find the single image matching the task "find_params".

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update_dict) when exactly one image matches,
            ("FAILED", update_dict) when none or several match or the VIM fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            # NOTE(review): without "find_params" vim_image_id is never assigned and
            # the code below raises UnboundLocalError -- confirm callers always set it
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionFlavor(VimInteractionBase):
    """Find/create and delete flavors at a VIM."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update_dict) when deleted or already absent,
            ("FAILED", update_dict) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Find a flavor matching "find_params"; create it from "params" if not found.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update_dict) on success, ("FAILED", update_dict) on
            VimConnException / NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_flavor_id = None

            if task.get("find_params"):
                try:
                    flavor_data = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException:
                    # not found is not an error here: fall through to creation
                    self.logger.exception("VimConnNotFoundException occured.")

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionAffinityGroup(VimInteractionBase):
    """Find/create and delete affinity or anti-affinity groups at a VIM."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group at the VIM.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update_dict) when deleted or already absent,
            ("FAILED", update_dict) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group named by "vim-affinity-group-id", or create one.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update_dict) on success, ("FAILED", update_dict) on
            VimConnException / NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    # NOTE(review): assumes get_affinity_group returns a dict holding
                    # the group "id" -- confirm against the connector API
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action on an existing VM instance at a VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """Call action_vminstance with the action named in the task params.

        :param ro_task: ro_task record; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task_status, ro_vim_item_update, db_task_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # NOTE(review): without "params" vim_vm_id is never assigned and the
            # dict below raises UnboundLocalError -- confirm callers always set it
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                action = task["params"].get("action")
                # the action name is used both as key and as value of the context
                context = {action: action}
                target_vim.action_vminstance(vim_vm_id, context)
                # created = True
            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionSdnNet(VimInteractionBase):
    """Create, update, refresh and delete SDN connectivity services."""

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping
        mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                break

            # literal text before the bracket must match exactly
            length = bracket_start - mapping_index
            if (
                length
                and port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # the single char at the bracket position must be one of the listed chars
            if (
                port_pci[pci_index + length]
                not in mapping[bracket_start + 1 : bracket_end]
            ):
                return False

            pci_index += length + 1
            mapping_index = bracket_end + 1

        # whatever remains after the last bracket must match literally
        if port_pci[pci_index:] != mapping[mapping_index:]:
            return False

        return True

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Collect the SR-IOV / PCI-PASSTHROUGH interfaces attached to the given vlds.

        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: only vnfrs of this vim account are considered
        :return: list of interface copies, each with an added "id" of the form
            vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index>, and with
            "status" set to "ERROR" when the owning vdur is in ERROR
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV or PT
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """Re-run new() on the pending CREATE task of this ro_task.

        :param ro_task: ro_task record; "tasks" is searched for the CREATE task
        :return: whatever new() returns
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """Create or update the connectivity service so it connects the current ports.

        :param ro_task: ro_task record; "tasks", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (sdn_status, update_dict) on success, ("FAILED", update_dict) on
            any exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            params = task["params"]
            vlds_to_connect = params["vlds"]
            associated_vim = params["target_vim"]
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = associated_vim.partition(":")

            if associated_vim:
                # get associated VIM
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
            # print(ports)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location not known yet: count it as failed or still pending
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": "dot1q"
                    if port["type"] == "SR-IOV"
                    else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing to connect yet: no service is created
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # keep the task alive while some ports are still unknown
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # log a traceback only for errors that are not connector errors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the connectivity service at the SDN controller.

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update_dict) when deleted or not found (HTTP 404),
            ("FAILED", update_dict) on any other exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionMigration(VimInteractionBase):
    """Runs the "migrate" EXEC task: asks the VIM connector to migrate a VM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM named in the task params and collect its refreshed status.

        :param ro_task: ro_task record; reads "tasks" and "target_id"
        :param task_index: position inside ro_task["tasks"] of the task to run
        :param task_depends: not used by this method
        :return: ("DONE" | "FAILED", vim_info update dict, db_task_update dict)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}
        # Defined up front so that a task without "params" no longer raises
        # UnboundLocalError when the result dictionary is built below.
        vim_vm_id = None

        try:
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                migrate_host = task["params"].get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
                        ro_task["target_id"]
                    )

                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        # The old info may be absent or carry no interfaces;
                        # in that case there is nothing to match against.
                        old_interfaces = (vdu_old_vim_info or {}).get(
                            "interfaces"
                        ) or ()

                        for old_iface in old_interfaces:
                            # Keep the order of the old interfaces; a None entry
                            # marks an interface not found after the refresh.
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionResize(VimInteractionBase):
    """Runs the "verticalscale" EXEC task: resizes a VM to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM named in the task params to the flavor described there.

        The flavor is looked up at the VIM from "flavor_dict"; when the lookup
        fails a new flavor is created from the same data.

        :param ro_task: ro_task record; reads "tasks" and "target_id"
        :param task_index: position inside ro_task["tasks"] of the task to run
        :param task_depends: not used by this method
        :return: ("DONE" | "FAILED", vim_info update dict, db_task_update dict)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        # Defined up front so that a task without "params" no longer raises
        # UnboundLocalError when the result dictionary is built below.
        vim_vm_id = None

        try:
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                flavor_dict = task["params"].get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as lookup_error:
                    self.logger.info(
                        "Cannot find any flavor matching %s.", str(lookup_error)
                    )
                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as create_error:
                        self.logger.error(
                            "Error creating flavor at VIM %s.", str(create_error)
                        )

                # NOTE(review): when no flavor could be found or created the
                # resize is skipped and the task still ends as "DONE" - confirm
                # this is intended.
                if target_flavor_uuid is not None:
                    resized_status = target_vim.resize_instance(
                        vim_vm_id, target_flavor_uuid
                    )

                    if resized_status:
                        # Refresh VM to get new vim_info
                        vm_to_refresh_list = [vim_vm_id]
                        vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                        refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class ConfigValidate:
    """Accessors over the "period" section of the RO configuration dictionary."""

    def __init__(self, config: Dict):
        self.conf = config

    @property
    def active(self):
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        configured = self.conf["period"]["refresh_active"]
        accepted = configured >= 60 or configured == -1

        return configured if accepted else 60

    @property
    def build(self):
        period = self.conf["period"]
        return period["refresh_build"]

    @property
    def image(self):
        period = self.conf["period"]
        return period["refresh_image"]

    @property
    def error(self):
        period = self.conf["period"]
        return period["refresh_error"]

    @property
    def queue_size(self):
        period = self.conf["period"]
        return period["queue_size"]
1510 class NsWorker(threading
.Thread
):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # One handler per task "item"; all of them share the same db handle,
    # connector dictionary, vim cache and logger of this worker.
    interaction_classes = (
        ("net", VimInteractionNet),
        ("vdu", VimInteractionVdu),
        ("image", VimInteractionImage),
        ("flavor", VimInteractionFlavor),
        ("sdn_net", VimInteractionSdnNet),
        ("update", VimInteractionUpdateVdu),
        ("affinity-or-anti-affinity-group", VimInteractionAffinityGroup),
        ("migrate", VimInteractionMigration),
        ("verticalscale", VimInteractionResize),
    )
    self.item2class = {
        item: interaction(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction in interaction_classes
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
def insert_task(self, task):
    """
    Queue a task for this worker without blocking.

    :param task: item to put into self.task_queue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put(task, False)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
def terminate(self):
    """Queue the "exit" marker through insert_task."""
    exit_marker = "exit"
    self.insert_task(exit_marker)
def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet.

    The previous code called self.task_lock.release() inside the "with" block,
    so the context manager released the lock a second time on exit and raised
    RuntimeError. The lock is now released only by the "with" statement.

    NOTE(review): self.task_lock must be assigned elsewhere in the class; it is
    not assigned in the part of the file reviewed here - confirm.

    :param task: task dictionary; its "status" is read and may be modified
    :return: True if the task was "SCHEDULED" and is now "SUPERSEDED",
        False otherwise (the task is left untouched)
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        return False
1586 def _process_vim_config(self
, target_id
, db_vim
):
1588 Process vim config, creating vim configuration files as ca_cert
1589 :param target_id: vim/sdn/wim + id
1590 :param db_vim: Vim dictionary obtained from database
1591 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1593 if not db_vim
.get("config"):
1599 if db_vim
["config"].get("ca_cert_content"):
1600 file_name
= "{}:{}".format(target_id
, self
.worker_index
)
1604 except FileExistsError
:
1605 self
.logger
.exception(
1606 "FileExistsError occured while processing vim_config."
1609 file_name
= file_name
+ "/ca_cert"
1611 with
open(file_name
, "w") as f
:
1612 f
.write(db_vim
["config"]["ca_cert_content"])
1613 del db_vim
["config"]["ca_cert_content"]
1614 db_vim
["config"]["ca_cert"] = file_name
1615 except Exception as e
:
1616 raise NsWorkerException(
1617 "Error writing to file '{}': {}".format(file_name
, e
)
1620 def _load_plugin(self
, name
, type="vim"):
1621 # type can be vim or sdn
1622 if "rovim_dummy" not in self
.plugins
:
1623 self
.plugins
["rovim_dummy"] = VimDummyConnector
1625 if "rosdn_dummy" not in self
.plugins
:
1626 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1628 if name
in self
.plugins
:
1629 return self
.plugins
[name
]
1632 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1633 self
.plugins
[name
] = ep
.load()
1634 except Exception as e
:
1635 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1637 if name
and name
not in self
.plugins
:
1638 raise NsWorkerException(
1639 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1642 return self
.plugins
[name
]
1644 def _unload_vim(self
, target_id
):
1646 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1647 :param target_id: Contains type:_id; where type can be 'vim', ...
1651 self
.db_vims
.pop(target_id
, None)
1652 self
.my_vims
.pop(target_id
, None)
1654 if target_id
in self
.vim_targets
:
1655 self
.vim_targets
.remove(target_id
)
1657 self
.logger
.info("Unloaded {}".format(target_id
))
1658 rmtree("{}:{}".format(target_id
, self
.worker_index
))
1659 except FileNotFoundError
:
1660 # This is raised by rmtree if folder does not exist.
1661 self
.logger
.exception("FileNotFoundError occured while unloading VIM.")
1662 except Exception as e
:
1663 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    # Changes to apply to the account record; written in the "finally" clause
    update_dict = {}
    unset_dict = {}
    # Key prefix of the operation being processed; empty until one is locked
    op_text = ""
    # Text of the current stage, used to build error messages
    step = ""
    # Remember whether the target was already loaded, to unload it at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    error_text = ""

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        # Only the first operation found in PROCESSING state is handled:
        # every path below that does not "continue" ends with a return.
        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # The filter includes the previous locked_at value, so the update
            # matches nothing if the record changed since it was read.
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # These values are overwritten in the "finally" clause when
            # error_text is set.
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # Runs on every exit path, including the early returns above
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                # detailed-status is being set, so it must not be unset too
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1767 def _reload_vim(self
, target_id
):
1768 if target_id
in self
.vim_targets
:
1769 self
._load
_vim
(target_id
)
1771 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1772 # just remove it to force load again next time it is needed
1773 self
.db_vims
.pop(target_id
, None)
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it load a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None

    try:
        step = "Getting {}={} from db".format(target, _id)
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + vim["type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # Work on a copy: the keys moved below must stay in the cached record
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]

            # Make both spellings of the url available
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)

        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        self.db_vims[target_id] = vim or {}
        # The failing connector goes to my_vims (the connector dictionary).
        # It used to be written to db_vims, which overwrote the record cached
        # on the previous line and left my_vims without a connector.
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # The target is registered whether or not loading succeeded
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1875 def _get_db_task(self
):
1877 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1882 if not self
.time_last_task_processed
:
1883 self
.time_last_task_processed
= now
1888 # Log RO tasks only when loglevel is DEBUG
1889 if self.logger.getEffectiveLevel() == logging.DEBUG:
1896 + str(self.task_locked_time)
1898 + "time_last_task_processed="
1899 + str(self.time_last_task_processed)
1905 locked
= self
.db
.set_one(
1908 "target_id": self
.vim_targets
,
1909 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1910 "locked_at.lt": now
- self
.task_locked_time
,
1911 "to_check_at.lt": self
.time_last_task_processed
,
1912 "to_check_at.gt": -1,
1914 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
1915 fail_on_empty
=False,
1920 ro_task
= self
.db
.get_one(
1923 "target_id": self
.vim_targets
,
1924 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1930 if self
.time_last_task_processed
== now
:
1931 self
.time_last_task_processed
= None
1934 self
.time_last_task_processed
= now
1935 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1937 except DbException
as e
:
1938 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
1939 except Exception as e
:
1940 self
.logger
.critical(
1941 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
1946 def _get_db_all_tasks(self
):
1948 Read all content of table ro_tasks to log it
1952 # Checking the content of the BD:
1955 ro_task
= self
.db
.get_list("ro_tasks")
1957 self
._log
_ro
_task
(rt
, None, None, "TASK_WF", "GET_ALL_TASKS")
1960 except DbException
as e
:
1961 self
.logger
.error("Database exception at _get_db_all_tasks: {}".format(e
))
1962 except Exception as e
:
1963 self
.logger
.critical(
1964 "Unexpected exception at _get_db_all_tasks: {}".format(e
), exc_info
=True
1969 def _log_ro_task(self
, ro_task
, db_ro_task_update
, db_ro_task_delete
, mark
, event
):
1971 Generate a log with the following format:
1973 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1974 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1975 task_array_index;task_id;task_action;task_item;task_args
1979 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1980 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1981 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1982 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1983 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1984 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1985 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1990 if ro_task
is not None and isinstance(ro_task
, dict):
1991 for t
in ro_task
["tasks"]:
1995 line
.append(ro_task
.get("_id", ""))
1996 line
.append(str(ro_task
.get("locked_at", "")))
1997 line
.append(str(ro_task
.get("modified_at", "")))
1998 line
.append(str(ro_task
.get("created_at", "")))
1999 line
.append(str(ro_task
.get("to_check_at", "")))
2000 line
.append(str(ro_task
.get("locked_by", "")))
2001 line
.append(str(ro_task
.get("target_id", "")))
2002 line
.append(str(ro_task
.get("vim_info", {}).get("refresh_at", "")))
2003 line
.append(str(ro_task
.get("vim_info", "")))
2004 line
.append(str(ro_task
.get("tasks", "")))
2005 if isinstance(t
, dict):
2006 line
.append(str(t
.get("status", "")))
2007 line
.append(str(t
.get("action_id", "")))
2009 line
.append(str(t
.get("task_id", "")))
2010 line
.append(str(t
.get("action", "")))
2011 line
.append(str(t
.get("item", "")))
2012 line
.append(str(t
.get("find_params", "")))
2013 line
.append(str(t
.get("params", "")))
2015 line
.extend([""] * 2)
2017 line
.extend([""] * 5)
2020 self
.logger
.debug(";".join(line
))
2021 elif db_ro_task_update
is not None and isinstance(db_ro_task_update
, dict):
2024 st
= "tasks.{}.status".format(i
)
2025 if st
not in db_ro_task_update
:
2030 line
.append(db_ro_task_update
.get("_id", ""))
2031 line
.append(str(db_ro_task_update
.get("locked_at", "")))
2032 line
.append(str(db_ro_task_update
.get("modified_at", "")))
2034 line
.append(str(db_ro_task_update
.get("to_check_at", "")))
2035 line
.append(str(db_ro_task_update
.get("locked_by", "")))
2037 line
.append(str(db_ro_task_update
.get("vim_info.refresh_at", "")))
2039 line
.append(str(db_ro_task_update
.get("vim_info", "")))
2040 line
.append(str(str(db_ro_task_update
).count(".status")))
2041 line
.append(db_ro_task_update
.get(st
, ""))
2044 line
.extend([""] * 3)
2046 self
.logger
.debug(";".join(line
))
2048 elif db_ro_task_delete
is not None and isinstance(db_ro_task_delete
, dict):
2052 line
.append(db_ro_task_delete
.get("_id", ""))
2054 line
.append(db_ro_task_delete
.get("modified_at", ""))
2055 line
.extend([""] * 13)
2056 self
.logger
.debug(";".join(line
))
2062 line
.extend([""] * 16)
2063 self
.logger
.debug(";".join(line
))
2065 except Exception as e
:
2066 self
.logger
.error("Error logging ro_task: {}".format(e
))
2068 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2070 Determine if this task need to be done or superseded
2073 my_task
= ro_task
["tasks"][task_index
]
2074 task_id
= my_task
["task_id"]
2075 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
2076 "created_items", False
2079 self
.logger
.warning("Needed delete: {}".format(needed_delete
))
2080 if my_task
["status"] == "FAILED":
2081 return None, None # TODO need to be retry??
2084 for index
, task
in enumerate(ro_task
["tasks"]):
2085 if index
== task_index
or not task
:
2089 my_task
["target_record"] == task
["target_record"]
2090 and task
["action"] == "CREATE"
2093 db_update
["tasks.{}.status".format(index
)] = task
[
2096 elif task
["action"] == "CREATE" and task
["status"] not in (
2100 needed_delete
= False
2103 self
.logger
.warning(
2104 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
2106 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
2108 return "SUPERSEDED", None
2109 except Exception as e
:
2110 if not isinstance(e
, NsWorkerException
):
2111 self
.logger
.critical(
2112 "Unexpected exception at _delete_task task={}: {}".format(
2118 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2120 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2122 Determine if this task need to create something at VIM
2125 my_task
= ro_task
["tasks"][task_index
]
2126 task_id
= my_task
["task_id"]
2129 if my_task
["status"] == "FAILED":
2130 return None, None # TODO need to be retry??
2131 elif my_task
["status"] == "SCHEDULED":
2132 # check if already created by another task
2133 for index
, task
in enumerate(ro_task
["tasks"]):
2134 if index
== task_index
or not task
:
2137 if task
["action"] == "CREATE" and task
["status"] not in (
2142 return task
["status"], "COPY_VIM_INFO"
2145 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2146 ro_task
, task_index
, task_depends
2148 # TODO update other CREATE tasks
2149 except Exception as e
:
2150 if not isinstance(e
, NsWorkerException
):
2152 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2155 task_status
= "FAILED"
2156 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2157 # TODO update ro_vim_item_update
2159 return task_status
, ro_vim_item_update
2163 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2165 Look for dependency task
2166 :param task_id: Can be one of
2167 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2168 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2169 3. task.task_id: "<action_id>:number"
2172 :return: database ro_task plus index of task
2175 task_id
.startswith("vim:")
2176 or task_id
.startswith("sdn:")
2177 or task_id
.startswith("wim:")
2179 target_id
, _
, task_id
= task_id
.partition(" ")
2181 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2182 ro_task_dependency
= self
.db
.get_one(
2184 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2185 fail_on_empty
=False,
2188 if ro_task_dependency
:
2189 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2190 if task
["target_record_id"] == task_id
:
2191 return ro_task_dependency
, task_index
2195 for task_index
, task
in enumerate(ro_task
["tasks"]):
2196 if task
and task
["task_id"] == task_id
:
2197 return ro_task
, task_index
2199 ro_task_dependency
= self
.db
.get_one(
2202 "tasks.ANYINDEX.task_id": task_id
,
2203 "tasks.ANYINDEX.target_record.ne": None,
2205 fail_on_empty
=False,
2208 self
.logger
.warning("ro_task_dependency={}".format(ro_task_dependency
))
2209 if ro_task_dependency
:
2210 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2211 if task
["task_id"] == task_id
:
2212 return ro_task_dependency
, task_index
2213 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
def update_vm_refresh(self):
    """Enables the VM status updates if self.refresh_config.active parameter
    is not -1 and than updates the DB accordingly

    Every ro_task with a task in status DONE and a negative to_check_at gets
    new "vim_info.refresh_at" and "to_check_at" values. Errors are logged and
    not raised.
    """
    try:
        self.logger.debug("Checking if VM status update config")
        started = time.time()
        period = self.refresh_config.active
        next_refresh = -1 if period == -1 else started + period

        if next_refresh == -1:
            # periodic refresh is disabled: nothing to re-enable
            return

        now = time.time()
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(now + (24 * 60 * 60), next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )

        self.logger.debug("Updating tasks to change the to_check_at status")
        for pending in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": pending["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2261 def _process_pending_tasks(self
, ro_task
):
2262 ro_task_id
= ro_task
["_id"]
2265 next_check_at
= now
+ (24 * 60 * 60)
2266 db_ro_task_update
= {}
2268 def _update_refresh(new_status
):
2269 # compute next_refresh
2271 nonlocal next_check_at
2272 nonlocal db_ro_task_update
2275 next_refresh
= time
.time()
2277 if task
["item"] in ("image", "flavor"):
2278 next_refresh
+= self
.refresh_config
.image
2279 elif new_status
== "BUILD":
2280 next_refresh
+= self
.refresh_config
.build
2281 elif new_status
== "DONE":
2282 if self
.refresh_config
.active
== -1:
2285 next_refresh
+= self
.refresh_config
.active
2287 next_refresh
+= self
.refresh_config
.error
2289 next_check_at
= min(next_check_at
, next_refresh
)
2290 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2291 ro_task
["vim_info"]["refresh_at"] = next_refresh
2295 # Log RO tasks only when loglevel is DEBUG
2296 if self.logger.getEffectiveLevel() == logging.DEBUG:
2297 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2299 # Check if vim status refresh is enabled again
2300 self
.update_vm_refresh()
2301 # 0: get task_status_create
2303 task_status_create
= None
2307 for t
in ro_task
["tasks"]
2309 and t
["action"] == "CREATE"
2310 and t
["status"] in ("BUILD", "DONE")
2316 task_status_create
= task_create
["status"]
2318 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2319 for task_action
in ("DELETE", "CREATE", "EXEC"):
2320 db_vim_update
= None
2323 for task_index
, task
in enumerate(ro_task
["tasks"]):
2325 continue # task deleted
2328 target_update
= None
2332 task_action
in ("DELETE", "EXEC")
2333 and task
["status"] not in ("SCHEDULED", "BUILD")
2335 or task
["action"] != task_action
2337 task_action
== "CREATE"
2338 and task
["status"] in ("FINISHED", "SUPERSEDED")
2343 task_path
= "tasks.{}.status".format(task_index
)
2345 db_vim_info_update
= None
2347 if task
["status"] == "SCHEDULED":
2348 # check if tasks that this depends on have been completed
2349 dependency_not_completed
= False
2351 for dependency_task_id
in task
.get("depends_on") or ():
2354 dependency_task_index
,
2355 ) = self
._get
_dependency
(
2356 dependency_task_id
, target_id
=ro_task
["target_id"]
2358 dependency_task
= dependency_ro_task
["tasks"][
2359 dependency_task_index
2361 self
.logger
.warning(
2362 "dependency_ro_task={} dependency_task_index={}".format(
2363 dependency_ro_task
, dependency_task_index
2367 if dependency_task
["status"] == "SCHEDULED":
2368 dependency_not_completed
= True
2369 next_check_at
= min(
2370 next_check_at
, dependency_ro_task
["to_check_at"]
2372 # must allow dependent task to be processed first
2373 # to do this set time after last_task_processed
2374 next_check_at
= max(
2375 self
.time_last_task_processed
, next_check_at
2378 elif dependency_task
["status"] == "FAILED":
2379 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2382 dependency_task
["action"],
2383 dependency_task
["item"],
2385 dependency_ro_task
["vim_info"].get(
2390 "task={} {}".format(task
["task_id"], error_text
)
2392 raise NsWorkerException(error_text
)
2394 task_depends
[dependency_task_id
] = dependency_ro_task
[
2398 "TASK-{}".format(dependency_task_id
)
2399 ] = dependency_ro_task
["vim_info"]["vim_id"]
2401 if dependency_not_completed
:
2402 self
.logger
.warning(
2403 "DEPENDENCY NOT COMPLETED {}".format(
2404 dependency_ro_task
["vim_info"]["vim_id"]
2407 # TODO set at vim_info.vim_details that it is waiting
2410 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2411 # the task of renew this locking. It will update database locket_at periodically
2413 lock_object
= LockRenew
.add_lock_object(
2414 "ro_tasks", ro_task
, self
2417 if task
["action"] == "DELETE":
2418 (new_status
, db_vim_info_update
,) = self
._delete
_task
(
2419 ro_task
, task_index
, task_depends
, db_ro_task_update
2422 "FINISHED" if new_status
== "DONE" else new_status
2424 # ^with FINISHED instead of DONE it will not be refreshing
2426 if new_status
in ("FINISHED", "SUPERSEDED"):
2427 target_update
= "DELETE"
2428 elif task
["action"] == "EXEC":
2433 ) = self
.item2class
[task
["item"]].exec(
2434 ro_task
, task_index
, task_depends
2437 "FINISHED" if new_status
== "DONE" else new_status
2439 # ^with FINISHED instead of DONE it will not be refreshing
2442 # load into database the modified db_task_update "retries" and "next_retry"
2443 if db_task_update
.get("retries"):
2445 "tasks.{}.retries".format(task_index
)
2446 ] = db_task_update
["retries"]
2448 next_check_at
= time
.time() + db_task_update
.get(
2451 target_update
= None
2452 elif task
["action"] == "CREATE":
2453 if task
["status"] == "SCHEDULED":
2454 if task_status_create
:
2455 new_status
= task_status_create
2456 target_update
= "COPY_VIM_INFO"
2458 new_status
, db_vim_info_update
= self
.item2class
[
2460 ].new(ro_task
, task_index
, task_depends
)
2461 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2462 _update_refresh(new_status
)
2464 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2465 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2466 (new_status
, db_vim_info_update
,) = self
.item2class
[
2469 _update_refresh(new_status
)
2471 # The refresh is updated to avoid set the value of "refresh_at" to
2472 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2473 # because it can happen that in this case the task is never processed
2474 _update_refresh(task
["status"])
2476 except Exception as e
:
2477 new_status
= "FAILED"
2478 db_vim_info_update
= {
2479 "vim_status": "VIM_ERROR",
2480 "vim_message": str(e
),
2484 e
, (NsWorkerException
, vimconn
.VimConnException
)
2487 "Unexpected exception at _delete_task task={}: {}".format(
2494 if db_vim_info_update
:
2495 db_vim_update
= db_vim_info_update
.copy()
2496 db_ro_task_update
.update(
2499 for k
, v
in db_vim_info_update
.items()
2502 ro_task
["vim_info"].update(db_vim_info_update
)
2505 if task_action
== "CREATE":
2506 task_status_create
= new_status
2507 db_ro_task_update
[task_path
] = new_status
2509 if target_update
or db_vim_update
:
2510 if target_update
== "DELETE":
2511 self
._update
_target
(task
, None)
2512 elif target_update
== "COPY_VIM_INFO":
2513 self
._update
_target
(task
, ro_task
["vim_info"])
2515 self
._update
_target
(task
, db_vim_update
)
2517 except Exception as e
:
2519 isinstance(e
, DbException
)
2520 and e
.http_code
== HTTPStatus
.NOT_FOUND
2522 # if the vnfrs or nsrs has been removed from database, this task must be removed
2524 "marking to delete task={}".format(task
["task_id"])
2526 self
.tasks_to_delete
.append(task
)
2529 "Unexpected exception at _update_target task={}: {}".format(
2535 locked_at
= ro_task
["locked_at"]
2539 lock_object
["locked_at"],
2540 lock_object
["locked_at"] + self
.task_locked_time
,
2542 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2543 # contain exactly locked_at + self.task_locked_time
2544 LockRenew
.remove_lock_object(lock_object
)
2547 "_id": ro_task
["_id"],
2548 "to_check_at": ro_task
["to_check_at"],
2549 "locked_at": locked_at
,
2551 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2552 # outside this task (by ro_nbi) do not update it
2553 db_ro_task_update
["locked_by"] = None
2554 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2555 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2556 db_ro_task_update
["modified_at"] = now
2557 db_ro_task_update
["to_check_at"] = next_check_at
2560 # Log RO tasks only when loglevel is DEBUG
2561 if self.logger.getEffectiveLevel() == logging.DEBUG:
2562 db_ro_task_update_log = db_ro_task_update.copy()
2563 db_ro_task_update_log["_id"] = q_filter["_id"]
2564 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2567 if not self
.db
.set_one(
2569 update_dict
=db_ro_task_update
,
2571 fail_on_empty
=False,
2573 del db_ro_task_update
["to_check_at"]
2574 del q_filter
["to_check_at"]
2576 # Log RO tasks only when loglevel is DEBUG
2577 if self.logger.getEffectiveLevel() == logging.DEBUG:
2580 db_ro_task_update_log,
2583 "SET_TASK " + str(q_filter),
2589 update_dict
=db_ro_task_update
,
2592 except DbException
as e
:
2594 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2596 except Exception as e
:
2598 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2601 def _update_target(self
, task
, ro_vim_item_update
):
2602 table
, _
, temp
= task
["target_record"].partition(":")
2603 _id
, _
, path_vim_status
= temp
.partition(":")
2604 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2605 path_item
= path_item
[: path_item
.rfind(".")]
2606 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2607 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2609 if ro_vim_item_update
:
2611 path_vim_status
+ "." + k
: v
2612 for k
, v
in ro_vim_item_update
.items()
2621 "interfaces_backup",
2625 if path_vim_status
.startswith("vdur."):
2626 # for backward compatibility, add vdur.name apart from vdur.vim_name
2627 if ro_vim_item_update
.get("vim_name"):
2628 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2630 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2631 if ro_vim_item_update
.get("vim_id"):
2632 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2634 # update general status
2635 if ro_vim_item_update
.get("vim_status"):
2636 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2640 if ro_vim_item_update
.get("interfaces"):
2641 path_interfaces
= path_item
+ ".interfaces"
2643 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2647 path_interfaces
+ ".{}.".format(i
) + k
: v
2648 for k
, v
in iface
.items()
2649 if k
in ("vlan", "compute_node", "pci")
2653 # put ip_address and mac_address with ip-address and mac-address
2654 if iface
.get("ip_address"):
2656 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2657 ] = iface
["ip_address"]
2659 if iface
.get("mac_address"):
2661 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2662 ] = iface
["mac_address"]
2664 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2665 update_dict
["ip-address"] = iface
.get("ip_address").split(
2669 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2670 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2674 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2676 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2677 if ro_vim_item_update
.get("interfaces"):
2678 search_key
= path_vim_status
+ ".interfaces"
2679 if update_dict
.get(search_key
):
2680 interfaces_backup_update
= {
2681 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
2686 q_filter
={"_id": _id
},
2687 update_dict
=interfaces_backup_update
,
2691 update_dict
= {path_item
+ ".status": "DELETED"}
2694 q_filter
={"_id": _id
},
2695 update_dict
=update_dict
,
2696 unset
={path_vim_status
: None},
2699 def _process_delete_db_tasks(self
):
2701 Delete task from database because vnfrs or nsrs or both have been deleted
2702 :return: None. Uses and modify self.tasks_to_delete
2704 while self
.tasks_to_delete
:
2705 task
= self
.tasks_to_delete
[0]
2706 vnfrs_deleted
= None
2707 nsr_id
= task
["nsr_id"]
2709 if task
["target_record"].startswith("vnfrs:"):
2710 # check if nsrs is present
2711 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2712 vnfrs_deleted
= task
["target_record"].split(":")[1]
2715 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2716 except Exception as e
:
2718 "Error deleting task={}: {}".format(task
["task_id"], e
)
2720 self
.tasks_to_delete
.pop(0)
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Static method because it is called from osm_ng_ro.ns
    Removes from the "ro_tasks" table the tasks that belong to a deleted nsr or vnfr. A ro_task is
    deleted when none of its tasks is used by somebody else; otherwise only the affected tasks are
    set to None. Writes are conditioned to "modified_at" being unchanged, and the whole operation
    is retried when somebody else has modified a ro_task meanwhile
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None. Raises NsWorkerException if the retries are exceeded
    """
    retries = 5
    # target_record is "<table>:<_id>:<path>". The id must be matched completely: the trailing ":"
    # avoids affecting a different vnfr whose id just starts with vnfrs_deleted
    vnfr_prefix = "vnfrs:{}:".format(vnfrs_deleted) if vnfrs_deleted else None

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    # already removed task, nothing to do
                    continue

                if vnfr_prefix:
                    # appending ":" also accepts a target_record made only of "vnfrs:<id>"
                    affected = (task["target_record"] + ":").startswith(vnfr_prefix)
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            if to_delete_ro_task:
                if not db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                ):
                    conflict = True
            elif db_update:
                db_update["modified_at"] = now
                if not db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=db_update,
                    fail_on_empty=False,
                ):
                    conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
def run(self):
    """
    Main loop of the thread. On every iteration:
    step 1: reads one command from self.task_queue ("terminate", "load_vim", "unload_vim",
        "reload_vim", "check_vim") and executes it. The read blocks only when self.vim_targets
        is empty, because then there is nothing else to do (idle state)
    step 2: when no command is available, deletes the tasks pending at self.tasks_to_delete and
        processes one ro_task obtained from the database, if any
    Errors at any step are logged and the loop goes on. It ends when "terminate" is received
    :return: None
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            command = task[0]
            if command == "terminate":
                break
            elif command == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif command == "reload_vim":
                self._reload_vim(task[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            # a command has been attended; look for more before processing tasks
            continue
        except queue.Empty:
            # no commands pending, go on with step 2
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled code, kept for reference:
            # # Log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self.logger.warning("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to process, wait before asking the database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")