1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
30 from os
import makedirs
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
43 from osm_ro_plugin
import vimconn
44 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
45 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
52 def deep_get(target_dict
, *args
, **kwargs
):
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exists, None or default otherwise
62 if not isinstance(target_dict
, dict) or key
not in target_dict
:
63 return kwargs
.get("default")
64 target_dict
= target_dict
[key
]
68 class NsWorkerException(Exception):
72 class FailingConnector
:
73 def __init__(self
, error_msg
):
74 self
.error_msg
= error_msg
76 for method
in dir(vimconn
.VimConnector
):
79 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
82 for method
in dir(sdnconn
.SdnConnectorBase
):
85 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
89 class NsWorkerExceptionNotFound(NsWorkerException
):
93 class VimInteractionBase
:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self
, db
, my_vims
, db_vims
, logger
):
100 self
.my_vims
= my_vims
101 self
.db_vims
= db_vims
103 def new(self
, ro_task
, task_index
, task_depends
):
106 def refresh(self
, ro_task
):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
113 def delete(self
, ro_task
, task_index
):
114 """skip calling VIM to delete image. Assumes ok"""
117 def exec(self
, ro_task
, task_index
, task_depends
):
118 return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
219 "task={} {} new-net={} created={}".format(
220 task_id
, ro_task
["target_id"], vim_net_id
, created
224 return "BUILD", ro_vim_item_update
225 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
227 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
229 ro_vim_item_update
= {
230 "vim_status": "VIM_ERROR",
232 "vim_message": str(e
),
235 return "FAILED", ro_vim_item_update
237 def refresh(self
, ro_task
):
238 """Call VIM to get network status"""
239 ro_task_id
= ro_task
["_id"]
240 target_vim
= self
.my_vims
[ro_task
["target_id"]]
241 vim_id
= ro_task
["vim_info"]["vim_id"]
242 net_to_refresh_list
= [vim_id
]
245 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
246 vim_info
= vim_dict
[vim_id
]
248 if vim_info
["status"] == "ACTIVE":
250 elif vim_info
["status"] == "BUILD":
251 task_status
= "BUILD"
253 task_status
= "FAILED"
254 except vimconn
.VimConnException
as e
:
255 # Mark all tasks at VIM_ERROR status
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id
, ro_task
["target_id"], vim_id
, e
261 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
262 task_status
= "FAILED"
264 ro_vim_item_update
= {}
265 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
266 ro_vim_item_update
["vim_status"] = vim_info
["status"]
268 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
269 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
271 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
273 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
274 elif vim_info
["status"] == "DELETED":
275 ro_vim_item_update
["vim_id"] = None
276 ro_vim_item_update
["vim_message"] = "Deleted externally"
278 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
279 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
281 if ro_vim_item_update
:
283 "ro_task={} {} get-net={}: status={} {}".format(
285 ro_task
["target_id"],
287 ro_vim_item_update
.get("vim_status"),
289 ro_vim_item_update
.get("vim_message")
290 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
296 return task_status
, ro_vim_item_update
298 def delete(self
, ro_task
, task_index
):
299 task
= ro_task
["tasks"][task_index
]
300 task_id
= task
["task_id"]
301 net_vim_id
= ro_task
["vim_info"]["vim_id"]
302 ro_vim_item_update_ok
= {
303 "vim_status": "DELETED",
305 "vim_message": "DELETED",
310 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
311 target_vim
= self
.my_vims
[ro_task
["target_id"]]
312 target_vim
.delete_network(
313 net_vim_id
, ro_task
["vim_info"]["created_items"]
315 except vimconn
.VimConnNotFoundException
:
316 ro_vim_item_update_ok
["vim_message"] = "already deleted"
317 except vimconn
.VimConnException
as e
:
319 "ro_task={} vim={} del-net={}: {}".format(
320 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
323 ro_vim_item_update
= {
324 "vim_status": "VIM_ERROR",
325 "vim_message": "Error while deleting: {}".format(e
),
328 return "FAILED", ro_vim_item_update
331 "task={} {} del-net={} {}".format(
333 ro_task
["target_id"],
335 ro_vim_item_update_ok
.get("vim_message", ""),
339 return "DONE", ro_vim_item_update_ok
342 class VimInteractionClassification(VimInteractionBase
):
343 def new(self
, ro_task
, task_index
, task_depends
):
344 task
= ro_task
["tasks"][task_index
]
345 task_id
= task
["task_id"]
347 target_vim
= self
.my_vims
[ro_task
["target_id"]]
351 params
= task
["params"]
352 params_copy
= deepcopy(params
)
354 name
= params_copy
.pop("name")
355 logical_source_port_index
= int(
356 params_copy
.pop("logical_source_port_index")
358 logical_source_port
= params_copy
["logical_source_port"]
360 if logical_source_port
.startswith("TASK-"):
361 vm_id
= task_depends
[logical_source_port
]
362 params_copy
["logical_source_port"] = target_vim
.refresh_vms_status(
364 )[vm_id
]["interfaces"][logical_source_port_index
]["vim_interface_id"]
366 vim_classification_id
= target_vim
.new_classification(
367 name
, "legacy_flow_classifier", params_copy
370 ro_vim_item_update
= {
371 "vim_id": vim_classification_id
,
372 "vim_status": "DONE",
378 "task={} {} created={}".format(task_id
, ro_task
["target_id"], created
)
381 return "DONE", ro_vim_item_update
382 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
383 self
.logger
.debug(traceback
.format_exc())
385 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
387 ro_vim_item_update
= {
388 "vim_status": "VIM_ERROR",
390 "vim_message": str(e
),
393 return "FAILED", ro_vim_item_update
395 def delete(self
, ro_task
, task_index
):
396 task
= ro_task
["tasks"][task_index
]
397 task_id
= task
["task_id"]
398 classification_vim_id
= ro_task
["vim_info"]["vim_id"]
399 ro_vim_item_update_ok
= {
400 "vim_status": "DELETED",
402 "vim_message": "DELETED",
407 if classification_vim_id
:
408 target_vim
= self
.my_vims
[ro_task
["target_id"]]
409 target_vim
.delete_classification(classification_vim_id
)
410 except vimconn
.VimConnNotFoundException
:
411 ro_vim_item_update_ok
["vim_message"] = "already deleted"
412 except vimconn
.VimConnException
as e
:
414 "ro_task={} vim={} del-classification={}: {}".format(
415 ro_task
["_id"], ro_task
["target_id"], classification_vim_id
, e
418 ro_vim_item_update
= {
419 "vim_status": "VIM_ERROR",
420 "vim_message": "Error while deleting: {}".format(e
),
423 return "FAILED", ro_vim_item_update
426 "task={} {} del-classification={} {}".format(
428 ro_task
["target_id"],
429 classification_vim_id
,
430 ro_vim_item_update_ok
.get("vim_message", ""),
434 return "DONE", ro_vim_item_update_ok
437 class VimInteractionSfi(VimInteractionBase
):
438 def new(self
, ro_task
, task_index
, task_depends
):
439 task
= ro_task
["tasks"][task_index
]
440 task_id
= task
["task_id"]
442 target_vim
= self
.my_vims
[ro_task
["target_id"]]
446 params
= task
["params"]
447 params_copy
= deepcopy(params
)
448 name
= params_copy
["name"]
449 ingress_port
= params_copy
["ingress_port"]
450 egress_port
= params_copy
["egress_port"]
451 ingress_port_index
= params_copy
["ingress_port_index"]
452 egress_port_index
= params_copy
["egress_port_index"]
454 ingress_port_id
= ingress_port
455 egress_port_id
= egress_port
457 vm_id
= task_depends
[ingress_port
]
459 if ingress_port
.startswith("TASK-"):
460 ingress_port_id
= target_vim
.refresh_vms_status([vm_id
])[vm_id
][
462 ][ingress_port_index
]["vim_interface_id"]
464 if ingress_port
== egress_port
:
465 egress_port_id
= ingress_port_id
467 if egress_port
.startswith("TASK-"):
468 egress_port_id
= target_vim
.refresh_vms_status([vm_id
])[vm_id
][
470 ][egress_port_index
]["vim_interface_id"]
472 ingress_port_id_list
= [ingress_port_id
]
473 egress_port_id_list
= [egress_port_id
]
475 vim_sfi_id
= target_vim
.new_sfi(
476 name
, ingress_port_id_list
, egress_port_id_list
, sfc_encap
=False
479 ro_vim_item_update
= {
480 "vim_id": vim_sfi_id
,
481 "vim_status": "DONE",
487 "task={} {} created={}".format(task_id
, ro_task
["target_id"], created
)
490 return "DONE", ro_vim_item_update
491 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
492 self
.logger
.debug(traceback
.format_exc())
494 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
496 ro_vim_item_update
= {
497 "vim_status": "VIM_ERROR",
499 "vim_message": str(e
),
502 return "FAILED", ro_vim_item_update
504 def delete(self
, ro_task
, task_index
):
505 task
= ro_task
["tasks"][task_index
]
506 task_id
= task
["task_id"]
507 sfi_vim_id
= ro_task
["vim_info"]["vim_id"]
508 ro_vim_item_update_ok
= {
509 "vim_status": "DELETED",
511 "vim_message": "DELETED",
517 target_vim
= self
.my_vims
[ro_task
["target_id"]]
518 target_vim
.delete_sfi(sfi_vim_id
)
519 except vimconn
.VimConnNotFoundException
:
520 ro_vim_item_update_ok
["vim_message"] = "already deleted"
521 except vimconn
.VimConnException
as e
:
523 "ro_task={} vim={} del-sfi={}: {}".format(
524 ro_task
["_id"], ro_task
["target_id"], sfi_vim_id
, e
527 ro_vim_item_update
= {
528 "vim_status": "VIM_ERROR",
529 "vim_message": "Error while deleting: {}".format(e
),
532 return "FAILED", ro_vim_item_update
535 "task={} {} del-sfi={} {}".format(
537 ro_task
["target_id"],
539 ro_vim_item_update_ok
.get("vim_message", ""),
543 return "DONE", ro_vim_item_update_ok
546 class VimInteractionSf(VimInteractionBase
):
547 def new(self
, ro_task
, task_index
, task_depends
):
548 task
= ro_task
["tasks"][task_index
]
549 task_id
= task
["task_id"]
551 target_vim
= self
.my_vims
[ro_task
["target_id"]]
555 params
= task
["params"]
556 params_copy
= deepcopy(params
)
557 name
= params_copy
["name"]
558 sfi_list
= params_copy
["sfis"]
562 sfi_id
= task_depends
[sfi
] if sfi
.startswith("TASK-") else sfi
563 sfi_id_list
.append(sfi_id
)
565 vim_sf_id
= target_vim
.new_sf(name
, sfi_id_list
, sfc_encap
=False)
567 ro_vim_item_update
= {
569 "vim_status": "DONE",
575 "task={} {} created={}".format(task_id
, ro_task
["target_id"], created
)
578 return "DONE", ro_vim_item_update
579 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
580 self
.logger
.debug(traceback
.format_exc())
582 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
584 ro_vim_item_update
= {
585 "vim_status": "VIM_ERROR",
587 "vim_message": str(e
),
590 return "FAILED", ro_vim_item_update
592 def delete(self
, ro_task
, task_index
):
593 task
= ro_task
["tasks"][task_index
]
594 task_id
= task
["task_id"]
595 sf_vim_id
= ro_task
["vim_info"]["vim_id"]
596 ro_vim_item_update_ok
= {
597 "vim_status": "DELETED",
599 "vim_message": "DELETED",
605 target_vim
= self
.my_vims
[ro_task
["target_id"]]
606 target_vim
.delete_sf(sf_vim_id
)
607 except vimconn
.VimConnNotFoundException
:
608 ro_vim_item_update_ok
["vim_message"] = "already deleted"
609 except vimconn
.VimConnException
as e
:
611 "ro_task={} vim={} del-sf={}: {}".format(
612 ro_task
["_id"], ro_task
["target_id"], sf_vim_id
, e
615 ro_vim_item_update
= {
616 "vim_status": "VIM_ERROR",
617 "vim_message": "Error while deleting: {}".format(e
),
620 return "FAILED", ro_vim_item_update
623 "task={} {} del-sf={} {}".format(
625 ro_task
["target_id"],
627 ro_vim_item_update_ok
.get("vim_message", ""),
631 return "DONE", ro_vim_item_update_ok
634 class VimInteractionSfp(VimInteractionBase
):
635 def new(self
, ro_task
, task_index
, task_depends
):
636 task
= ro_task
["tasks"][task_index
]
637 task_id
= task
["task_id"]
639 target_vim
= self
.my_vims
[ro_task
["target_id"]]
643 params
= task
["params"]
644 params_copy
= deepcopy(params
)
645 name
= params_copy
["name"]
646 sf_list
= params_copy
["sfs"]
647 classification_list
= params_copy
["classifications"]
649 classification_id_list
= []
652 for classification
in classification_list
:
654 task_depends
[classification
]
655 if classification
.startswith("TASK-")
658 classification_id_list
.append(classi_id
)
661 sf_id
= task_depends
[sf
] if sf
.startswith("TASK-") else sf
662 sf_id_list
.append(sf_id
)
664 vim_sfp_id
= target_vim
.new_sfp(
665 name
, classification_id_list
, sf_id_list
, sfc_encap
=False
668 ro_vim_item_update
= {
669 "vim_id": vim_sfp_id
,
670 "vim_status": "DONE",
676 "task={} {} created={}".format(task_id
, ro_task
["target_id"], created
)
679 return "DONE", ro_vim_item_update
680 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
681 self
.logger
.debug(traceback
.format_exc())
683 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
685 ro_vim_item_update
= {
686 "vim_status": "VIM_ERROR",
688 "vim_message": str(e
),
691 return "FAILED", ro_vim_item_update
693 def delete(self
, ro_task
, task_index
):
694 task
= ro_task
["tasks"][task_index
]
695 task_id
= task
["task_id"]
696 sfp_vim_id
= ro_task
["vim_info"]["vim_id"]
697 ro_vim_item_update_ok
= {
698 "vim_status": "DELETED",
700 "vim_message": "DELETED",
706 target_vim
= self
.my_vims
[ro_task
["target_id"]]
707 target_vim
.delete_sfp(sfp_vim_id
)
708 except vimconn
.VimConnNotFoundException
:
709 ro_vim_item_update_ok
["vim_message"] = "already deleted"
710 except vimconn
.VimConnException
as e
:
712 "ro_task={} vim={} del-sfp={}: {}".format(
713 ro_task
["_id"], ro_task
["target_id"], sfp_vim_id
, e
716 ro_vim_item_update
= {
717 "vim_status": "VIM_ERROR",
718 "vim_message": "Error while deleting: {}".format(e
),
721 return "FAILED", ro_vim_item_update
724 "task={} {} del-sfp={} {}".format(
726 ro_task
["target_id"],
728 ro_vim_item_update_ok
.get("vim_message", ""),
732 return "DONE", ro_vim_item_update_ok
735 class VimInteractionVdu(VimInteractionBase
):
736 max_retries_inject_ssh_key
= 20 # 20 times
737 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
739 def new(self
, ro_task
, task_index
, task_depends
):
740 task
= ro_task
["tasks"][task_index
]
741 task_id
= task
["task_id"]
743 target_vim
= self
.my_vims
[ro_task
["target_id"]]
746 params
= task
["params"]
747 params_copy
= deepcopy(params
)
748 net_list
= params_copy
["net_list"]
751 # change task_id into network_id
752 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
753 network_id
= task_depends
[net
["net_id"]]
756 raise NsWorkerException(
757 "Cannot create VM because depends on a network not created or found "
758 "for {}".format(net
["net_id"])
761 net
["net_id"] = network_id
763 if params_copy
["image_id"].startswith("TASK-"):
764 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
766 if params_copy
["flavor_id"].startswith("TASK-"):
767 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
769 affinity_group_list
= params_copy
["affinity_group_list"]
770 for affinity_group
in affinity_group_list
:
771 # change task_id into affinity_group_id
772 if "affinity_group_id" in affinity_group
and affinity_group
[
774 ].startswith("TASK-"):
775 affinity_group_id
= task_depends
[
776 affinity_group
["affinity_group_id"]
779 if not affinity_group_id
:
780 raise NsWorkerException(
781 "found for {}".format(affinity_group
["affinity_group_id"])
784 affinity_group
["affinity_group_id"] = affinity_group_id
785 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
786 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
788 # add to created items previous_created_volumes (healing)
789 if task
.get("previous_created_volumes"):
790 for k
, v
in task
["previous_created_volumes"].items():
793 ro_vim_item_update
= {
795 "vim_status": "BUILD",
797 "created_items": created_items
,
800 "interfaces_vim_ids": interfaces
,
802 "interfaces_backup": [],
805 "task={} {} new-vm={} created={}".format(
806 task_id
, ro_task
["target_id"], vim_vm_id
, created
810 return "BUILD", ro_vim_item_update
811 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
812 self
.logger
.debug(traceback
.format_exc())
814 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
816 ro_vim_item_update
= {
817 "vim_status": "VIM_ERROR",
819 "vim_message": str(e
),
822 return "FAILED", ro_vim_item_update
824 def delete(self
, ro_task
, task_index
):
825 task
= ro_task
["tasks"][task_index
]
826 task_id
= task
["task_id"]
827 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
828 ro_vim_item_update_ok
= {
829 "vim_status": "DELETED",
831 "vim_message": "DELETED",
837 "delete_vminstance: vm_vim_id={} created_items={}".format(
838 vm_vim_id
, ro_task
["vim_info"]["created_items"]
841 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
842 target_vim
= self
.my_vims
[ro_task
["target_id"]]
843 target_vim
.delete_vminstance(
845 ro_task
["vim_info"]["created_items"],
846 ro_task
["vim_info"].get("volumes_to_hold", []),
848 except vimconn
.VimConnNotFoundException
:
849 ro_vim_item_update_ok
["vim_message"] = "already deleted"
850 except vimconn
.VimConnException
as e
:
852 "ro_task={} vim={} del-vm={}: {}".format(
853 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
856 ro_vim_item_update
= {
857 "vim_status": "VIM_ERROR",
858 "vim_message": "Error while deleting: {}".format(e
),
861 return "FAILED", ro_vim_item_update
864 "task={} {} del-vm={} {}".format(
866 ro_task
["target_id"],
868 ro_vim_item_update_ok
.get("vim_message", ""),
872 return "DONE", ro_vim_item_update_ok
874 def refresh(self
, ro_task
):
875 """Call VIM to get vm status"""
876 ro_task_id
= ro_task
["_id"]
877 target_vim
= self
.my_vims
[ro_task
["target_id"]]
878 vim_id
= ro_task
["vim_info"]["vim_id"]
883 vm_to_refresh_list
= [vim_id
]
885 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
886 vim_info
= vim_dict
[vim_id
]
888 if vim_info
["status"] == "ACTIVE":
890 elif vim_info
["status"] == "BUILD":
891 task_status
= "BUILD"
893 task_status
= "FAILED"
895 # try to load and parse vim_information
897 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
898 if vim_info_info
.get("name"):
899 vim_info
["name"] = vim_info_info
["name"]
900 except Exception as vim_info_error
:
901 self
.logger
.exception(
902 f
"{vim_info_error} occured while getting the vim_info from yaml"
904 except vimconn
.VimConnException
as e
:
905 # Mark all tasks at VIM_ERROR status
907 "ro_task={} vim={} get-vm={}: {}".format(
908 ro_task_id
, ro_task
["target_id"], vim_id
, e
911 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
912 task_status
= "FAILED"
914 ro_vim_item_update
= {}
916 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
918 if vim_info
.get("interfaces"):
919 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
923 for iface
in vim_info
["interfaces"]
924 if vim_iface_id
== iface
["vim_interface_id"]
929 # iface.pop("vim_info", None)
930 vim_interfaces
.append(iface
)
934 for t
in ro_task
["tasks"]
935 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
937 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
938 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
942 mgmt_vdu_iface
= task_create
.get(
943 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
946 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
948 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
949 ro_vim_item_update
["interfaces"] = vim_interfaces
951 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
952 ro_vim_item_update
["vim_status"] = vim_info
["status"]
954 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
955 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
957 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
958 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
959 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
960 elif vim_info
["status"] == "DELETED":
961 ro_vim_item_update
["vim_id"] = None
962 ro_vim_item_update
["vim_message"] = "Deleted externally"
964 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
965 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
967 if ro_vim_item_update
:
969 "ro_task={} {} get-vm={}: status={} {}".format(
971 ro_task
["target_id"],
973 ro_vim_item_update
.get("vim_status"),
975 ro_vim_item_update
.get("vim_message")
976 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
982 return task_status
, ro_vim_item_update
984 def exec(self
, ro_task
, task_index
, task_depends
):
985 task
= ro_task
["tasks"][task_index
]
986 task_id
= task
["task_id"]
987 target_vim
= self
.my_vims
[ro_task
["target_id"]]
988 db_task_update
= {"retries": 0}
989 retries
= task
.get("retries", 0)
992 params
= task
["params"]
993 params_copy
= deepcopy(params
)
994 params_copy
["ro_key"] = self
.db
.decrypt(
995 params_copy
.pop("private_key"),
996 params_copy
.pop("schema_version"),
997 params_copy
.pop("salt"),
999 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
1000 target_vim
.inject_user_key(**params_copy
)
1002 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
1009 ) # params_copy["key"]
1010 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1013 self
.logger
.debug(traceback
.format_exc())
1014 if retries
< self
.max_retries_inject_ssh_key
:
1020 "next_retry": self
.time_retries_inject_ssh_key
,
1025 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
1027 ro_vim_item_update
= {"vim_message": str(e
)}
1029 return "FAILED", ro_vim_item_update
, db_task_update
1032 class VimInteractionImage(VimInteractionBase
):
1033 def new(self
, ro_task
, task_index
, task_depends
):
1034 task
= ro_task
["tasks"][task_index
]
1035 task_id
= task
["task_id"]
1038 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1043 if task
.get("find_params"):
1044 vim_images
= target_vim
.get_image_list(
1045 task
["find_params"].get("filter_dict", {})
1049 raise NsWorkerExceptionNotFound(
1050 "Image not found with this criteria: '{}'".format(
1054 elif len(vim_images
) > 1:
1055 raise NsWorkerException(
1056 "More than one image found with this criteria: '{}'".format(
1061 vim_image_id
= vim_images
[0]["id"]
1063 ro_vim_item_update
= {
1064 "vim_id": vim_image_id
,
1065 "vim_status": "ACTIVE",
1067 "created_items": created_items
,
1068 "vim_details": None,
1069 "vim_message": None,
1072 "task={} {} new-image={} created={}".format(
1073 task_id
, ro_task
["target_id"], vim_image_id
, created
1077 return "DONE", ro_vim_item_update
1078 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
1080 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
1082 ro_vim_item_update
= {
1083 "vim_status": "VIM_ERROR",
1085 "vim_message": str(e
),
1088 return "FAILED", ro_vim_item_update
1091 class VimInteractionSharedVolume(VimInteractionBase
):
1092 def delete(self
, ro_task
, task_index
):
1093 task
= ro_task
["tasks"][task_index
]
1094 task_id
= task
["task_id"]
1095 shared_volume_vim_id
= ro_task
["vim_info"]["vim_id"]
1096 created_items
= ro_task
["vim_info"]["created_items"]
1097 ro_vim_item_update_ok
= {
1098 "vim_status": "DELETED",
1100 "vim_message": "DELETED",
1103 if created_items
and created_items
.get(shared_volume_vim_id
).get("keep"):
1104 ro_vim_item_update_ok
= {
1105 "vim_status": "ACTIVE",
1107 "vim_message": None,
1109 return "DONE", ro_vim_item_update_ok
1111 if shared_volume_vim_id
:
1112 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1113 target_vim
.delete_shared_volumes(shared_volume_vim_id
)
1114 except vimconn
.VimConnNotFoundException
:
1115 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1116 except vimconn
.VimConnException
as e
:
1118 "ro_task={} vim={} del-shared-volume={}: {}".format(
1119 ro_task
["_id"], ro_task
["target_id"], shared_volume_vim_id
, e
1122 ro_vim_item_update
= {
1123 "vim_status": "VIM_ERROR",
1124 "vim_message": "Error while deleting: {}".format(e
),
1127 return "FAILED", ro_vim_item_update
1130 "task={} {} del-shared-volume={} {}".format(
1132 ro_task
["target_id"],
1133 shared_volume_vim_id
,
1134 ro_vim_item_update_ok
.get("vim_message", ""),
1138 return "DONE", ro_vim_item_update_ok
1140 def new(self
, ro_task
, task_index
, task_depends
):
1141 task
= ro_task
["tasks"][task_index
]
1142 task_id
= task
["task_id"]
1145 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1148 shared_volume_vim_id
= None
1149 shared_volume_data
= None
1151 if task
.get("params"):
1152 shared_volume_data
= task
["params"]
1154 if shared_volume_data
:
1156 f
"Creating the new shared_volume for {shared_volume_data}\n"
1160 shared_volume_vim_id
,
1161 ) = target_vim
.new_shared_volumes(shared_volume_data
)
1163 created_items
[shared_volume_vim_id
] = {
1164 "name": shared_volume_name
,
1165 "keep": shared_volume_data
.get("keep"),
1168 ro_vim_item_update
= {
1169 "vim_id": shared_volume_vim_id
,
1170 "vim_status": "ACTIVE",
1172 "created_items": created_items
,
1173 "vim_details": None,
1174 "vim_message": None,
1177 "task={} {} new-shared-volume={} created={}".format(
1178 task_id
, ro_task
["target_id"], shared_volume_vim_id
, created
1182 return "DONE", ro_vim_item_update
1183 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1185 "task={} vim={} new-shared-volume:"
1186 " {}".format(task_id
, ro_task
["target_id"], e
)
1188 ro_vim_item_update
= {
1189 "vim_status": "VIM_ERROR",
1191 "vim_message": str(e
),
1194 return "FAILED", ro_vim_item_update
1197 class VimInteractionFlavor(VimInteractionBase
):
1198 def delete(self
, ro_task
, task_index
):
1199 task
= ro_task
["tasks"][task_index
]
1200 task_id
= task
["task_id"]
1201 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
1202 ro_vim_item_update_ok
= {
1203 "vim_status": "DELETED",
1205 "vim_message": "DELETED",
1211 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1212 target_vim
.delete_flavor(flavor_vim_id
)
1213 except vimconn
.VimConnNotFoundException
:
1214 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1215 except vimconn
.VimConnException
as e
:
1217 "ro_task={} vim={} del-flavor={}: {}".format(
1218 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
1221 ro_vim_item_update
= {
1222 "vim_status": "VIM_ERROR",
1223 "vim_message": "Error while deleting: {}".format(e
),
1226 return "FAILED", ro_vim_item_update
1229 "task={} {} del-flavor={} {}".format(
1231 ro_task
["target_id"],
1233 ro_vim_item_update_ok
.get("vim_message", ""),
1237 return "DONE", ro_vim_item_update_ok
1239 def new(self
, ro_task
, task_index
, task_depends
):
1240 task
= ro_task
["tasks"][task_index
]
1241 task_id
= task
["task_id"]
1244 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1247 vim_flavor_id
= None
1249 if task
.get("find_params", {}).get("vim_flavor_id"):
1250 vim_flavor_id
= task
["find_params"]["vim_flavor_id"]
1251 elif task
.get("find_params", {}).get("flavor_data"):
1253 flavor_data
= task
["find_params"]["flavor_data"]
1254 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
1255 except vimconn
.VimConnNotFoundException
as flavor_not_found_msg
:
1256 self
.logger
.warning(
1257 f
"VimConnNotFoundException occured: {flavor_not_found_msg}"
1260 if not vim_flavor_id
and task
.get("params"):
1262 flavor_data
= task
["params"]["flavor_data"]
1263 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
1266 ro_vim_item_update
= {
1267 "vim_id": vim_flavor_id
,
1268 "vim_status": "ACTIVE",
1270 "created_items": created_items
,
1271 "vim_details": None,
1272 "vim_message": None,
1275 "task={} {} new-flavor={} created={}".format(
1276 task_id
, ro_task
["target_id"], vim_flavor_id
, created
1280 return "DONE", ro_vim_item_update
1281 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1283 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
1285 ro_vim_item_update
= {
1286 "vim_status": "VIM_ERROR",
1288 "vim_message": str(e
),
1291 return "FAILED", ro_vim_item_update
1294 class VimInteractionAffinityGroup(VimInteractionBase
):
1295 def delete(self
, ro_task
, task_index
):
1296 task
= ro_task
["tasks"][task_index
]
1297 task_id
= task
["task_id"]
1298 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
1299 ro_vim_item_update_ok
= {
1300 "vim_status": "DELETED",
1302 "vim_message": "DELETED",
1307 if affinity_group_vim_id
:
1308 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1309 target_vim
.delete_affinity_group(affinity_group_vim_id
)
1310 except vimconn
.VimConnNotFoundException
:
1311 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1312 except vimconn
.VimConnException
as e
:
1314 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
1315 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
1318 ro_vim_item_update
= {
1319 "vim_status": "VIM_ERROR",
1320 "vim_message": "Error while deleting: {}".format(e
),
1323 return "FAILED", ro_vim_item_update
1326 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
1328 ro_task
["target_id"],
1329 affinity_group_vim_id
,
1330 ro_vim_item_update_ok
.get("vim_message", ""),
1334 return "DONE", ro_vim_item_update_ok
1336 def new(self
, ro_task
, task_index
, task_depends
):
1337 task
= ro_task
["tasks"][task_index
]
1338 task_id
= task
["task_id"]
1341 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1344 affinity_group_vim_id
= None
1345 affinity_group_data
= None
1346 param_affinity_group_id
= ""
1348 if task
.get("params"):
1349 affinity_group_data
= task
["params"].get("affinity_group_data")
1351 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
1353 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
1354 "vim-affinity-group-id"
1356 affinity_group_vim_id
= target_vim
.get_affinity_group(
1357 param_affinity_group_id
1359 except vimconn
.VimConnNotFoundException
:
1361 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
1362 "could not be found at VIM. Creating a new one.".format(
1363 task_id
, ro_task
["target_id"], param_affinity_group_id
1367 if not affinity_group_vim_id
and affinity_group_data
:
1368 affinity_group_vim_id
= target_vim
.new_affinity_group(
1373 ro_vim_item_update
= {
1374 "vim_id": affinity_group_vim_id
,
1375 "vim_status": "ACTIVE",
1377 "created_items": created_items
,
1378 "vim_details": None,
1379 "vim_message": None,
1382 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
1383 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
1387 return "DONE", ro_vim_item_update
1388 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1390 "task={} vim={} new-affinity-or-anti-affinity-group:"
1391 " {}".format(task_id
, ro_task
["target_id"], e
)
1393 ro_vim_item_update
= {
1394 "vim_status": "VIM_ERROR",
1396 "vim_message": str(e
),
1399 return "FAILED", ro_vim_item_update
1402 class VimInteractionUpdateVdu(VimInteractionBase
):
1403 def exec(self
, ro_task
, task_index
, task_depends
):
1404 task
= ro_task
["tasks"][task_index
]
1405 task_id
= task
["task_id"]
1406 db_task_update
= {"retries": 0}
1409 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1413 if task
.get("params"):
1414 vim_vm_id
= task
["params"].get("vim_vm_id")
1415 action
= task
["params"].get("action")
1416 context
= {action
: action
}
1417 target_vim
.action_vminstance(vim_vm_id
, context
)
1419 ro_vim_item_update
= {
1420 "vim_id": vim_vm_id
,
1421 "vim_status": "ACTIVE",
1423 "created_items": created_items
,
1424 "vim_details": None,
1425 "vim_message": None,
1428 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1430 return "DONE", ro_vim_item_update
, db_task_update
1431 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1433 "task={} vim={} VM Migration:"
1434 " {}".format(task_id
, ro_task
["target_id"], e
)
1436 ro_vim_item_update
= {
1437 "vim_status": "VIM_ERROR",
1439 "vim_message": str(e
),
1442 return "FAILED", ro_vim_item_update
, db_task_update
1445 class VimInteractionSdnNet(VimInteractionBase
):
1447 def _match_pci(port_pci
, mapping
):
1449 Check if port_pci matches with mapping.
1450 The mapping can have brackets to indicate that several chars are accepted. e.g
1451 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1452 :param port_pci: text
1453 :param mapping: text, can contain brackets to indicate several chars are available
1454 :return: True if matches, False otherwise
1456 if not port_pci
or not mapping
:
1458 if port_pci
== mapping
:
1464 bracket_start
= mapping
.find("[", mapping_index
)
1466 if bracket_start
== -1:
1469 bracket_end
= mapping
.find("]", bracket_start
)
1470 if bracket_end
== -1:
1473 length
= bracket_start
- mapping_index
1476 and port_pci
[pci_index
: pci_index
+ length
]
1477 != mapping
[mapping_index
:bracket_start
]
1482 port_pci
[pci_index
+ length
]
1483 not in mapping
[bracket_start
+ 1 : bracket_end
]
1487 pci_index
+= length
+ 1
1488 mapping_index
= bracket_end
+ 1
1490 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
1495 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
1497 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1498 :param vim_account_id:
1503 for vld
in vlds_to_connect
:
1504 table
, _
, db_id
= vld
.partition(":")
1505 db_id
, _
, vld
= db_id
.partition(":")
1506 _
, _
, vld_id
= vld
.partition(".")
1508 if table
== "vnfrs":
1509 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
1510 iface_key
= "vnf-vld-id"
1511 else: # table == "nsrs"
1512 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
1513 iface_key
= "ns-vld-id"
1515 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
1517 for db_vnfr
in db_vnfrs
:
1518 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
1519 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
1520 if interface
.get(iface_key
) == vld_id
and interface
.get(
1522 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1524 interface_
= interface
.copy()
1525 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1526 db_vnfr
["_id"], vdu_index
, iface_index
1529 if vdur
.get("status") == "ERROR":
1530 interface_
["status"] = "ERROR"
1532 interfaces
.append(interface_
)
1536 def refresh(self
, ro_task
):
1537 # look for task create
1538 task_create_index
, _
= next(
1540 for i_t
in enumerate(ro_task
["tasks"])
1542 and i_t
[1]["action"] == "CREATE"
1543 and i_t
[1]["status"] != "FINISHED"
1546 return self
.new(ro_task
, task_create_index
, None)
1548 def new(self
, ro_task
, task_index
, task_depends
):
1549 task
= ro_task
["tasks"][task_index
]
1550 task_id
= task
["task_id"]
1551 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1553 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1555 created_items
= ro_task
["vim_info"].get("created_items")
1556 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1557 new_connected_ports
= []
1558 last_update
= ro_task
["vim_info"].get("last_update", 0)
1559 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1561 created
= ro_task
["vim_info"].get("created", False)
1566 params
= task
["params"]
1567 vlds_to_connect
= params
.get("vlds", [])
1568 associated_vim
= params
.get("target_vim")
1569 # external additional ports
1570 additional_ports
= params
.get("sdn-ports") or ()
1571 _
, _
, vim_account_id
= (
1573 if associated_vim
is None
1574 else associated_vim
.partition(":")
1578 # get associated VIM
1579 if associated_vim
not in self
.db_vims
:
1580 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1581 "vim_accounts", {"_id": vim_account_id
}
1584 db_vim
= self
.db_vims
[associated_vim
]
1586 # look for ports to connect
1587 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1591 pending_ports
= error_ports
= 0
1593 sdn_need_update
= False
1596 vlan_used
= port
.get("vlan") or vlan_used
1598 # TODO. Do not connect if already done
1599 if not port
.get("compute_node") or not port
.get("pci"):
1600 if port
.get("status") == "ERROR":
1607 compute_node_mappings
= next(
1610 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1611 if c
and c
["compute_node"] == port
["compute_node"]
1616 if compute_node_mappings
:
1617 # process port_mapping pci of type 0000:af:1[01].[1357]
1621 for p
in compute_node_mappings
["ports"]
1622 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1628 if not db_vim
["config"].get("mapping_not_needed"):
1630 "Port mapping not found for compute_node={} pci={}".format(
1631 port
["compute_node"], port
["pci"]
1638 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1640 "service_endpoint_id": pmap
.get("service_endpoint_id")
1641 or service_endpoint_id
,
1642 "service_endpoint_encapsulation_type": (
1643 "dot1q" if port
["type"] == "SR-IOV" else None
1645 "service_endpoint_encapsulation_info": {
1646 "vlan": port
.get("vlan"),
1647 "mac": port
.get("mac-address"),
1648 "device_id": pmap
.get("device_id") or port
["compute_node"],
1649 "device_interface_id": pmap
.get("device_interface_id")
1651 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1652 "switch_port": pmap
.get("switch_port"),
1653 "service_mapping_info": pmap
.get("service_mapping_info"),
1658 # if port["modified_at"] > last_update:
1659 # sdn_need_update = True
1660 new_connected_ports
.append(port
["id"]) # TODO
1661 sdn_ports
.append(new_port
)
1665 "{} interfaces have not been created as VDU is on ERROR status".format(
1670 # connect external ports
1671 for index
, additional_port
in enumerate(additional_ports
):
1672 additional_port_id
= additional_port
.get(
1673 "service_endpoint_id"
1674 ) or "external-{}".format(index
)
1677 "service_endpoint_id": additional_port_id
,
1678 "service_endpoint_encapsulation_type": additional_port
.get(
1679 "service_endpoint_encapsulation_type", "dot1q"
1681 "service_endpoint_encapsulation_info": {
1682 "vlan": additional_port
.get("vlan") or vlan_used
,
1683 "mac": additional_port
.get("mac_address"),
1684 "device_id": additional_port
.get("device_id"),
1685 "device_interface_id": additional_port
.get(
1686 "device_interface_id"
1688 "switch_dpid": additional_port
.get("switch_dpid")
1689 or additional_port
.get("switch_id"),
1690 "switch_port": additional_port
.get("switch_port"),
1691 "service_mapping_info": additional_port
.get(
1692 "service_mapping_info"
1697 new_connected_ports
.append(additional_port_id
)
1700 # if there are more ports to connect or they have been modified, call create/update
1702 sdn_status
= "ERROR"
1703 sdn_info
= "; ".join(error_list
)
1704 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1705 last_update
= time
.time()
1708 if len(sdn_ports
) < 2:
1709 sdn_status
= "ACTIVE"
1711 if not pending_ports
:
1713 "task={} {} new-sdn-net done, less than 2 ports".format(
1714 task_id
, ro_task
["target_id"]
1718 net_type
= params
.get("type") or "ELAN"
1722 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1725 "task={} {} new-sdn-net={} created={}".format(
1726 task_id
, ro_task
["target_id"], sdn_net_id
, created
1730 created_items
= target_vim
.edit_connectivity_service(
1731 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1735 "task={} {} update-sdn-net={} created={}".format(
1736 task_id
, ro_task
["target_id"], sdn_net_id
, created
1740 connected_ports
= new_connected_ports
1742 wim_status_dict
= target_vim
.get_connectivity_service_status(
1743 sdn_net_id
, conn_info
=created_items
1745 sdn_status
= wim_status_dict
["sdn_status"]
1747 if wim_status_dict
.get("sdn_info"):
1748 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1750 if wim_status_dict
.get("error_msg"):
1751 sdn_info
= wim_status_dict
.get("error_msg") or ""
1754 if sdn_status
!= "ERROR":
1755 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1756 len(ports
) - pending_ports
, len(ports
)
1759 if sdn_status
== "ACTIVE":
1760 sdn_status
= "BUILD"
1762 ro_vim_item_update
= {
1763 "vim_id": sdn_net_id
,
1764 "vim_status": sdn_status
,
1766 "created_items": created_items
,
1767 "connected_ports": connected_ports
,
1768 "vim_details": sdn_info
,
1769 "vim_message": None,
1770 "last_update": last_update
,
1773 return sdn_status
, ro_vim_item_update
1774 except Exception as e
:
1776 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1777 exc_info
=not isinstance(
1778 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1781 ro_vim_item_update
= {
1782 "vim_status": "VIM_ERROR",
1784 "vim_message": str(e
),
1787 return "FAILED", ro_vim_item_update
1789 def delete(self
, ro_task
, task_index
):
1790 task
= ro_task
["tasks"][task_index
]
1791 task_id
= task
["task_id"]
1792 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1793 ro_vim_item_update_ok
= {
1794 "vim_status": "DELETED",
1796 "vim_message": "DELETED",
1802 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1803 target_vim
.delete_connectivity_service(
1804 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1807 except Exception as e
:
1809 isinstance(e
, sdnconn
.SdnConnectorError
)
1810 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1812 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1815 "ro_task={} vim={} del-sdn-net={}: {}".format(
1816 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1818 exc_info
=not isinstance(
1819 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1822 ro_vim_item_update
= {
1823 "vim_status": "VIM_ERROR",
1824 "vim_message": "Error while deleting: {}".format(e
),
1827 return "FAILED", ro_vim_item_update
1830 "task={} {} del-sdn-net={} {}".format(
1832 ro_task
["target_id"],
1834 ro_vim_item_update_ok
.get("vim_message", ""),
1838 return "DONE", ro_vim_item_update_ok
1841 class VimInteractionMigration(VimInteractionBase
):
1842 def exec(self
, ro_task
, task_index
, task_depends
):
1843 task
= ro_task
["tasks"][task_index
]
1844 task_id
= task
["task_id"]
1845 db_task_update
= {"retries": 0}
1846 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1850 refreshed_vim_info
= {}
1854 if task
.get("params"):
1855 vim_vm_id
= task
["params"].get("vim_vm_id")
1856 migrate_host
= task
["params"].get("migrate_host")
1857 _
, migrated_compute_node
= target_vim
.migrate_instance(
1858 vim_vm_id
, migrate_host
1861 if migrated_compute_node
:
1862 # When VM is migrated, vdu["vim_info"] needs to be updated
1863 vdu_old_vim_info
= task
["params"]["vdu_vim_info"].get(
1864 ro_task
["target_id"]
1867 # Refresh VM to get new vim_info
1868 vm_to_refresh_list
= [vim_vm_id
]
1869 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1870 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1872 if refreshed_vim_info
.get("interfaces"):
1873 for old_iface
in vdu_old_vim_info
.get("interfaces"):
1877 for iface
in refreshed_vim_info
["interfaces"]
1878 if old_iface
["vim_interface_id"]
1879 == iface
["vim_interface_id"]
1883 vim_interfaces
.append(iface
)
1885 ro_vim_item_update
= {
1886 "vim_id": vim_vm_id
,
1887 "vim_status": "ACTIVE",
1889 "created_items": created_items
,
1890 "vim_details": None,
1891 "vim_message": None,
1894 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1898 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1901 ro_vim_item_update
["interfaces"] = vim_interfaces
1904 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1907 return "DONE", ro_vim_item_update
, db_task_update
1909 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1911 "task={} vim={} VM Migration:"
1912 " {}".format(task_id
, ro_task
["target_id"], e
)
1914 ro_vim_item_update
= {
1915 "vim_status": "VIM_ERROR",
1917 "vim_message": str(e
),
1920 return "FAILED", ro_vim_item_update
, db_task_update
1923 class VimInteractionResize(VimInteractionBase
):
1924 def exec(self
, ro_task
, task_index
, task_depends
):
1925 task
= ro_task
["tasks"][task_index
]
1926 task_id
= task
["task_id"]
1927 db_task_update
= {"retries": 0}
1929 target_flavor_uuid
= None
1931 refreshed_vim_info
= {}
1932 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1935 params
= task
["params"]
1936 params_copy
= deepcopy(params
)
1937 target_flavor_uuid
= task_depends
[params_copy
["flavor_id"]]
1939 if task
.get("params"):
1940 self
.logger
.info("vim_vm_id %s", vim_vm_id
)
1942 if target_flavor_uuid
is not None:
1943 resized_status
= target_vim
.resize_instance(
1944 vim_vm_id
, target_flavor_uuid
1948 # Refresh VM to get new vim_info
1949 vm_to_refresh_list
= [vim_vm_id
]
1950 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1951 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1953 ro_vim_item_update
= {
1954 "vim_id": vim_vm_id
,
1955 "vim_status": "ACTIVE",
1957 "created_items": created_items
,
1958 "vim_details": None,
1959 "vim_message": None,
1962 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1966 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1969 "task={} {} resize done".format(task_id
, ro_task
["target_id"])
1971 return "DONE", ro_vim_item_update
, db_task_update
1972 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1974 "task={} vim={} Resize:" " {}".format(task_id
, ro_task
["target_id"], e
)
1976 ro_vim_item_update
= {
1977 "vim_status": "VIM_ERROR",
1979 "vim_message": str(e
),
1982 return "FAILED", ro_vim_item_update
, db_task_update
1985 class ConfigValidate
:
1986 def __init__(self
, config
: Dict
):
1991 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1993 self
.conf
["period"]["refresh_active"] >= 60
1994 or self
.conf
["period"]["refresh_active"] == -1
1996 return self
.conf
["period"]["refresh_active"]
2002 return self
.conf
["period"]["refresh_build"]
2006 return self
.conf
["period"]["refresh_image"]
2010 return self
.conf
["period"]["refresh_error"]
2013 def queue_size(self
):
2014 return self
.conf
["period"]["queue_size"]
2017 class NsWorker(threading
.Thread
):
2018 def __init__(self
, worker_index
, config
, plugins
, db
):
2020 :param worker_index: thread index
2021 :param config: general configuration of RO, among others the process_id with the docker id where it runs
2022 :param plugins: global shared dict with the loaded plugins
2023 :param db: database class instance to use
2025 threading
.Thread
.__init
__(self
)
2026 self
.config
= config
2027 self
.plugins
= plugins
2028 self
.plugin_name
= "unknown"
2029 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
2030 self
.worker_index
= worker_index
2031 # refresh periods for created items
2032 self
.refresh_config
= ConfigValidate(config
)
2033 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
2034 # targetvim: vimplugin class
2036 # targetvim: vim information from database
2039 self
.vim_targets
= []
2040 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
2043 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2044 "shared-volumes": VimInteractionSharedVolume(
2045 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2047 "classification": VimInteractionClassification(
2048 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2050 "sfi": VimInteractionSfi(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2051 "sf": VimInteractionSf(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2052 "sfp": VimInteractionSfp(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2053 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2054 "image": VimInteractionImage(
2055 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2057 "flavor": VimInteractionFlavor(
2058 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2060 "sdn_net": VimInteractionSdnNet(
2061 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2063 "update": VimInteractionUpdateVdu(
2064 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2066 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
2067 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2069 "migrate": VimInteractionMigration(
2070 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2072 "verticalscale": VimInteractionResize(
2073 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2076 self
.time_last_task_processed
= None
2077 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
2078 self
.tasks_to_delete
= []
2079 # it is idle when there are not vim_targets associated
2081 self
.task_locked_time
= config
["global"]["task_locked_time"]
2083 def insert_task(self
, task
):
2085 self
.task_queue
.put(task
, False)
2088 raise NsWorkerException("timeout inserting a task")
2090 def terminate(self
):
2091 self
.insert_task("exit")
2093 def del_task(self
, task
):
2094 with self
.task_lock
:
2095 if task
["status"] == "SCHEDULED":
2096 task
["status"] = "SUPERSEDED"
2098 else: # task["status"] == "processing"
2099 self
.task_lock
.release()
2102 def _process_vim_config(self
, target_id
: str, db_vim
: dict) -> None:
2104 Process vim config, creating vim configuration files as ca_cert
2105 :param target_id: vim/sdn/wim + id
2106 :param db_vim: Vim dictionary obtained from database
2107 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
2109 if not db_vim
.get("config"):
2113 work_dir
= "/app/osm_ro/certs"
2116 if db_vim
["config"].get("ca_cert_content"):
2117 file_name
= f
"{work_dir}/{target_id}:{self.worker_index}"
2119 if not path
.isdir(file_name
):
2122 file_name
= file_name
+ "/ca_cert"
2124 with
open(file_name
, "w") as f
:
2125 f
.write(db_vim
["config"]["ca_cert_content"])
2126 del db_vim
["config"]["ca_cert_content"]
2127 db_vim
["config"]["ca_cert"] = file_name
2128 except Exception as e
:
2129 raise NsWorkerException(
2130 "Error writing to file '{}': {}".format(file_name
, e
)
2133 def _load_plugin(self
, name
, type="vim"):
2134 # type can be vim or sdn
2135 if "rovim_dummy" not in self
.plugins
:
2136 self
.plugins
["rovim_dummy"] = VimDummyConnector
2138 if "rosdn_dummy" not in self
.plugins
:
2139 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
2141 if name
in self
.plugins
:
2142 return self
.plugins
[name
]
2145 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
2146 self
.plugins
[name
] = ep
.load()
2147 except Exception as e
:
2148 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
2150 if name
and name
not in self
.plugins
:
2151 raise NsWorkerException(
2152 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
2155 return self
.plugins
[name
]
2157 def _unload_vim(self
, target_id
):
2159 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
2160 :param target_id: Contains type:_id; where type can be 'vim', ...
2164 self
.db_vims
.pop(target_id
, None)
2165 self
.my_vims
.pop(target_id
, None)
2167 if target_id
in self
.vim_targets
:
2168 self
.vim_targets
.remove(target_id
)
2170 self
.logger
.info("Unloaded {}".format(target_id
))
2171 except Exception as e
:
2172 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
2174 def _check_vim(self
, target_id
):
2176 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
2177 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
2180 target
, _
, _id
= target_id
.partition(":")
2186 loaded
= target_id
in self
.vim_targets
2190 else "wim_accounts" if target
== "wim" else "sdns"
2195 step
= "Getting {} from db".format(target_id
)
2196 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
2198 for op_index
, operation
in enumerate(
2199 db_vim
["_admin"].get("operations", ())
2201 if operation
["operationState"] != "PROCESSING":
2204 locked_at
= operation
.get("locked_at")
2206 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
2207 # some other thread is doing this operation
2211 op_text
= "_admin.operations.{}.".format(op_index
)
2213 if not self
.db
.set_one(
2217 op_text
+ "operationState": "PROCESSING",
2218 op_text
+ "locked_at": locked_at
,
2221 op_text
+ "locked_at": now
,
2222 "admin.current_operation": op_index
,
2224 fail_on_empty
=False,
2228 unset_dict
[op_text
+ "locked_at"] = None
2229 unset_dict
["current_operation"] = None
2230 step
= "Loading " + target_id
2231 error_text
= self
._load
_vim
(target_id
)
2234 step
= "Checking connectivity"
2237 self
.my_vims
[target_id
].check_vim_connectivity()
2239 self
.my_vims
[target_id
].check_credentials()
2241 update_dict
["_admin.operationalState"] = "ENABLED"
2242 update_dict
["_admin.detailed-status"] = ""
2243 unset_dict
[op_text
+ "detailed-status"] = None
2244 update_dict
[op_text
+ "operationState"] = "COMPLETED"
2248 except Exception as e
:
2249 error_text
= "{}: {}".format(step
, e
)
2250 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
2253 if update_dict
or unset_dict
:
2255 update_dict
[op_text
+ "operationState"] = "FAILED"
2256 update_dict
[op_text
+ "detailed-status"] = error_text
2257 unset_dict
.pop(op_text
+ "detailed-status", None)
2258 update_dict
["_admin.operationalState"] = "ERROR"
2259 update_dict
["_admin.detailed-status"] = error_text
2262 update_dict
[op_text
+ "statusEnteredTime"] = now
2266 q_filter
={"_id": _id
},
2267 update_dict
=update_dict
,
2269 fail_on_empty
=False,
2273 self
._unload
_vim
(target_id
)
2275 def _reload_vim(self
, target_id
):
2276 if target_id
in self
.vim_targets
:
2277 self
._load
_vim
(target_id
)
2279 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
2280 # just remove it to force load again next time it is needed
2281 self
.db_vims
.pop(target_id
, None)
2283 def _load_vim(self
, target_id
):
2285 Load or reload a vim_account, sdn_controller or wim_account.
2286 Read content from database, load the plugin if not loaded.
2287 In case of error loading the plugin, it loads a failing VIM_connector
2288 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
2289 :param target_id: Contains type:_id; where type can be 'vim', ...
2290 :return: None if ok, descriptive text if error
2292 target
, _
, _id
= target_id
.partition(":")
2296 else "wim_accounts" if target
== "wim" else "sdns"
2300 step
= "Getting {}={} from db".format(target
, _id
)
2303 # TODO process for wim, sdnc, ...
2304 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
2306 # if deep_get(vim, "config", "sdn-controller"):
2307 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
2308 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
2310 step
= "Decrypting password"
2311 schema_version
= vim
.get("schema_version")
2312 self
.db
.encrypt_decrypt_fields(
2315 fields
=("password", "secret"),
2316 schema_version
=schema_version
,
2319 self
._process
_vim
_config
(target_id
, vim
)
2322 plugin_name
= "rovim_" + vim
["vim_type"]
2323 step
= "Loading plugin '{}'".format(plugin_name
)
2324 vim_module_conn
= self
._load
_plugin
(plugin_name
)
2325 step
= "Loading {}'".format(target_id
)
2326 self
.my_vims
[target_id
] = vim_module_conn(
2329 tenant_id
=vim
.get("vim_tenant_id"),
2330 tenant_name
=vim
.get("vim_tenant_name"),
2333 user
=vim
["vim_user"],
2334 passwd
=vim
["vim_password"],
2335 config
=vim
.get("config") or {},
2339 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
2340 step
= "Loading plugin '{}'".format(plugin_name
)
2341 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
2342 step
= "Loading {}'".format(target_id
)
2344 wim_config
= wim
.pop("config", {}) or {}
2345 wim
["uuid"] = wim
["_id"]
2346 if "url" in wim
and "wim_url" not in wim
:
2347 wim
["wim_url"] = wim
["url"]
2348 elif "url" not in wim
and "wim_url" in wim
:
2349 wim
["url"] = wim
["wim_url"]
2352 wim_config
["dpid"] = wim
.pop("dpid")
2354 if wim
.get("switch_id"):
2355 wim_config
["switch_id"] = wim
.pop("switch_id")
2357 # wim, wim_account, config
2358 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
2359 self
.db_vims
[target_id
] = vim
2360 self
.error_status
= None
2363 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
2365 except Exception as e
:
2367 "Cannot load {} plugin={}: {} {}".format(
2368 target_id
, plugin_name
, step
, e
2372 self
.db_vims
[target_id
] = vim
or {}
2373 self
.db_vims
[target_id
] = FailingConnector(str(e
))
2374 error_status
= "{} Error: {}".format(step
, e
)
2378 if target_id
not in self
.vim_targets
:
2379 self
.vim_targets
.append(target_id
)
2381 def _get_db_task(self
):
2383 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
2388 if not self
.time_last_task_processed
:
2389 self
.time_last_task_processed
= now
2394 # Log RO tasks only when loglevel is DEBUG
2395 if self.logger.getEffectiveLevel() == logging.DEBUG:
2402 + str(self.task_locked_time)
2404 + "time_last_task_processed="
2405 + str(self.time_last_task_processed)
2411 locked
= self
.db
.set_one(
2414 "target_id": self
.vim_targets
,
2415 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2416 "locked_at.lt": now
- self
.task_locked_time
,
2417 "to_check_at.lt": self
.time_last_task_processed
,
2418 "to_check_at.gt": -1,
2420 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
2421 fail_on_empty
=False,
2426 ro_task
= self
.db
.get_one(
2429 "target_id": self
.vim_targets
,
2430 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2436 if self
.time_last_task_processed
== now
:
2437 self
.time_last_task_processed
= None
2440 self
.time_last_task_processed
= now
2441 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2443 except DbException
as e
:
2444 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
2445 except Exception as e
:
2446 self
.logger
.critical(
2447 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
2452 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2454 Determine if this task need to be done or superseded
2457 my_task
= ro_task
["tasks"][task_index
]
2458 task_id
= my_task
["task_id"]
2459 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
2460 "created_items", False
2463 self
.logger
.debug("Needed delete: {}".format(needed_delete
))
2464 if my_task
["status"] == "FAILED":
2465 return None, None # TODO need to be retry??
2468 for index
, task
in enumerate(ro_task
["tasks"]):
2469 if index
== task_index
or not task
:
2473 my_task
["target_record"] == task
["target_record"]
2474 and task
["action"] == "CREATE"
2477 db_update
["tasks.{}.status".format(index
)] = task
["status"] = (
2480 elif task
["action"] == "CREATE" and task
["status"] not in (
2484 needed_delete
= False
2488 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
2490 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
2492 return "SUPERSEDED", None
2493 except Exception as e
:
2494 if not isinstance(e
, NsWorkerException
):
2495 self
.logger
.critical(
2496 "Unexpected exception at _delete_task task={}: {}".format(
2502 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2504 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2506 Determine if this task need to create something at VIM
2509 my_task
= ro_task
["tasks"][task_index
]
2510 task_id
= my_task
["task_id"]
2512 if my_task
["status"] == "FAILED":
2513 return None, None # TODO need to be retry??
2514 elif my_task
["status"] == "SCHEDULED":
2515 # check if already created by another task
2516 for index
, task
in enumerate(ro_task
["tasks"]):
2517 if index
== task_index
or not task
:
2520 if task
["action"] == "CREATE" and task
["status"] not in (
2525 return task
["status"], "COPY_VIM_INFO"
2528 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2529 ro_task
, task_index
, task_depends
2531 # TODO update other CREATE tasks
2532 except Exception as e
:
2533 if not isinstance(e
, NsWorkerException
):
2535 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2538 task_status
= "FAILED"
2539 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2540 # TODO update ro_vim_item_update
2542 return task_status
, ro_vim_item_update
2546 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2548 Look for dependency task
2549 :param task_id: Can be one of
2550 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2551 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2552 3. task.task_id: "<action_id>:number"
2555 :return: database ro_task plus index of task
2558 task_id
.startswith("vim:")
2559 or task_id
.startswith("sdn:")
2560 or task_id
.startswith("wim:")
2562 target_id
, _
, task_id
= task_id
.partition(" ")
2564 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2565 ro_task_dependency
= self
.db
.get_one(
2567 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2568 fail_on_empty
=False,
2571 if ro_task_dependency
:
2572 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2573 if task
["target_record_id"] == task_id
:
2574 return ro_task_dependency
, task_index
2578 for task_index
, task
in enumerate(ro_task
["tasks"]):
2579 if task
and task
["task_id"] == task_id
:
2580 return ro_task
, task_index
2582 ro_task_dependency
= self
.db
.get_one(
2585 "tasks.ANYINDEX.task_id": task_id
,
2586 "tasks.ANYINDEX.target_record.ne": None,
2588 fail_on_empty
=False,
2591 self
.logger
.debug("ro_task_dependency={}".format(ro_task_dependency
))
2592 if ro_task_dependency
:
2593 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2594 if task
["task_id"] == task_id
:
2595 return ro_task_dependency
, task_index
2596 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
2598 def update_vm_refresh(self
, ro_task
):
2599 """Enables the VM status updates if self.refresh_config.active parameter
2600 is not -1 and then updates the DB accordingly
2604 self
.logger
.debug("Checking if VM status update config")
2605 next_refresh
= time
.time()
2606 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2608 if next_refresh
!= -1:
2609 db_ro_task_update
= {}
2611 next_check_at
= now
+ (24 * 60 * 60)
2612 next_check_at
= min(next_check_at
, next_refresh
)
2613 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2614 db_ro_task_update
["to_check_at"] = next_check_at
2617 "Finding tasks which to be updated to enable VM status updates"
2619 refresh_tasks
= self
.db
.get_list(
2622 "tasks.status": "DONE",
2623 "to_check_at.lt": 0,
2626 self
.logger
.debug("Updating tasks to change the to_check_at status")
2627 for task
in refresh_tasks
:
2634 update_dict
=db_ro_task_update
,
2638 except Exception as e
:
2639 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2641 def _get_next_refresh(self
, ro_task
: dict, next_refresh
: float):
2642 """Decide the next_refresh according to vim type and refresh config period.
2644 ro_task (dict): ro_task details
2645 next_refresh (float): next refresh time as epoch format
2648 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2650 target_vim
= ro_task
["target_id"]
2651 vim_type
= self
.db_vims
[target_vim
]["vim_type"]
2652 if self
.refresh_config
.active
== -1 or vim_type
== "openstack":
2655 next_refresh
+= self
.refresh_config
.active
2658 def _process_pending_tasks(self
, ro_task
):
2659 ro_task_id
= ro_task
["_id"]
2662 next_check_at
= now
+ (24 * 60 * 60)
2663 db_ro_task_update
= {}
2665 def _update_refresh(new_status
):
2666 # compute next_refresh
2668 nonlocal next_check_at
2669 nonlocal db_ro_task_update
2672 next_refresh
= time
.time()
2674 if task
["item"] in ("image", "flavor"):
2675 next_refresh
+= self
.refresh_config
.image
2676 elif new_status
== "BUILD":
2677 next_refresh
+= self
.refresh_config
.build
2678 elif new_status
== "DONE":
2679 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2681 next_refresh
+= self
.refresh_config
.error
2683 next_check_at
= min(next_check_at
, next_refresh
)
2684 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2685 ro_task
["vim_info"]["refresh_at"] = next_refresh
2689 # Log RO tasks only when loglevel is DEBUG
2690 if self.logger.getEffectiveLevel() == logging.DEBUG:
2691 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2693 # Check if vim status refresh is enabled again
2694 self
.update_vm_refresh(ro_task
)
2695 # 0: get task_status_create
2697 task_status_create
= None
2701 for t
in ro_task
["tasks"]
2703 and t
["action"] == "CREATE"
2704 and t
["status"] in ("BUILD", "DONE")
2710 task_status_create
= task_create
["status"]
2712 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2713 for task_action
in ("DELETE", "CREATE", "EXEC"):
2714 db_vim_update
= None
2717 for task_index
, task
in enumerate(ro_task
["tasks"]):
2719 continue # task deleted
2722 target_update
= None
2726 task_action
in ("DELETE", "EXEC")
2727 and task
["status"] not in ("SCHEDULED", "BUILD")
2729 or task
["action"] != task_action
2731 task_action
== "CREATE"
2732 and task
["status"] in ("FINISHED", "SUPERSEDED")
2737 task_path
= "tasks.{}.status".format(task_index
)
2739 db_vim_info_update
= None
2740 dependency_ro_task
= {}
2742 if task
["status"] == "SCHEDULED":
2743 # check if tasks that this depends on have been completed
2744 dependency_not_completed
= False
2746 for dependency_task_id
in task
.get("depends_on") or ():
2749 dependency_task_index
,
2750 ) = self
._get
_dependency
(
2751 dependency_task_id
, target_id
=ro_task
["target_id"]
2753 dependency_task
= dependency_ro_task
["tasks"][
2754 dependency_task_index
2757 "dependency_ro_task={} dependency_task_index={}".format(
2758 dependency_ro_task
, dependency_task_index
2762 if dependency_task
["status"] == "SCHEDULED":
2763 dependency_not_completed
= True
2764 next_check_at
= min(
2765 next_check_at
, dependency_ro_task
["to_check_at"]
2767 # must allow dependent task to be processed first
2768 # to do this set time after last_task_processed
2769 next_check_at
= max(
2770 self
.time_last_task_processed
, next_check_at
2773 elif dependency_task
["status"] == "FAILED":
2774 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2777 dependency_task
["action"],
2778 dependency_task
["item"],
2780 dependency_ro_task
["vim_info"].get(
2785 "task={} {}".format(task
["task_id"], error_text
)
2787 raise NsWorkerException(error_text
)
2789 task_depends
[dependency_task_id
] = dependency_ro_task
[
2792 task_depends
["TASK-{}".format(dependency_task_id
)] = (
2793 dependency_ro_task
["vim_info"]["vim_id"]
2796 if dependency_not_completed
:
2797 self
.logger
.warning(
2798 "DEPENDENCY NOT COMPLETED {}".format(
2799 dependency_ro_task
["vim_info"]["vim_id"]
2802 # TODO set at vim_info.vim_details that it is waiting
2805 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2806 # the task of renew this locking. It will update database locket_at periodically
2808 lock_object
= LockRenew
.add_lock_object(
2809 "ro_tasks", ro_task
, self
2811 if task
["action"] == "DELETE":
2815 ) = self
._delete
_task
(
2816 ro_task
, task_index
, task_depends
, db_ro_task_update
2819 "FINISHED" if new_status
== "DONE" else new_status
2821 # ^with FINISHED instead of DONE it will not be refreshing
2823 if new_status
in ("FINISHED", "SUPERSEDED"):
2824 target_update
= "DELETE"
2825 elif task
["action"] == "EXEC":
2830 ) = self
.item2class
[task
["item"]].exec(
2831 ro_task
, task_index
, task_depends
2834 "FINISHED" if new_status
== "DONE" else new_status
2836 # ^with FINISHED instead of DONE it will not be refreshing
2839 # load into database the modified db_task_update "retries" and "next_retry"
2840 if db_task_update
.get("retries"):
2842 "tasks.{}.retries".format(task_index
)
2843 ] = db_task_update
["retries"]
2845 next_check_at
= time
.time() + db_task_update
.get(
2848 target_update
= None
2849 elif task
["action"] == "CREATE":
2850 if task
["status"] == "SCHEDULED":
2851 if task_status_create
:
2852 new_status
= task_status_create
2853 target_update
= "COPY_VIM_INFO"
2855 new_status
, db_vim_info_update
= self
.item2class
[
2857 ].new(ro_task
, task_index
, task_depends
)
2858 _update_refresh(new_status
)
2860 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2861 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2865 ) = self
.item2class
[
2868 _update_refresh(new_status
)
2870 # The refresh is updated to avoid set the value of "refresh_at" to
2871 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2872 # because it can happen that in this case the task is never processed
2873 _update_refresh(task
["status"])
2875 except Exception as e
:
2876 new_status
= "FAILED"
2877 db_vim_info_update
= {
2878 "vim_status": "VIM_ERROR",
2879 "vim_message": str(e
),
2883 e
, (NsWorkerException
, vimconn
.VimConnException
)
2886 "Unexpected exception at _delete_task task={}: {}".format(
2893 if db_vim_info_update
:
2894 db_vim_update
= db_vim_info_update
.copy()
2895 db_ro_task_update
.update(
2898 for k
, v
in db_vim_info_update
.items()
2901 ro_task
["vim_info"].update(db_vim_info_update
)
2904 if task_action
== "CREATE":
2905 task_status_create
= new_status
2906 db_ro_task_update
[task_path
] = new_status
2908 if target_update
or db_vim_update
:
2909 if target_update
== "DELETE":
2910 self
._update
_target
(task
, None)
2911 elif target_update
== "COPY_VIM_INFO":
2912 self
._update
_target
(task
, ro_task
["vim_info"])
2914 self
._update
_target
(task
, db_vim_update
)
2916 except Exception as e
:
2918 isinstance(e
, DbException
)
2919 and e
.http_code
== HTTPStatus
.NOT_FOUND
2921 # if the vnfrs or nsrs has been removed from database, this task must be removed
2923 "marking to delete task={}".format(task
["task_id"])
2925 self
.tasks_to_delete
.append(task
)
2928 "Unexpected exception at _update_target task={}: {}".format(
2934 locked_at
= ro_task
["locked_at"]
2938 lock_object
["locked_at"],
2939 lock_object
["locked_at"] + self
.task_locked_time
,
2941 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
2942 # contain exactly locked_at + self.task_locked_time
2943 LockRenew
.remove_lock_object(lock_object
)
2946 "_id": ro_task
["_id"],
2947 "to_check_at": ro_task
["to_check_at"],
2948 "locked_at": locked_at
,
2950 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2951 # outside this task (by ro_nbi) do not update it
2952 db_ro_task_update
["locked_by"] = None
2953 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
2954 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2955 db_ro_task_update
["modified_at"] = now
2956 db_ro_task_update
["to_check_at"] = next_check_at
2959 # Log RO tasks only when loglevel is DEBUG
2960 if self.logger.getEffectiveLevel() == logging.DEBUG:
2961 db_ro_task_update_log = db_ro_task_update.copy()
2962 db_ro_task_update_log["_id"] = q_filter["_id"]
2963 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2966 if not self
.db
.set_one(
2968 update_dict
=db_ro_task_update
,
2970 fail_on_empty
=False,
2972 del db_ro_task_update
["to_check_at"]
2973 del q_filter
["to_check_at"]
2975 # Log RO tasks only when loglevel is DEBUG
2976 if self.logger.getEffectiveLevel() == logging.DEBUG:
2979 db_ro_task_update_log,
2982 "SET_TASK " + str(q_filter),
2988 update_dict
=db_ro_task_update
,
2991 except DbException
as e
:
2993 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2995 except Exception as e
:
2997 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
3000 def _update_target(self
, task
, ro_vim_item_update
):
3001 table
, _
, temp
= task
["target_record"].partition(":")
3002 _id
, _
, path_vim_status
= temp
.partition(":")
3003 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
3004 path_item
= path_item
[: path_item
.rfind(".")]
3005 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
3006 # path_item: dot separated list targeting record information, e.g. "vdur.10"
3008 if ro_vim_item_update
:
3010 path_vim_status
+ "." + k
: v
3011 for k
, v
in ro_vim_item_update
.items()
3020 "interfaces_backup",
3024 if path_vim_status
.startswith("vdur."):
3025 # for backward compatibility, add vdur.name apart from vdur.vim_name
3026 if ro_vim_item_update
.get("vim_name"):
3027 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
3029 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
3030 if ro_vim_item_update
.get("vim_id"):
3031 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
3033 # update general status
3034 if ro_vim_item_update
.get("vim_status"):
3035 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
3039 if ro_vim_item_update
.get("interfaces"):
3040 path_interfaces
= path_item
+ ".interfaces"
3042 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
3046 path_interfaces
+ ".{}.".format(i
) + k
: v
3047 for k
, v
in iface
.items()
3048 if k
in ("vlan", "compute_node", "pci")
3052 # put ip_address and mac_address with ip-address and mac-address
3053 if iface
.get("ip_address"):
3055 path_interfaces
+ ".{}.".format(i
) + "ip-address"
3056 ] = iface
["ip_address"]
3058 if iface
.get("mac_address"):
3060 path_interfaces
+ ".{}.".format(i
) + "mac-address"
3061 ] = iface
["mac_address"]
3063 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
3064 update_dict
["ip-address"] = iface
.get("ip_address").split(
3068 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
3069 update_dict
[path_item
+ ".ip-address"] = iface
.get(
3073 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
3075 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
3076 if ro_vim_item_update
.get("interfaces"):
3077 search_key
= path_vim_status
+ ".interfaces"
3078 if update_dict
.get(search_key
):
3079 interfaces_backup_update
= {
3080 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
3085 q_filter
={"_id": _id
},
3086 update_dict
=interfaces_backup_update
,
3090 update_dict
= {path_item
+ ".status": "DELETED"}
3093 q_filter
={"_id": _id
},
3094 update_dict
=update_dict
,
3095 unset
={path_vim_status
: None},
3098 def _process_delete_db_tasks(self
):
3100 Delete task from database because vnfrs or nsrs or both have been deleted
3101 :return: None. Uses and modify self.tasks_to_delete
3103 while self
.tasks_to_delete
:
3104 task
= self
.tasks_to_delete
[0]
3105 vnfrs_deleted
= None
3106 nsr_id
= task
["nsr_id"]
3108 if task
["target_record"].startswith("vnfrs:"):
3109 # check if nsrs is present
3110 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
3111 vnfrs_deleted
= task
["target_record"].split(":")[1]
3114 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
3115 except Exception as e
:
3117 "Error deleting task={}: {}".format(task
["task_id"], e
)
3119 self
.tasks_to_delete
.pop(0)
3122 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
3124 Static method because it is called from osm_ng_ro.ns
3125 :param db: instance of database to use
3126 :param nsr_id: affected nsrs id
3127 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
3128 :return: None, exception is fails
3131 for retry
in range(retries
):
3132 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
3136 for ro_task
in ro_tasks
:
3138 to_delete_ro_task
= True
3140 for index
, task
in enumerate(ro_task
["tasks"]):
3143 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
3145 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
3147 db_update
["tasks.{}".format(index
)] = None
3149 # used by other nsr, ro_task cannot be deleted
3150 to_delete_ro_task
= False
3152 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
3153 if to_delete_ro_task
:
3157 "_id": ro_task
["_id"],
3158 "modified_at": ro_task
["modified_at"],
3160 fail_on_empty
=False,
3164 db_update
["modified_at"] = now
3168 "_id": ro_task
["_id"],
3169 "modified_at": ro_task
["modified_at"],
3171 update_dict
=db_update
,
3172 fail_on_empty
=False,
3178 raise NsWorkerException("Exceeded {} retries".format(retries
))
3182 self
.logger
.info("Starting")
3184 # step 1: get commands from queue
3186 if self
.vim_targets
:
3187 task
= self
.task_queue
.get(block
=False)
3190 self
.logger
.debug("enters in idle state")
3192 task
= self
.task_queue
.get(block
=True)
3195 if task
[0] == "terminate":
3197 elif task
[0] == "load_vim":
3198 self
.logger
.info("order to load vim {}".format(task
[1]))
3199 self
._load
_vim
(task
[1])
3200 elif task
[0] == "unload_vim":
3201 self
.logger
.info("order to unload vim {}".format(task
[1]))
3202 self
._unload
_vim
(task
[1])
3203 elif task
[0] == "reload_vim":
3204 self
._reload
_vim
(task
[1])
3205 elif task
[0] == "check_vim":
3206 self
.logger
.info("order to check vim {}".format(task
[1]))
3207 self
._check
_vim
(task
[1])
3209 except Exception as e
:
3210 if isinstance(e
, queue
.Empty
):
3213 self
.logger
.critical(
3214 "Error processing task: {}".format(e
), exc_info
=True
3217 # step 2: process pending_tasks, delete not needed tasks
3219 if self
.tasks_to_delete
:
3220 self
._process
_delete
_db
_tasks
()
3223 # Log RO tasks only when loglevel is DEBUG
3224 if self.logger.getEffectiveLevel() == logging.DEBUG:
3225 _ = self._get_db_all_tasks()
3227 ro_task
= self
._get
_db
_task
()
3229 self
.logger
.debug("Task to process: {}".format(ro_task
))
3231 self
._process
_pending
_tasks
(ro_task
)
3235 except Exception as e
:
3236 self
.logger
.critical(
3237 "Unexpected exception at run: " + str(e
), exc_info
=True
3240 self
.logger
.info("Finishing")