1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
30 from os
import makedirs
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
43 from osm_ro_plugin
import vimconn
44 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
45 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
52 def deep_get(target_dict
, *args
, **kwargs
):
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exists, None or default otherwise
62 if not isinstance(target_dict
, dict) or key
not in target_dict
:
63 return kwargs
.get("default")
64 target_dict
= target_dict
[key
]
68 class NsWorkerException(Exception):
72 class FailingConnector
:
73 def __init__(self
, error_msg
):
74 self
.error_msg
= error_msg
76 for method
in dir(vimconn
.VimConnector
):
79 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
82 for method
in dir(sdnconn
.SdnConnectorBase
):
85 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
89 class NsWorkerExceptionNotFound(NsWorkerException
):
93 class VimInteractionBase
:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self
, db
, my_vims
, db_vims
, logger
):
100 self
.my_vims
= my_vims
101 self
.db_vims
= db_vims
103 def new(self
, ro_task
, task_index
, task_depends
):
106 def refresh(self
, ro_task
):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
113 def delete(self
, ro_task
, task_index
):
114 """skip calling VIM to delete image. Assumes ok"""
117 def exec(self
, ro_task
, task_index
, task_depends
):
118 return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
219 "task={} {} new-net={} created={}".format(
220 task_id
, ro_task
["target_id"], vim_net_id
, created
224 return "BUILD", ro_vim_item_update
225 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
227 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
229 ro_vim_item_update
= {
230 "vim_status": "VIM_ERROR",
232 "vim_message": str(e
),
235 return "FAILED", ro_vim_item_update
237 def refresh(self
, ro_task
):
238 """Call VIM to get network status"""
239 ro_task_id
= ro_task
["_id"]
240 target_vim
= self
.my_vims
[ro_task
["target_id"]]
241 vim_id
= ro_task
["vim_info"]["vim_id"]
242 net_to_refresh_list
= [vim_id
]
245 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
246 vim_info
= vim_dict
[vim_id
]
248 if vim_info
["status"] == "ACTIVE":
250 elif vim_info
["status"] == "BUILD":
251 task_status
= "BUILD"
253 task_status
= "FAILED"
254 except vimconn
.VimConnException
as e
:
255 # Mark all tasks at VIM_ERROR status
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id
, ro_task
["target_id"], vim_id
, e
261 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
262 task_status
= "FAILED"
264 ro_vim_item_update
= {}
265 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
266 ro_vim_item_update
["vim_status"] = vim_info
["status"]
268 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
269 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
271 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
273 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
274 elif vim_info
["status"] == "DELETED":
275 ro_vim_item_update
["vim_id"] = None
276 ro_vim_item_update
["vim_message"] = "Deleted externally"
278 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
279 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
281 if ro_vim_item_update
:
283 "ro_task={} {} get-net={}: status={} {}".format(
285 ro_task
["target_id"],
287 ro_vim_item_update
.get("vim_status"),
288 ro_vim_item_update
.get("vim_message")
289 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
294 return task_status
, ro_vim_item_update
296 def delete(self
, ro_task
, task_index
):
297 task
= ro_task
["tasks"][task_index
]
298 task_id
= task
["task_id"]
299 net_vim_id
= ro_task
["vim_info"]["vim_id"]
300 ro_vim_item_update_ok
= {
301 "vim_status": "DELETED",
303 "vim_message": "DELETED",
308 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
309 target_vim
= self
.my_vims
[ro_task
["target_id"]]
310 target_vim
.delete_network(
311 net_vim_id
, ro_task
["vim_info"]["created_items"]
313 except vimconn
.VimConnNotFoundException
:
314 ro_vim_item_update_ok
["vim_message"] = "already deleted"
315 except vimconn
.VimConnException
as e
:
317 "ro_task={} vim={} del-net={}: {}".format(
318 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
321 ro_vim_item_update
= {
322 "vim_status": "VIM_ERROR",
323 "vim_message": "Error while deleting: {}".format(e
),
326 return "FAILED", ro_vim_item_update
329 "task={} {} del-net={} {}".format(
331 ro_task
["target_id"],
333 ro_vim_item_update_ok
.get("vim_message", ""),
337 return "DONE", ro_vim_item_update_ok
340 class VimInteractionClassification(VimInteractionBase
):
341 def new(self
, ro_task
, task_index
, task_depends
):
342 task
= ro_task
["tasks"][task_index
]
343 task_id
= task
["task_id"]
345 target_vim
= self
.my_vims
[ro_task
["target_id"]]
349 params
= task
["params"]
350 params_copy
= deepcopy(params
)
352 name
= params_copy
.pop("name")
353 logical_source_port_index
= int(
354 params_copy
.pop("logical_source_port_index")
356 logical_source_port
= params_copy
["logical_source_port"]
358 if logical_source_port
.startswith("TASK-"):
359 vm_id
= task_depends
[logical_source_port
]
360 params_copy
["logical_source_port"] = target_vim
.refresh_vms_status(
362 )[vm_id
]["interfaces"][logical_source_port_index
]["vim_interface_id"]
364 vim_classification_id
= target_vim
.new_classification(
365 name
, "legacy_flow_classifier", params_copy
368 ro_vim_item_update
= {
369 "vim_id": vim_classification_id
,
370 "vim_status": "DONE",
376 "task={} {} created={}".format(task_id
, ro_task
["target_id"], created
)
379 return "DONE", ro_vim_item_update
380 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
381 self
.logger
.debug(traceback
.format_exc())
383 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
385 ro_vim_item_update
= {
386 "vim_status": "VIM_ERROR",
388 "vim_message": str(e
),
391 return "FAILED", ro_vim_item_update
393 def delete(self
, ro_task
, task_index
):
394 task
= ro_task
["tasks"][task_index
]
395 task_id
= task
["task_id"]
396 classification_vim_id
= ro_task
["vim_info"]["vim_id"]
397 ro_vim_item_update_ok
= {
398 "vim_status": "DELETED",
400 "vim_message": "DELETED",
405 if classification_vim_id
:
406 target_vim
= self
.my_vims
[ro_task
["target_id"]]
407 target_vim
.delete_classification(classification_vim_id
)
408 except vimconn
.VimConnNotFoundException
:
409 ro_vim_item_update_ok
["vim_message"] = "already deleted"
410 except vimconn
.VimConnException
as e
:
412 "ro_task={} vim={} del-classification={}: {}".format(
413 ro_task
["_id"], ro_task
["target_id"], classification_vim_id
, e
416 ro_vim_item_update
= {
417 "vim_status": "VIM_ERROR",
418 "vim_message": "Error while deleting: {}".format(e
),
421 return "FAILED", ro_vim_item_update
424 "task={} {} del-classification={} {}".format(
426 ro_task
["target_id"],
427 classification_vim_id
,
428 ro_vim_item_update_ok
.get("vim_message", ""),
432 return "DONE", ro_vim_item_update_ok
435 class VimInteractionSfi(VimInteractionBase
):
436 def new(self
, ro_task
, task_index
, task_depends
):
437 task
= ro_task
["tasks"][task_index
]
438 task_id
= task
["task_id"]
440 target_vim
= self
.my_vims
[ro_task
["target_id"]]
444 params
= task
["params"]
445 params_copy
= deepcopy(params
)
446 name
= params_copy
["name"]
447 ingress_port
= params_copy
["ingress_port"]
448 egress_port
= params_copy
["egress_port"]
449 ingress_port_index
= params_copy
["ingress_port_index"]
450 egress_port_index
= params_copy
["egress_port_index"]
452 ingress_port_id
= ingress_port
453 egress_port_id
= egress_port
455 vm_id
= task_depends
[ingress_port
]
457 if ingress_port
.startswith("TASK-"):
458 ingress_port_id
= target_vim
.refresh_vms_status([vm_id
])[vm_id
][
460 ][ingress_port_index
]["vim_interface_id"]
462 if ingress_port
== egress_port
:
463 egress_port_id
= ingress_port_id
465 if egress_port
.startswith("TASK-"):
466 egress_port_id
= target_vim
.refresh_vms_status([vm_id
])[vm_id
][
468 ][egress_port_index
]["vim_interface_id"]
470 ingress_port_id_list
= [ingress_port_id
]
471 egress_port_id_list
= [egress_port_id
]
473 vim_sfi_id
= target_vim
.new_sfi(
474 name
, ingress_port_id_list
, egress_port_id_list
, sfc_encap
=False
477 ro_vim_item_update
= {
478 "vim_id": vim_sfi_id
,
479 "vim_status": "DONE",
485 "task={} {} created={}".format(task_id
, ro_task
["target_id"], created
)
488 return "DONE", ro_vim_item_update
489 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
490 self
.logger
.debug(traceback
.format_exc())
492 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
494 ro_vim_item_update
= {
495 "vim_status": "VIM_ERROR",
497 "vim_message": str(e
),
500 return "FAILED", ro_vim_item_update
502 def delete(self
, ro_task
, task_index
):
503 task
= ro_task
["tasks"][task_index
]
504 task_id
= task
["task_id"]
505 sfi_vim_id
= ro_task
["vim_info"]["vim_id"]
506 ro_vim_item_update_ok
= {
507 "vim_status": "DELETED",
509 "vim_message": "DELETED",
515 target_vim
= self
.my_vims
[ro_task
["target_id"]]
516 target_vim
.delete_sfi(sfi_vim_id
)
517 except vimconn
.VimConnNotFoundException
:
518 ro_vim_item_update_ok
["vim_message"] = "already deleted"
519 except vimconn
.VimConnException
as e
:
521 "ro_task={} vim={} del-sfi={}: {}".format(
522 ro_task
["_id"], ro_task
["target_id"], sfi_vim_id
, e
525 ro_vim_item_update
= {
526 "vim_status": "VIM_ERROR",
527 "vim_message": "Error while deleting: {}".format(e
),
530 return "FAILED", ro_vim_item_update
533 "task={} {} del-sfi={} {}".format(
535 ro_task
["target_id"],
537 ro_vim_item_update_ok
.get("vim_message", ""),
541 return "DONE", ro_vim_item_update_ok
544 class VimInteractionSf(VimInteractionBase
):
545 def new(self
, ro_task
, task_index
, task_depends
):
546 task
= ro_task
["tasks"][task_index
]
547 task_id
= task
["task_id"]
549 target_vim
= self
.my_vims
[ro_task
["target_id"]]
553 params
= task
["params"]
554 params_copy
= deepcopy(params
)
555 name
= params_copy
["name"]
556 sfi_list
= params_copy
["sfis"]
560 sfi_id
= task_depends
[sfi
] if sfi
.startswith("TASK-") else sfi
561 sfi_id_list
.append(sfi_id
)
563 vim_sf_id
= target_vim
.new_sf(name
, sfi_id_list
, sfc_encap
=False)
565 ro_vim_item_update
= {
567 "vim_status": "DONE",
573 "task={} {} created={}".format(task_id
, ro_task
["target_id"], created
)
576 return "DONE", ro_vim_item_update
577 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
578 self
.logger
.debug(traceback
.format_exc())
580 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
582 ro_vim_item_update
= {
583 "vim_status": "VIM_ERROR",
585 "vim_message": str(e
),
588 return "FAILED", ro_vim_item_update
590 def delete(self
, ro_task
, task_index
):
591 task
= ro_task
["tasks"][task_index
]
592 task_id
= task
["task_id"]
593 sf_vim_id
= ro_task
["vim_info"]["vim_id"]
594 ro_vim_item_update_ok
= {
595 "vim_status": "DELETED",
597 "vim_message": "DELETED",
603 target_vim
= self
.my_vims
[ro_task
["target_id"]]
604 target_vim
.delete_sf(sf_vim_id
)
605 except vimconn
.VimConnNotFoundException
:
606 ro_vim_item_update_ok
["vim_message"] = "already deleted"
607 except vimconn
.VimConnException
as e
:
609 "ro_task={} vim={} del-sf={}: {}".format(
610 ro_task
["_id"], ro_task
["target_id"], sf_vim_id
, e
613 ro_vim_item_update
= {
614 "vim_status": "VIM_ERROR",
615 "vim_message": "Error while deleting: {}".format(e
),
618 return "FAILED", ro_vim_item_update
621 "task={} {} del-sf={} {}".format(
623 ro_task
["target_id"],
625 ro_vim_item_update_ok
.get("vim_message", ""),
629 return "DONE", ro_vim_item_update_ok
632 class VimInteractionSfp(VimInteractionBase
):
633 def new(self
, ro_task
, task_index
, task_depends
):
634 task
= ro_task
["tasks"][task_index
]
635 task_id
= task
["task_id"]
637 target_vim
= self
.my_vims
[ro_task
["target_id"]]
641 params
= task
["params"]
642 params_copy
= deepcopy(params
)
643 name
= params_copy
["name"]
644 sf_list
= params_copy
["sfs"]
645 classification_list
= params_copy
["classifications"]
647 classification_id_list
= []
650 for classification
in classification_list
:
652 task_depends
[classification
]
653 if classification
.startswith("TASK-")
656 classification_id_list
.append(classi_id
)
659 sf_id
= task_depends
[sf
] if sf
.startswith("TASK-") else sf
660 sf_id_list
.append(sf_id
)
662 vim_sfp_id
= target_vim
.new_sfp(
663 name
, classification_id_list
, sf_id_list
, sfc_encap
=False
666 ro_vim_item_update
= {
667 "vim_id": vim_sfp_id
,
668 "vim_status": "DONE",
674 "task={} {} created={}".format(task_id
, ro_task
["target_id"], created
)
677 return "DONE", ro_vim_item_update
678 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
679 self
.logger
.debug(traceback
.format_exc())
681 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
683 ro_vim_item_update
= {
684 "vim_status": "VIM_ERROR",
686 "vim_message": str(e
),
689 return "FAILED", ro_vim_item_update
691 def delete(self
, ro_task
, task_index
):
692 task
= ro_task
["tasks"][task_index
]
693 task_id
= task
["task_id"]
694 sfp_vim_id
= ro_task
["vim_info"]["vim_id"]
695 ro_vim_item_update_ok
= {
696 "vim_status": "DELETED",
698 "vim_message": "DELETED",
704 target_vim
= self
.my_vims
[ro_task
["target_id"]]
705 target_vim
.delete_sfp(sfp_vim_id
)
706 except vimconn
.VimConnNotFoundException
:
707 ro_vim_item_update_ok
["vim_message"] = "already deleted"
708 except vimconn
.VimConnException
as e
:
710 "ro_task={} vim={} del-sfp={}: {}".format(
711 ro_task
["_id"], ro_task
["target_id"], sfp_vim_id
, e
714 ro_vim_item_update
= {
715 "vim_status": "VIM_ERROR",
716 "vim_message": "Error while deleting: {}".format(e
),
719 return "FAILED", ro_vim_item_update
722 "task={} {} del-sfp={} {}".format(
724 ro_task
["target_id"],
726 ro_vim_item_update_ok
.get("vim_message", ""),
730 return "DONE", ro_vim_item_update_ok
733 class VimInteractionVdu(VimInteractionBase
):
734 max_retries_inject_ssh_key
= 20 # 20 times
735 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
737 def new(self
, ro_task
, task_index
, task_depends
):
738 task
= ro_task
["tasks"][task_index
]
739 task_id
= task
["task_id"]
741 target_vim
= self
.my_vims
[ro_task
["target_id"]]
744 params
= task
["params"]
745 params_copy
= deepcopy(params
)
746 net_list
= params_copy
["net_list"]
749 # change task_id into network_id
750 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
751 network_id
= task_depends
[net
["net_id"]]
754 raise NsWorkerException(
755 "Cannot create VM because depends on a network not created or found "
756 "for {}".format(net
["net_id"])
759 net
["net_id"] = network_id
761 if params_copy
["image_id"].startswith("TASK-"):
762 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
764 if params_copy
["flavor_id"].startswith("TASK-"):
765 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
767 affinity_group_list
= params_copy
["affinity_group_list"]
768 for affinity_group
in affinity_group_list
:
769 # change task_id into affinity_group_id
770 if "affinity_group_id" in affinity_group
and affinity_group
[
772 ].startswith("TASK-"):
773 affinity_group_id
= task_depends
[
774 affinity_group
["affinity_group_id"]
777 if not affinity_group_id
:
778 raise NsWorkerException(
779 "found for {}".format(affinity_group
["affinity_group_id"])
782 affinity_group
["affinity_group_id"] = affinity_group_id
783 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
784 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
786 # add to created items previous_created_volumes (healing)
787 if task
.get("previous_created_volumes"):
788 for k
, v
in task
["previous_created_volumes"].items():
791 ro_vim_item_update
= {
793 "vim_status": "BUILD",
795 "created_items": created_items
,
798 "interfaces_vim_ids": interfaces
,
800 "interfaces_backup": [],
803 "task={} {} new-vm={} created={}".format(
804 task_id
, ro_task
["target_id"], vim_vm_id
, created
808 return "BUILD", ro_vim_item_update
809 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
810 self
.logger
.debug(traceback
.format_exc())
812 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
814 ro_vim_item_update
= {
815 "vim_status": "VIM_ERROR",
817 "vim_message": str(e
),
820 return "FAILED", ro_vim_item_update
822 def delete(self
, ro_task
, task_index
):
823 task
= ro_task
["tasks"][task_index
]
824 task_id
= task
["task_id"]
825 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
826 ro_vim_item_update_ok
= {
827 "vim_status": "DELETED",
829 "vim_message": "DELETED",
835 "delete_vminstance: vm_vim_id={} created_items={}".format(
836 vm_vim_id
, ro_task
["vim_info"]["created_items"]
839 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
840 target_vim
= self
.my_vims
[ro_task
["target_id"]]
841 target_vim
.delete_vminstance(
843 ro_task
["vim_info"]["created_items"],
844 ro_task
["vim_info"].get("volumes_to_hold", []),
846 except vimconn
.VimConnNotFoundException
:
847 ro_vim_item_update_ok
["vim_message"] = "already deleted"
848 except vimconn
.VimConnException
as e
:
850 "ro_task={} vim={} del-vm={}: {}".format(
851 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
854 ro_vim_item_update
= {
855 "vim_status": "VIM_ERROR",
856 "vim_message": "Error while deleting: {}".format(e
),
859 return "FAILED", ro_vim_item_update
862 "task={} {} del-vm={} {}".format(
864 ro_task
["target_id"],
866 ro_vim_item_update_ok
.get("vim_message", ""),
870 return "DONE", ro_vim_item_update_ok
872 def refresh(self
, ro_task
):
873 """Call VIM to get vm status"""
874 ro_task_id
= ro_task
["_id"]
875 target_vim
= self
.my_vims
[ro_task
["target_id"]]
876 vim_id
= ro_task
["vim_info"]["vim_id"]
881 vm_to_refresh_list
= [vim_id
]
883 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
884 vim_info
= vim_dict
[vim_id
]
886 if vim_info
["status"] == "ACTIVE":
888 elif vim_info
["status"] == "BUILD":
889 task_status
= "BUILD"
891 task_status
= "FAILED"
893 # try to load and parse vim_information
895 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
896 if vim_info_info
.get("name"):
897 vim_info
["name"] = vim_info_info
["name"]
898 except Exception as vim_info_error
:
899 self
.logger
.exception(
900 f
"{vim_info_error} occured while getting the vim_info from yaml"
902 except vimconn
.VimConnException
as e
:
903 # Mark all tasks at VIM_ERROR status
905 "ro_task={} vim={} get-vm={}: {}".format(
906 ro_task_id
, ro_task
["target_id"], vim_id
, e
909 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
910 task_status
= "FAILED"
912 ro_vim_item_update
= {}
914 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
916 if vim_info
.get("interfaces"):
917 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
921 for iface
in vim_info
["interfaces"]
922 if vim_iface_id
== iface
["vim_interface_id"]
927 # iface.pop("vim_info", None)
928 vim_interfaces
.append(iface
)
932 for t
in ro_task
["tasks"]
933 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
935 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
936 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
940 mgmt_vdu_iface
= task_create
.get(
941 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
944 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
946 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
947 ro_vim_item_update
["interfaces"] = vim_interfaces
949 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
950 ro_vim_item_update
["vim_status"] = vim_info
["status"]
952 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
953 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
955 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
956 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
957 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
958 elif vim_info
["status"] == "DELETED":
959 ro_vim_item_update
["vim_id"] = None
960 ro_vim_item_update
["vim_message"] = "Deleted externally"
962 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
963 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
965 if ro_vim_item_update
:
967 "ro_task={} {} get-vm={}: status={} {}".format(
969 ro_task
["target_id"],
971 ro_vim_item_update
.get("vim_status"),
972 ro_vim_item_update
.get("vim_message")
973 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
978 return task_status
, ro_vim_item_update
980 def exec(self
, ro_task
, task_index
, task_depends
):
981 task
= ro_task
["tasks"][task_index
]
982 task_id
= task
["task_id"]
983 target_vim
= self
.my_vims
[ro_task
["target_id"]]
984 db_task_update
= {"retries": 0}
985 retries
= task
.get("retries", 0)
988 params
= task
["params"]
989 params_copy
= deepcopy(params
)
990 params_copy
["ro_key"] = self
.db
.decrypt(
991 params_copy
.pop("private_key"),
992 params_copy
.pop("schema_version"),
993 params_copy
.pop("salt"),
995 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
996 target_vim
.inject_user_key(**params_copy
)
998 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
1005 ) # params_copy["key"]
1006 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1009 self
.logger
.debug(traceback
.format_exc())
1010 if retries
< self
.max_retries_inject_ssh_key
:
1016 "next_retry": self
.time_retries_inject_ssh_key
,
1021 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
1023 ro_vim_item_update
= {"vim_message": str(e
)}
1025 return "FAILED", ro_vim_item_update
, db_task_update
1028 class VimInteractionImage(VimInteractionBase
):
1029 def new(self
, ro_task
, task_index
, task_depends
):
1030 task
= ro_task
["tasks"][task_index
]
1031 task_id
= task
["task_id"]
1034 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1039 if task
.get("find_params"):
1040 vim_images
= target_vim
.get_image_list(
1041 task
["find_params"].get("filter_dict", {})
1045 raise NsWorkerExceptionNotFound(
1046 "Image not found with this criteria: '{}'".format(
1050 elif len(vim_images
) > 1:
1051 raise NsWorkerException(
1052 "More than one image found with this criteria: '{}'".format(
1057 vim_image_id
= vim_images
[0]["id"]
1059 ro_vim_item_update
= {
1060 "vim_id": vim_image_id
,
1061 "vim_status": "ACTIVE",
1063 "created_items": created_items
,
1064 "vim_details": None,
1065 "vim_message": None,
1068 "task={} {} new-image={} created={}".format(
1069 task_id
, ro_task
["target_id"], vim_image_id
, created
1073 return "DONE", ro_vim_item_update
1074 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
1076 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
1078 ro_vim_item_update
= {
1079 "vim_status": "VIM_ERROR",
1081 "vim_message": str(e
),
1084 return "FAILED", ro_vim_item_update
1087 class VimInteractionSharedVolume(VimInteractionBase
):
1088 def delete(self
, ro_task
, task_index
):
1089 task
= ro_task
["tasks"][task_index
]
1090 task_id
= task
["task_id"]
1091 shared_volume_vim_id
= ro_task
["vim_info"]["vim_id"]
1092 created_items
= ro_task
["vim_info"]["created_items"]
1093 ro_vim_item_update_ok
= {
1094 "vim_status": "DELETED",
1096 "vim_message": "DELETED",
1099 if created_items
and created_items
.get(shared_volume_vim_id
).get("keep"):
1100 ro_vim_item_update_ok
= {
1101 "vim_status": "ACTIVE",
1103 "vim_message": None,
1105 return "DONE", ro_vim_item_update_ok
1107 if shared_volume_vim_id
:
1108 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1109 target_vim
.delete_shared_volumes(shared_volume_vim_id
)
1110 except vimconn
.VimConnNotFoundException
:
1111 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1112 except vimconn
.VimConnException
as e
:
1114 "ro_task={} vim={} del-shared-volume={}: {}".format(
1115 ro_task
["_id"], ro_task
["target_id"], shared_volume_vim_id
, e
1118 ro_vim_item_update
= {
1119 "vim_status": "VIM_ERROR",
1120 "vim_message": "Error while deleting: {}".format(e
),
1123 return "FAILED", ro_vim_item_update
1126 "task={} {} del-shared-volume={} {}".format(
1128 ro_task
["target_id"],
1129 shared_volume_vim_id
,
1130 ro_vim_item_update_ok
.get("vim_message", ""),
1134 return "DONE", ro_vim_item_update_ok
1136 def new(self
, ro_task
, task_index
, task_depends
):
1137 task
= ro_task
["tasks"][task_index
]
1138 task_id
= task
["task_id"]
1141 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1144 shared_volume_vim_id
= None
1145 shared_volume_data
= None
1147 if task
.get("params"):
1148 shared_volume_data
= task
["params"]
1150 if shared_volume_data
:
1152 f
"Creating the new shared_volume for {shared_volume_data}\n"
1156 shared_volume_vim_id
,
1157 ) = target_vim
.new_shared_volumes(shared_volume_data
)
1159 created_items
[shared_volume_vim_id
] = {
1160 "name": shared_volume_name
,
1161 "keep": shared_volume_data
.get("keep"),
1164 ro_vim_item_update
= {
1165 "vim_id": shared_volume_vim_id
,
1166 "vim_status": "ACTIVE",
1168 "created_items": created_items
,
1169 "vim_details": None,
1170 "vim_message": None,
1173 "task={} {} new-shared-volume={} created={}".format(
1174 task_id
, ro_task
["target_id"], shared_volume_vim_id
, created
1178 return "DONE", ro_vim_item_update
1179 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1181 "task={} vim={} new-shared-volume:"
1182 " {}".format(task_id
, ro_task
["target_id"], e
)
1184 ro_vim_item_update
= {
1185 "vim_status": "VIM_ERROR",
1187 "vim_message": str(e
),
1190 return "FAILED", ro_vim_item_update
1193 class VimInteractionFlavor(VimInteractionBase
):
1194 def delete(self
, ro_task
, task_index
):
1195 task
= ro_task
["tasks"][task_index
]
1196 task_id
= task
["task_id"]
1197 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
1198 ro_vim_item_update_ok
= {
1199 "vim_status": "DELETED",
1201 "vim_message": "DELETED",
1207 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1208 target_vim
.delete_flavor(flavor_vim_id
)
1209 except vimconn
.VimConnNotFoundException
:
1210 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1211 except vimconn
.VimConnException
as e
:
1213 "ro_task={} vim={} del-flavor={}: {}".format(
1214 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
1217 ro_vim_item_update
= {
1218 "vim_status": "VIM_ERROR",
1219 "vim_message": "Error while deleting: {}".format(e
),
1222 return "FAILED", ro_vim_item_update
1225 "task={} {} del-flavor={} {}".format(
1227 ro_task
["target_id"],
1229 ro_vim_item_update_ok
.get("vim_message", ""),
1233 return "DONE", ro_vim_item_update_ok
1235 def new(self
, ro_task
, task_index
, task_depends
):
1236 task
= ro_task
["tasks"][task_index
]
1237 task_id
= task
["task_id"]
1240 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1243 vim_flavor_id
= None
1245 if task
.get("find_params", {}).get("vim_flavor_id"):
1246 vim_flavor_id
= task
["find_params"]["vim_flavor_id"]
1247 elif task
.get("find_params", {}).get("flavor_data"):
1249 flavor_data
= task
["find_params"]["flavor_data"]
1250 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
1251 except vimconn
.VimConnNotFoundException
as flavor_not_found_msg
:
1252 self
.logger
.warning(
1253 f
"VimConnNotFoundException occured: {flavor_not_found_msg}"
1256 if not vim_flavor_id
and task
.get("params"):
1258 flavor_data
= task
["params"]["flavor_data"]
1259 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
1262 ro_vim_item_update
= {
1263 "vim_id": vim_flavor_id
,
1264 "vim_status": "ACTIVE",
1266 "created_items": created_items
,
1267 "vim_details": None,
1268 "vim_message": None,
1271 "task={} {} new-flavor={} created={}".format(
1272 task_id
, ro_task
["target_id"], vim_flavor_id
, created
1276 return "DONE", ro_vim_item_update
1277 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1279 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
1281 ro_vim_item_update
= {
1282 "vim_status": "VIM_ERROR",
1284 "vim_message": str(e
),
1287 return "FAILED", ro_vim_item_update
1290 class VimInteractionAffinityGroup(VimInteractionBase
):
1291 def delete(self
, ro_task
, task_index
):
1292 task
= ro_task
["tasks"][task_index
]
1293 task_id
= task
["task_id"]
1294 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
1295 ro_vim_item_update_ok
= {
1296 "vim_status": "DELETED",
1298 "vim_message": "DELETED",
1303 if affinity_group_vim_id
:
1304 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1305 target_vim
.delete_affinity_group(affinity_group_vim_id
)
1306 except vimconn
.VimConnNotFoundException
:
1307 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1308 except vimconn
.VimConnException
as e
:
1310 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
1311 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
1314 ro_vim_item_update
= {
1315 "vim_status": "VIM_ERROR",
1316 "vim_message": "Error while deleting: {}".format(e
),
1319 return "FAILED", ro_vim_item_update
1322 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
1324 ro_task
["target_id"],
1325 affinity_group_vim_id
,
1326 ro_vim_item_update_ok
.get("vim_message", ""),
1330 return "DONE", ro_vim_item_update_ok
1332 def new(self
, ro_task
, task_index
, task_depends
):
1333 task
= ro_task
["tasks"][task_index
]
1334 task_id
= task
["task_id"]
1337 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1340 affinity_group_vim_id
= None
1341 affinity_group_data
= None
1342 param_affinity_group_id
= ""
1344 if task
.get("params"):
1345 affinity_group_data
= task
["params"].get("affinity_group_data")
1347 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
1349 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
1350 "vim-affinity-group-id"
1352 affinity_group_vim_id
= target_vim
.get_affinity_group(
1353 param_affinity_group_id
1355 except vimconn
.VimConnNotFoundException
:
1357 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
1358 "could not be found at VIM. Creating a new one.".format(
1359 task_id
, ro_task
["target_id"], param_affinity_group_id
1363 if not affinity_group_vim_id
and affinity_group_data
:
1364 affinity_group_vim_id
= target_vim
.new_affinity_group(
1369 ro_vim_item_update
= {
1370 "vim_id": affinity_group_vim_id
,
1371 "vim_status": "ACTIVE",
1373 "created_items": created_items
,
1374 "vim_details": None,
1375 "vim_message": None,
1378 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
1379 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
1383 return "DONE", ro_vim_item_update
1384 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1386 "task={} vim={} new-affinity-or-anti-affinity-group:"
1387 " {}".format(task_id
, ro_task
["target_id"], e
)
1389 ro_vim_item_update
= {
1390 "vim_status": "VIM_ERROR",
1392 "vim_message": str(e
),
1395 return "FAILED", ro_vim_item_update
1398 class VimInteractionUpdateVdu(VimInteractionBase
):
1399 def exec(self
, ro_task
, task_index
, task_depends
):
1400 task
= ro_task
["tasks"][task_index
]
1401 task_id
= task
["task_id"]
1402 db_task_update
= {"retries": 0}
1405 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1409 if task
.get("params"):
1410 vim_vm_id
= task
["params"].get("vim_vm_id")
1411 action
= task
["params"].get("action")
1412 context
= {action
: action
}
1413 target_vim
.action_vminstance(vim_vm_id
, context
)
1415 ro_vim_item_update
= {
1416 "vim_id": vim_vm_id
,
1417 "vim_status": "ACTIVE",
1419 "created_items": created_items
,
1420 "vim_details": None,
1421 "vim_message": None,
1424 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1426 return "DONE", ro_vim_item_update
, db_task_update
1427 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1429 "task={} vim={} VM Migration:"
1430 " {}".format(task_id
, ro_task
["target_id"], e
)
1432 ro_vim_item_update
= {
1433 "vim_status": "VIM_ERROR",
1435 "vim_message": str(e
),
1438 return "FAILED", ro_vim_item_update
, db_task_update
1441 class VimInteractionSdnNet(VimInteractionBase
):
1443 def _match_pci(port_pci
, mapping
):
1445 Check if port_pci matches with mapping.
1446 The mapping can have brackets to indicate that several chars are accepted. e.g
1447 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1448 :param port_pci: text
1449 :param mapping: text, can contain brackets to indicate several chars are available
1450 :return: True if matches, False otherwise
1452 if not port_pci
or not mapping
:
1454 if port_pci
== mapping
:
1460 bracket_start
= mapping
.find("[", mapping_index
)
1462 if bracket_start
== -1:
1465 bracket_end
= mapping
.find("]", bracket_start
)
1466 if bracket_end
== -1:
1469 length
= bracket_start
- mapping_index
1472 and port_pci
[pci_index
: pci_index
+ length
]
1473 != mapping
[mapping_index
:bracket_start
]
1478 port_pci
[pci_index
+ length
]
1479 not in mapping
[bracket_start
+ 1 : bracket_end
]
1483 pci_index
+= length
+ 1
1484 mapping_index
= bracket_end
+ 1
1486 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
1491 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
1493 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1494 :param vim_account_id:
1499 for vld
in vlds_to_connect
:
1500 table
, _
, db_id
= vld
.partition(":")
1501 db_id
, _
, vld
= db_id
.partition(":")
1502 _
, _
, vld_id
= vld
.partition(".")
1504 if table
== "vnfrs":
1505 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
1506 iface_key
= "vnf-vld-id"
1507 else: # table == "nsrs"
1508 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
1509 iface_key
= "ns-vld-id"
1511 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
1513 for db_vnfr
in db_vnfrs
:
1514 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
1515 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
1516 if interface
.get(iface_key
) == vld_id
and interface
.get(
1518 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1520 interface_
= interface
.copy()
1521 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1522 db_vnfr
["_id"], vdu_index
, iface_index
1525 if vdur
.get("status") == "ERROR":
1526 interface_
["status"] = "ERROR"
1528 interfaces
.append(interface_
)
1532 def refresh(self
, ro_task
):
1533 # look for task create
1534 task_create_index
, _
= next(
1536 for i_t
in enumerate(ro_task
["tasks"])
1538 and i_t
[1]["action"] == "CREATE"
1539 and i_t
[1]["status"] != "FINISHED"
1542 return self
.new(ro_task
, task_create_index
, None)
1544 def new(self
, ro_task
, task_index
, task_depends
):
1545 task
= ro_task
["tasks"][task_index
]
1546 task_id
= task
["task_id"]
1547 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1549 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1551 created_items
= ro_task
["vim_info"].get("created_items")
1552 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1553 new_connected_ports
= []
1554 last_update
= ro_task
["vim_info"].get("last_update", 0)
1555 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1557 created
= ro_task
["vim_info"].get("created", False)
1562 params
= task
["params"]
1563 vlds_to_connect
= params
.get("vlds", [])
1564 associated_vim
= params
.get("target_vim")
1565 # external additional ports
1566 additional_ports
= params
.get("sdn-ports") or ()
1567 _
, _
, vim_account_id
= (
1569 if associated_vim
is None
1570 else associated_vim
.partition(":")
1574 # get associated VIM
1575 if associated_vim
not in self
.db_vims
:
1576 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1577 "vim_accounts", {"_id": vim_account_id
}
1580 db_vim
= self
.db_vims
[associated_vim
]
1582 # look for ports to connect
1583 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1587 pending_ports
= error_ports
= 0
1589 sdn_need_update
= False
1592 vlan_used
= port
.get("vlan") or vlan_used
1594 # TODO. Do not connect if already done
1595 if not port
.get("compute_node") or not port
.get("pci"):
1596 if port
.get("status") == "ERROR":
1603 compute_node_mappings
= next(
1606 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1607 if c
and c
["compute_node"] == port
["compute_node"]
1612 if compute_node_mappings
:
1613 # process port_mapping pci of type 0000:af:1[01].[1357]
1617 for p
in compute_node_mappings
["ports"]
1618 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1624 if not db_vim
["config"].get("mapping_not_needed"):
1626 "Port mapping not found for compute_node={} pci={}".format(
1627 port
["compute_node"], port
["pci"]
1634 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1636 "service_endpoint_id": pmap
.get("service_endpoint_id")
1637 or service_endpoint_id
,
1638 "service_endpoint_encapsulation_type": "dot1q"
1639 if port
["type"] == "SR-IOV"
1641 "service_endpoint_encapsulation_info": {
1642 "vlan": port
.get("vlan"),
1643 "mac": port
.get("mac-address"),
1644 "device_id": pmap
.get("device_id") or port
["compute_node"],
1645 "device_interface_id": pmap
.get("device_interface_id")
1647 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1648 "switch_port": pmap
.get("switch_port"),
1649 "service_mapping_info": pmap
.get("service_mapping_info"),
1654 # if port["modified_at"] > last_update:
1655 # sdn_need_update = True
1656 new_connected_ports
.append(port
["id"]) # TODO
1657 sdn_ports
.append(new_port
)
1661 "{} interfaces have not been created as VDU is on ERROR status".format(
1666 # connect external ports
1667 for index
, additional_port
in enumerate(additional_ports
):
1668 additional_port_id
= additional_port
.get(
1669 "service_endpoint_id"
1670 ) or "external-{}".format(index
)
1673 "service_endpoint_id": additional_port_id
,
1674 "service_endpoint_encapsulation_type": additional_port
.get(
1675 "service_endpoint_encapsulation_type", "dot1q"
1677 "service_endpoint_encapsulation_info": {
1678 "vlan": additional_port
.get("vlan") or vlan_used
,
1679 "mac": additional_port
.get("mac_address"),
1680 "device_id": additional_port
.get("device_id"),
1681 "device_interface_id": additional_port
.get(
1682 "device_interface_id"
1684 "switch_dpid": additional_port
.get("switch_dpid")
1685 or additional_port
.get("switch_id"),
1686 "switch_port": additional_port
.get("switch_port"),
1687 "service_mapping_info": additional_port
.get(
1688 "service_mapping_info"
1693 new_connected_ports
.append(additional_port_id
)
1696 # if there are more ports to connect or they have been modified, call create/update
1698 sdn_status
= "ERROR"
1699 sdn_info
= "; ".join(error_list
)
1700 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1701 last_update
= time
.time()
1704 if len(sdn_ports
) < 2:
1705 sdn_status
= "ACTIVE"
1707 if not pending_ports
:
1709 "task={} {} new-sdn-net done, less than 2 ports".format(
1710 task_id
, ro_task
["target_id"]
1714 net_type
= params
.get("type") or "ELAN"
1718 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1721 "task={} {} new-sdn-net={} created={}".format(
1722 task_id
, ro_task
["target_id"], sdn_net_id
, created
1726 created_items
= target_vim
.edit_connectivity_service(
1727 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1731 "task={} {} update-sdn-net={} created={}".format(
1732 task_id
, ro_task
["target_id"], sdn_net_id
, created
1736 connected_ports
= new_connected_ports
1738 wim_status_dict
= target_vim
.get_connectivity_service_status(
1739 sdn_net_id
, conn_info
=created_items
1741 sdn_status
= wim_status_dict
["sdn_status"]
1743 if wim_status_dict
.get("sdn_info"):
1744 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1746 if wim_status_dict
.get("error_msg"):
1747 sdn_info
= wim_status_dict
.get("error_msg") or ""
1750 if sdn_status
!= "ERROR":
1751 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1752 len(ports
) - pending_ports
, len(ports
)
1755 if sdn_status
== "ACTIVE":
1756 sdn_status
= "BUILD"
1758 ro_vim_item_update
= {
1759 "vim_id": sdn_net_id
,
1760 "vim_status": sdn_status
,
1762 "created_items": created_items
,
1763 "connected_ports": connected_ports
,
1764 "vim_details": sdn_info
,
1765 "vim_message": None,
1766 "last_update": last_update
,
1769 return sdn_status
, ro_vim_item_update
1770 except Exception as e
:
1772 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1773 exc_info
=not isinstance(
1774 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1777 ro_vim_item_update
= {
1778 "vim_status": "VIM_ERROR",
1780 "vim_message": str(e
),
1783 return "FAILED", ro_vim_item_update
1785 def delete(self
, ro_task
, task_index
):
1786 task
= ro_task
["tasks"][task_index
]
1787 task_id
= task
["task_id"]
1788 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1789 ro_vim_item_update_ok
= {
1790 "vim_status": "DELETED",
1792 "vim_message": "DELETED",
1798 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1799 target_vim
.delete_connectivity_service(
1800 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1803 except Exception as e
:
1805 isinstance(e
, sdnconn
.SdnConnectorError
)
1806 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1808 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1811 "ro_task={} vim={} del-sdn-net={}: {}".format(
1812 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1814 exc_info
=not isinstance(
1815 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1818 ro_vim_item_update
= {
1819 "vim_status": "VIM_ERROR",
1820 "vim_message": "Error while deleting: {}".format(e
),
1823 return "FAILED", ro_vim_item_update
1826 "task={} {} del-sdn-net={} {}".format(
1828 ro_task
["target_id"],
1830 ro_vim_item_update_ok
.get("vim_message", ""),
1834 return "DONE", ro_vim_item_update_ok
1837 class VimInteractionMigration(VimInteractionBase
):
1838 def exec(self
, ro_task
, task_index
, task_depends
):
1839 task
= ro_task
["tasks"][task_index
]
1840 task_id
= task
["task_id"]
1841 db_task_update
= {"retries": 0}
1842 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1846 refreshed_vim_info
= {}
1850 if task
.get("params"):
1851 vim_vm_id
= task
["params"].get("vim_vm_id")
1852 migrate_host
= task
["params"].get("migrate_host")
1853 _
, migrated_compute_node
= target_vim
.migrate_instance(
1854 vim_vm_id
, migrate_host
1857 if migrated_compute_node
:
1858 # When VM is migrated, vdu["vim_info"] needs to be updated
1859 vdu_old_vim_info
= task
["params"]["vdu_vim_info"].get(
1860 ro_task
["target_id"]
1863 # Refresh VM to get new vim_info
1864 vm_to_refresh_list
= [vim_vm_id
]
1865 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1866 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1868 if refreshed_vim_info
.get("interfaces"):
1869 for old_iface
in vdu_old_vim_info
.get("interfaces"):
1873 for iface
in refreshed_vim_info
["interfaces"]
1874 if old_iface
["vim_interface_id"]
1875 == iface
["vim_interface_id"]
1879 vim_interfaces
.append(iface
)
1881 ro_vim_item_update
= {
1882 "vim_id": vim_vm_id
,
1883 "vim_status": "ACTIVE",
1885 "created_items": created_items
,
1886 "vim_details": None,
1887 "vim_message": None,
1890 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1894 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1897 ro_vim_item_update
["interfaces"] = vim_interfaces
1900 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1903 return "DONE", ro_vim_item_update
, db_task_update
1905 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1907 "task={} vim={} VM Migration:"
1908 " {}".format(task_id
, ro_task
["target_id"], e
)
1910 ro_vim_item_update
= {
1911 "vim_status": "VIM_ERROR",
1913 "vim_message": str(e
),
1916 return "FAILED", ro_vim_item_update
, db_task_update
1919 class VimInteractionResize(VimInteractionBase
):
1920 def exec(self
, ro_task
, task_index
, task_depends
):
1921 task
= ro_task
["tasks"][task_index
]
1922 task_id
= task
["task_id"]
1923 db_task_update
= {"retries": 0}
1925 target_flavor_uuid
= None
1927 refreshed_vim_info
= {}
1928 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1931 params
= task
["params"]
1932 params_copy
= deepcopy(params
)
1933 target_flavor_uuid
= task_depends
[params_copy
["flavor_id"]]
1935 if task
.get("params"):
1936 self
.logger
.info("vim_vm_id %s", vim_vm_id
)
1938 if target_flavor_uuid
is not None:
1939 resized_status
= target_vim
.resize_instance(
1940 vim_vm_id
, target_flavor_uuid
1944 # Refresh VM to get new vim_info
1945 vm_to_refresh_list
= [vim_vm_id
]
1946 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1947 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1949 ro_vim_item_update
= {
1950 "vim_id": vim_vm_id
,
1951 "vim_status": "ACTIVE",
1953 "created_items": created_items
,
1954 "vim_details": None,
1955 "vim_message": None,
1958 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1962 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1965 "task={} {} resize done".format(task_id
, ro_task
["target_id"])
1967 return "DONE", ro_vim_item_update
, db_task_update
1968 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1970 "task={} vim={} Resize:" " {}".format(task_id
, ro_task
["target_id"], e
)
1972 ro_vim_item_update
= {
1973 "vim_status": "VIM_ERROR",
1975 "vim_message": str(e
),
1978 return "FAILED", ro_vim_item_update
, db_task_update
1981 class ConfigValidate
:
1982 def __init__(self
, config
: Dict
):
1987 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1989 self
.conf
["period"]["refresh_active"] >= 60
1990 or self
.conf
["period"]["refresh_active"] == -1
1992 return self
.conf
["period"]["refresh_active"]
1998 return self
.conf
["period"]["refresh_build"]
2002 return self
.conf
["period"]["refresh_image"]
2006 return self
.conf
["period"]["refresh_error"]
2009 def queue_size(self
):
2010 return self
.conf
["period"]["queue_size"]
2013 class NsWorker(threading
.Thread
):
2014 def __init__(self
, worker_index
, config
, plugins
, db
):
2016 :param worker_index: thread index
2017 :param config: general configuration of RO, among others the process_id with the docker id where it runs
2018 :param plugins: global shared dict with the loaded plugins
2019 :param db: database class instance to use
2021 threading
.Thread
.__init
__(self
)
2022 self
.config
= config
2023 self
.plugins
= plugins
2024 self
.plugin_name
= "unknown"
2025 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
2026 self
.worker_index
= worker_index
2027 # refresh periods for created items
2028 self
.refresh_config
= ConfigValidate(config
)
2029 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
2030 # targetvim: vimplugin class
2032 # targetvim: vim information from database
2035 self
.vim_targets
= []
2036 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
2039 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2040 "shared-volumes": VimInteractionSharedVolume(
2041 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2043 "classification": VimInteractionClassification(
2044 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2046 "sfi": VimInteractionSfi(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2047 "sf": VimInteractionSf(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2048 "sfp": VimInteractionSfp(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2049 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
2050 "image": VimInteractionImage(
2051 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2053 "flavor": VimInteractionFlavor(
2054 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2056 "sdn_net": VimInteractionSdnNet(
2057 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2059 "update": VimInteractionUpdateVdu(
2060 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2062 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
2063 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2065 "migrate": VimInteractionMigration(
2066 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2068 "verticalscale": VimInteractionResize(
2069 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
2072 self
.time_last_task_processed
= None
2073 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
2074 self
.tasks_to_delete
= []
2075 # it is idle when there are not vim_targets associated
2077 self
.task_locked_time
= config
["global"]["task_locked_time"]
2079 def insert_task(self
, task
):
2081 self
.task_queue
.put(task
, False)
2084 raise NsWorkerException("timeout inserting a task")
2086 def terminate(self
):
2087 self
.insert_task("exit")
2089 def del_task(self
, task
):
2090 with self
.task_lock
:
2091 if task
["status"] == "SCHEDULED":
2092 task
["status"] = "SUPERSEDED"
2094 else: # task["status"] == "processing"
2095 self
.task_lock
.release()
2098 def _process_vim_config(self
, target_id
: str, db_vim
: dict) -> None:
2100 Process vim config, creating vim configuration files as ca_cert
2101 :param target_id: vim/sdn/wim + id
2102 :param db_vim: Vim dictionary obtained from database
2103 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
2105 if not db_vim
.get("config"):
2109 work_dir
= "/app/osm_ro/certs"
2112 if db_vim
["config"].get("ca_cert_content"):
2113 file_name
= f
"{work_dir}/{target_id}:{self.worker_index}"
2115 if not path
.isdir(file_name
):
2118 file_name
= file_name
+ "/ca_cert"
2120 with
open(file_name
, "w") as f
:
2121 f
.write(db_vim
["config"]["ca_cert_content"])
2122 del db_vim
["config"]["ca_cert_content"]
2123 db_vim
["config"]["ca_cert"] = file_name
2124 except Exception as e
:
2125 raise NsWorkerException(
2126 "Error writing to file '{}': {}".format(file_name
, e
)
2129 def _load_plugin(self
, name
, type="vim"):
2130 # type can be vim or sdn
2131 if "rovim_dummy" not in self
.plugins
:
2132 self
.plugins
["rovim_dummy"] = VimDummyConnector
2134 if "rosdn_dummy" not in self
.plugins
:
2135 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
2137 if name
in self
.plugins
:
2138 return self
.plugins
[name
]
2141 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
2142 self
.plugins
[name
] = ep
.load()
2143 except Exception as e
:
2144 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
2146 if name
and name
not in self
.plugins
:
2147 raise NsWorkerException(
2148 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
2151 return self
.plugins
[name
]
2153 def _unload_vim(self
, target_id
):
2155 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
2156 :param target_id: Contains type:_id; where type can be 'vim', ...
2160 self
.db_vims
.pop(target_id
, None)
2161 self
.my_vims
.pop(target_id
, None)
2163 if target_id
in self
.vim_targets
:
2164 self
.vim_targets
.remove(target_id
)
2166 self
.logger
.info("Unloaded {}".format(target_id
))
2167 except Exception as e
:
2168 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
2170 def _check_vim(self
, target_id
):
2172 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
2173 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
2176 target
, _
, _id
= target_id
.partition(":")
2182 loaded
= target_id
in self
.vim_targets
2193 step
= "Getting {} from db".format(target_id
)
2194 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
2196 for op_index
, operation
in enumerate(
2197 db_vim
["_admin"].get("operations", ())
2199 if operation
["operationState"] != "PROCESSING":
2202 locked_at
= operation
.get("locked_at")
2204 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
2205 # some other thread is doing this operation
2209 op_text
= "_admin.operations.{}.".format(op_index
)
2211 if not self
.db
.set_one(
2215 op_text
+ "operationState": "PROCESSING",
2216 op_text
+ "locked_at": locked_at
,
2219 op_text
+ "locked_at": now
,
2220 "admin.current_operation": op_index
,
2222 fail_on_empty
=False,
2226 unset_dict
[op_text
+ "locked_at"] = None
2227 unset_dict
["current_operation"] = None
2228 step
= "Loading " + target_id
2229 error_text
= self
._load
_vim
(target_id
)
2232 step
= "Checking connectivity"
2235 self
.my_vims
[target_id
].check_vim_connectivity()
2237 self
.my_vims
[target_id
].check_credentials()
2239 update_dict
["_admin.operationalState"] = "ENABLED"
2240 update_dict
["_admin.detailed-status"] = ""
2241 unset_dict
[op_text
+ "detailed-status"] = None
2242 update_dict
[op_text
+ "operationState"] = "COMPLETED"
2246 except Exception as e
:
2247 error_text
= "{}: {}".format(step
, e
)
2248 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
2251 if update_dict
or unset_dict
:
2253 update_dict
[op_text
+ "operationState"] = "FAILED"
2254 update_dict
[op_text
+ "detailed-status"] = error_text
2255 unset_dict
.pop(op_text
+ "detailed-status", None)
2256 update_dict
["_admin.operationalState"] = "ERROR"
2257 update_dict
["_admin.detailed-status"] = error_text
2260 update_dict
[op_text
+ "statusEnteredTime"] = now
2264 q_filter
={"_id": _id
},
2265 update_dict
=update_dict
,
2267 fail_on_empty
=False,
2271 self
._unload
_vim
(target_id
)
2273 def _reload_vim(self
, target_id
):
2274 if target_id
in self
.vim_targets
:
2275 self
._load
_vim
(target_id
)
2277 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
2278 # just remove it to force load again next time it is needed
2279 self
.db_vims
.pop(target_id
, None)
2281 def _load_vim(self
, target_id
):
2283 Load or reload a vim_account, sdn_controller or wim_account.
2284 Read content from database, load the plugin if not loaded.
2285 In case of error loading the plugin, it loads a failing VIM_connector
2286 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
2287 :param target_id: Contains type:_id; where type can be 'vim', ...
2288 :return: None if ok, descriptive text if error
2290 target
, _
, _id
= target_id
.partition(":")
2300 step
= "Getting {}={} from db".format(target
, _id
)
2303 # TODO process for wim, sdnc, ...
2304 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
2306 # if deep_get(vim, "config", "sdn-controller"):
2307 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
2308 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
2310 step
= "Decrypting password"
2311 schema_version
= vim
.get("schema_version")
2312 self
.db
.encrypt_decrypt_fields(
2315 fields
=("password", "secret"),
2316 schema_version
=schema_version
,
2319 self
._process
_vim
_config
(target_id
, vim
)
2322 plugin_name
= "rovim_" + vim
["vim_type"]
2323 step
= "Loading plugin '{}'".format(plugin_name
)
2324 vim_module_conn
= self
._load
_plugin
(plugin_name
)
2325 step
= "Loading {}'".format(target_id
)
2326 self
.my_vims
[target_id
] = vim_module_conn(
2329 tenant_id
=vim
.get("vim_tenant_id"),
2330 tenant_name
=vim
.get("vim_tenant_name"),
2333 user
=vim
["vim_user"],
2334 passwd
=vim
["vim_password"],
2335 config
=vim
.get("config") or {},
2339 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
2340 step
= "Loading plugin '{}'".format(plugin_name
)
2341 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
2342 step
= "Loading {}'".format(target_id
)
2344 wim_config
= wim
.pop("config", {}) or {}
2345 wim
["uuid"] = wim
["_id"]
2346 if "url" in wim
and "wim_url" not in wim
:
2347 wim
["wim_url"] = wim
["url"]
2348 elif "url" not in wim
and "wim_url" in wim
:
2349 wim
["url"] = wim
["wim_url"]
2352 wim_config
["dpid"] = wim
.pop("dpid")
2354 if wim
.get("switch_id"):
2355 wim_config
["switch_id"] = wim
.pop("switch_id")
2357 # wim, wim_account, config
2358 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
2359 self
.db_vims
[target_id
] = vim
2360 self
.error_status
= None
2363 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
2365 except Exception as e
:
2367 "Cannot load {} plugin={}: {} {}".format(
2368 target_id
, plugin_name
, step
, e
2372 self
.db_vims
[target_id
] = vim
or {}
2373 self
.db_vims
[target_id
] = FailingConnector(str(e
))
2374 error_status
= "{} Error: {}".format(step
, e
)
2378 if target_id
not in self
.vim_targets
:
2379 self
.vim_targets
.append(target_id
)
2381 def _get_db_task(self
):
2383 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
2388 if not self
.time_last_task_processed
:
2389 self
.time_last_task_processed
= now
2394 # Log RO tasks only when loglevel is DEBUG
2395 if self.logger.getEffectiveLevel() == logging.DEBUG:
2402 + str(self.task_locked_time)
2404 + "time_last_task_processed="
2405 + str(self.time_last_task_processed)
2411 locked
= self
.db
.set_one(
2414 "target_id": self
.vim_targets
,
2415 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2416 "locked_at.lt": now
- self
.task_locked_time
,
2417 "to_check_at.lt": self
.time_last_task_processed
,
2418 "to_check_at.gt": -1,
2420 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
2421 fail_on_empty
=False,
2426 ro_task
= self
.db
.get_one(
2429 "target_id": self
.vim_targets
,
2430 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2436 if self
.time_last_task_processed
== now
:
2437 self
.time_last_task_processed
= None
2440 self
.time_last_task_processed
= now
2441 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2443 except DbException
as e
:
2444 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
2445 except Exception as e
:
2446 self
.logger
.critical(
2447 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
2452 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2454 Determine if this task need to be done or superseded
2457 my_task
= ro_task
["tasks"][task_index
]
2458 task_id
= my_task
["task_id"]
2459 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
2460 "created_items", False
2463 self
.logger
.debug("Needed delete: {}".format(needed_delete
))
2464 if my_task
["status"] == "FAILED":
2465 return None, None # TODO need to be retry??
2468 for index
, task
in enumerate(ro_task
["tasks"]):
2469 if index
== task_index
or not task
:
2473 my_task
["target_record"] == task
["target_record"]
2474 and task
["action"] == "CREATE"
2477 db_update
["tasks.{}.status".format(index
)] = task
[
2480 elif task
["action"] == "CREATE" and task
["status"] not in (
2484 needed_delete
= False
2488 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
2490 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
2492 return "SUPERSEDED", None
2493 except Exception as e
:
2494 if not isinstance(e
, NsWorkerException
):
2495 self
.logger
.critical(
2496 "Unexpected exception at _delete_task task={}: {}".format(
2502 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2504 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2506 Determine if this task need to create something at VIM
2509 my_task
= ro_task
["tasks"][task_index
]
2510 task_id
= my_task
["task_id"]
2512 if my_task
["status"] == "FAILED":
2513 return None, None # TODO need to be retry??
2514 elif my_task
["status"] == "SCHEDULED":
2515 # check if already created by another task
2516 for index
, task
in enumerate(ro_task
["tasks"]):
2517 if index
== task_index
or not task
:
2520 if task
["action"] == "CREATE" and task
["status"] not in (
2525 return task
["status"], "COPY_VIM_INFO"
2528 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2529 ro_task
, task_index
, task_depends
2531 # TODO update other CREATE tasks
2532 except Exception as e
:
2533 if not isinstance(e
, NsWorkerException
):
2535 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2538 task_status
= "FAILED"
2539 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2540 # TODO update ro_vim_item_update
2542 return task_status
, ro_vim_item_update
2546 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2548 Look for dependency task
2549 :param task_id: Can be one of
2550 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2551 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2552 3. task.task_id: "<action_id>:number"
2555 :return: database ro_task plus index of task
2558 task_id
.startswith("vim:")
2559 or task_id
.startswith("sdn:")
2560 or task_id
.startswith("wim:")
2562 target_id
, _
, task_id
= task_id
.partition(" ")
2564 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2565 ro_task_dependency
= self
.db
.get_one(
2567 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2568 fail_on_empty
=False,
2571 if ro_task_dependency
:
2572 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2573 if task
["target_record_id"] == task_id
:
2574 return ro_task_dependency
, task_index
2578 for task_index
, task
in enumerate(ro_task
["tasks"]):
2579 if task
and task
["task_id"] == task_id
:
2580 return ro_task
, task_index
2582 ro_task_dependency
= self
.db
.get_one(
2585 "tasks.ANYINDEX.task_id": task_id
,
2586 "tasks.ANYINDEX.target_record.ne": None,
2588 fail_on_empty
=False,
2591 self
.logger
.debug("ro_task_dependency={}".format(ro_task_dependency
))
2592 if ro_task_dependency
:
2593 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2594 if task
["task_id"] == task_id
:
2595 return ro_task_dependency
, task_index
2596 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
2598 def update_vm_refresh(self
, ro_task
):
2599 """Enables the VM status updates if self.refresh_config.active parameter
2600 is not -1 and then updates the DB accordingly
2604 self
.logger
.debug("Checking if VM status update config")
2605 next_refresh
= time
.time()
2606 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2608 if next_refresh
!= -1:
2609 db_ro_task_update
= {}
2611 next_check_at
= now
+ (24 * 60 * 60)
2612 next_check_at
= min(next_check_at
, next_refresh
)
2613 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2614 db_ro_task_update
["to_check_at"] = next_check_at
2617 "Finding tasks which to be updated to enable VM status updates"
2619 refresh_tasks
= self
.db
.get_list(
2622 "tasks.status": "DONE",
2623 "to_check_at.lt": 0,
2626 self
.logger
.debug("Updating tasks to change the to_check_at status")
2627 for task
in refresh_tasks
:
2634 update_dict
=db_ro_task_update
,
2638 except Exception as e
:
2639 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2641 def _get_next_refresh(self
, ro_task
: dict, next_refresh
: float):
2642 """Decide the next_refresh according to vim type and refresh config period.
2644 ro_task (dict): ro_task details
2645 next_refresh (float): next refresh time as epoch format
2648 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2650 target_vim
= ro_task
["target_id"]
2651 vim_type
= self
.db_vims
[target_vim
]["vim_type"]
2652 if self
.refresh_config
.active
== -1 or vim_type
== "openstack":
2655 next_refresh
+= self
.refresh_config
.active
2658 def _process_pending_tasks(self
, ro_task
):
2659 ro_task_id
= ro_task
["_id"]
2662 next_check_at
= now
+ (24 * 60 * 60)
2663 db_ro_task_update
= {}
2665 def _update_refresh(new_status
):
2666 # compute next_refresh
2668 nonlocal next_check_at
2669 nonlocal db_ro_task_update
2672 next_refresh
= time
.time()
2674 if task
["item"] in ("image", "flavor"):
2675 next_refresh
+= self
.refresh_config
.image
2676 elif new_status
== "BUILD":
2677 next_refresh
+= self
.refresh_config
.build
2678 elif new_status
== "DONE":
2679 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2681 next_refresh
+= self
.refresh_config
.error
2683 next_check_at
= min(next_check_at
, next_refresh
)
2684 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2685 ro_task
["vim_info"]["refresh_at"] = next_refresh
2689 # Log RO tasks only when loglevel is DEBUG
2690 if self.logger.getEffectiveLevel() == logging.DEBUG:
2691 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2693 # Check if vim status refresh is enabled again
2694 self
.update_vm_refresh(ro_task
)
2695 # 0: get task_status_create
2697 task_status_create
= None
2701 for t
in ro_task
["tasks"]
2703 and t
["action"] == "CREATE"
2704 and t
["status"] in ("BUILD", "DONE")
2710 task_status_create
= task_create
["status"]
2712 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2713 for task_action
in ("DELETE", "CREATE", "EXEC"):
2714 db_vim_update
= None
2717 for task_index
, task
in enumerate(ro_task
["tasks"]):
2719 continue # task deleted
2722 target_update
= None
2726 task_action
in ("DELETE", "EXEC")
2727 and task
["status"] not in ("SCHEDULED", "BUILD")
2729 or task
["action"] != task_action
2731 task_action
== "CREATE"
2732 and task
["status"] in ("FINISHED", "SUPERSEDED")
2737 task_path
= "tasks.{}.status".format(task_index
)
2739 db_vim_info_update
= None
2740 dependency_ro_task
= {}
2742 if task
["status"] == "SCHEDULED":
2743 # check if tasks that this depends on have been completed
2744 dependency_not_completed
= False
2746 for dependency_task_id
in task
.get("depends_on") or ():
2749 dependency_task_index
,
2750 ) = self
._get
_dependency
(
2751 dependency_task_id
, target_id
=ro_task
["target_id"]
2753 dependency_task
= dependency_ro_task
["tasks"][
2754 dependency_task_index
2757 "dependency_ro_task={} dependency_task_index={}".format(
2758 dependency_ro_task
, dependency_task_index
2762 if dependency_task
["status"] == "SCHEDULED":
2763 dependency_not_completed
= True
2764 next_check_at
= min(
2765 next_check_at
, dependency_ro_task
["to_check_at"]
2767 # must allow dependent task to be processed first
2768 # to do this set time after last_task_processed
2769 next_check_at
= max(
2770 self
.time_last_task_processed
, next_check_at
2773 elif dependency_task
["status"] == "FAILED":
2774 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2777 dependency_task
["action"],
2778 dependency_task
["item"],
2780 dependency_ro_task
["vim_info"].get(
2785 "task={} {}".format(task
["task_id"], error_text
)
2787 raise NsWorkerException(error_text
)
2789 task_depends
[dependency_task_id
] = dependency_ro_task
[
2793 "TASK-{}".format(dependency_task_id
)
2794 ] = dependency_ro_task
["vim_info"]["vim_id"]
2796 if dependency_not_completed
:
2797 self
.logger
.warning(
2798 "DEPENDENCY NOT COMPLETED {}".format(
2799 dependency_ro_task
["vim_info"]["vim_id"]
2802 # TODO set at vim_info.vim_details that it is waiting
2805 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2806 # the task of renew this locking. It will update database locket_at periodically
2808 lock_object
= LockRenew
.add_lock_object(
2809 "ro_tasks", ro_task
, self
2811 if task
["action"] == "DELETE":
2815 ) = self
._delete
_task
(
2816 ro_task
, task_index
, task_depends
, db_ro_task_update
2819 "FINISHED" if new_status
== "DONE" else new_status
2821 # ^with FINISHED instead of DONE it will not be refreshing
2823 if new_status
in ("FINISHED", "SUPERSEDED"):
2824 target_update
= "DELETE"
2825 elif task
["action"] == "EXEC":
2830 ) = self
.item2class
[task
["item"]].exec(
2831 ro_task
, task_index
, task_depends
2834 "FINISHED" if new_status
== "DONE" else new_status
2836 # ^with FINISHED instead of DONE it will not be refreshing
2839 # load into database the modified db_task_update "retries" and "next_retry"
2840 if db_task_update
.get("retries"):
2842 "tasks.{}.retries".format(task_index
)
2843 ] = db_task_update
["retries"]
2845 next_check_at
= time
.time() + db_task_update
.get(
2848 target_update
= None
2849 elif task
["action"] == "CREATE":
2850 if task
["status"] == "SCHEDULED":
2851 if task_status_create
:
2852 new_status
= task_status_create
2853 target_update
= "COPY_VIM_INFO"
2855 new_status
, db_vim_info_update
= self
.item2class
[
2857 ].new(ro_task
, task_index
, task_depends
)
2858 _update_refresh(new_status
)
2860 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2861 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2865 ) = self
.item2class
[
2868 _update_refresh(new_status
)
2870 # The refresh is updated to avoid set the value of "refresh_at" to
2871 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2872 # because it can happen that in this case the task is never processed
2873 _update_refresh(task
["status"])
2875 except Exception as e
:
2876 new_status
= "FAILED"
2877 db_vim_info_update
= {
2878 "vim_status": "VIM_ERROR",
2879 "vim_message": str(e
),
2883 e
, (NsWorkerException
, vimconn
.VimConnException
)
2886 "Unexpected exception at _delete_task task={}: {}".format(
2893 if db_vim_info_update
:
2894 db_vim_update
= db_vim_info_update
.copy()
2895 db_ro_task_update
.update(
2898 for k
, v
in db_vim_info_update
.items()
2901 ro_task
["vim_info"].update(db_vim_info_update
)
2904 if task_action
== "CREATE":
2905 task_status_create
= new_status
2906 db_ro_task_update
[task_path
] = new_status
2908 if target_update
or db_vim_update
:
2909 if target_update
== "DELETE":
2910 self
._update
_target
(task
, None)
2911 elif target_update
== "COPY_VIM_INFO":
2912 self
._update
_target
(task
, ro_task
["vim_info"])
2914 self
._update
_target
(task
, db_vim_update
)
2916 except Exception as e
:
2918 isinstance(e
, DbException
)
2919 and e
.http_code
== HTTPStatus
.NOT_FOUND
2921 # if the vnfrs or nsrs has been removed from database, this task must be removed
2923 "marking to delete task={}".format(task
["task_id"])
2925 self
.tasks_to_delete
.append(task
)
2928 "Unexpected exception at _update_target task={}: {}".format(
2934 locked_at
= ro_task
["locked_at"]
2938 lock_object
["locked_at"],
2939 lock_object
["locked_at"] + self
.task_locked_time
,
2941 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
2942 # contain exactly locked_at + self.task_locked_time
2943 LockRenew
.remove_lock_object(lock_object
)
2946 "_id": ro_task
["_id"],
2947 "to_check_at": ro_task
["to_check_at"],
2948 "locked_at": locked_at
,
2950 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2951 # outside this task (by ro_nbi) do not update it
2952 db_ro_task_update
["locked_by"] = None
2953 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
2954 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2955 db_ro_task_update
["modified_at"] = now
2956 db_ro_task_update
["to_check_at"] = next_check_at
2959 # Log RO tasks only when loglevel is DEBUG
2960 if self.logger.getEffectiveLevel() == logging.DEBUG:
2961 db_ro_task_update_log = db_ro_task_update.copy()
2962 db_ro_task_update_log["_id"] = q_filter["_id"]
2963 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2966 if not self
.db
.set_one(
2968 update_dict
=db_ro_task_update
,
2970 fail_on_empty
=False,
2972 del db_ro_task_update
["to_check_at"]
2973 del q_filter
["to_check_at"]
2975 # Log RO tasks only when loglevel is DEBUG
2976 if self.logger.getEffectiveLevel() == logging.DEBUG:
2979 db_ro_task_update_log,
2982 "SET_TASK " + str(q_filter),
2988 update_dict
=db_ro_task_update
,
2991 except DbException
as e
:
2993 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2995 except Exception as e
:
2997 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
3000 def _update_target(self
, task
, ro_vim_item_update
):
3001 table
, _
, temp
= task
["target_record"].partition(":")
3002 _id
, _
, path_vim_status
= temp
.partition(":")
3003 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
3004 path_item
= path_item
[: path_item
.rfind(".")]
3005 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
3006 # path_item: dot separated list targeting record information, e.g. "vdur.10"
3008 if ro_vim_item_update
:
3010 path_vim_status
+ "." + k
: v
3011 for k
, v
in ro_vim_item_update
.items()
3020 "interfaces_backup",
3024 if path_vim_status
.startswith("vdur."):
3025 # for backward compatibility, add vdur.name apart from vdur.vim_name
3026 if ro_vim_item_update
.get("vim_name"):
3027 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
3029 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
3030 if ro_vim_item_update
.get("vim_id"):
3031 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
3033 # update general status
3034 if ro_vim_item_update
.get("vim_status"):
3035 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
3039 if ro_vim_item_update
.get("interfaces"):
3040 path_interfaces
= path_item
+ ".interfaces"
3042 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
3046 path_interfaces
+ ".{}.".format(i
) + k
: v
3047 for k
, v
in iface
.items()
3048 if k
in ("vlan", "compute_node", "pci")
3052 # put ip_address and mac_address with ip-address and mac-address
3053 if iface
.get("ip_address"):
3055 path_interfaces
+ ".{}.".format(i
) + "ip-address"
3056 ] = iface
["ip_address"]
3058 if iface
.get("mac_address"):
3060 path_interfaces
+ ".{}.".format(i
) + "mac-address"
3061 ] = iface
["mac_address"]
3063 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
3064 update_dict
["ip-address"] = iface
.get("ip_address").split(
3068 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
3069 update_dict
[path_item
+ ".ip-address"] = iface
.get(
3073 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
3075 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
3076 if ro_vim_item_update
.get("interfaces"):
3077 search_key
= path_vim_status
+ ".interfaces"
3078 if update_dict
.get(search_key
):
3079 interfaces_backup_update
= {
3080 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
3085 q_filter
={"_id": _id
},
3086 update_dict
=interfaces_backup_update
,
3090 update_dict
= {path_item
+ ".status": "DELETED"}
3093 q_filter
={"_id": _id
},
3094 update_dict
=update_dict
,
3095 unset
={path_vim_status
: None},
3098 def _process_delete_db_tasks(self
):
3100 Delete task from database because vnfrs or nsrs or both have been deleted
3101 :return: None. Uses and modify self.tasks_to_delete
3103 while self
.tasks_to_delete
:
3104 task
= self
.tasks_to_delete
[0]
3105 vnfrs_deleted
= None
3106 nsr_id
= task
["nsr_id"]
3108 if task
["target_record"].startswith("vnfrs:"):
3109 # check if nsrs is present
3110 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
3111 vnfrs_deleted
= task
["target_record"].split(":")[1]
3114 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
3115 except Exception as e
:
3117 "Error deleting task={}: {}".format(task
["task_id"], e
)
3119 self
.tasks_to_delete
.pop(0)
3122 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
3124 Static method because it is called from osm_ng_ro.ns
3125 :param db: instance of database to use
3126 :param nsr_id: affected nsrs id
3127 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
3128 :return: None, exception is fails
3131 for retry
in range(retries
):
3132 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
3136 for ro_task
in ro_tasks
:
3138 to_delete_ro_task
= True
3140 for index
, task
in enumerate(ro_task
["tasks"]):
3143 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
3145 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
3147 db_update
["tasks.{}".format(index
)] = None
3149 # used by other nsr, ro_task cannot be deleted
3150 to_delete_ro_task
= False
3152 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
3153 if to_delete_ro_task
:
3157 "_id": ro_task
["_id"],
3158 "modified_at": ro_task
["modified_at"],
3160 fail_on_empty
=False,
3164 db_update
["modified_at"] = now
3168 "_id": ro_task
["_id"],
3169 "modified_at": ro_task
["modified_at"],
3171 update_dict
=db_update
,
3172 fail_on_empty
=False,
3178 raise NsWorkerException("Exceeded {} retries".format(retries
))
3182 self
.logger
.info("Starting")
3184 # step 1: get commands from queue
3186 if self
.vim_targets
:
3187 task
= self
.task_queue
.get(block
=False)
3190 self
.logger
.debug("enters in idle state")
3192 task
= self
.task_queue
.get(block
=True)
3195 if task
[0] == "terminate":
3197 elif task
[0] == "load_vim":
3198 self
.logger
.info("order to load vim {}".format(task
[1]))
3199 self
._load
_vim
(task
[1])
3200 elif task
[0] == "unload_vim":
3201 self
.logger
.info("order to unload vim {}".format(task
[1]))
3202 self
._unload
_vim
(task
[1])
3203 elif task
[0] == "reload_vim":
3204 self
._reload
_vim
(task
[1])
3205 elif task
[0] == "check_vim":
3206 self
.logger
.info("order to check vim {}".format(task
[1]))
3207 self
._check
_vim
(task
[1])
3209 except Exception as e
:
3210 if isinstance(e
, queue
.Empty
):
3213 self
.logger
.critical(
3214 "Error processing task: {}".format(e
), exc_info
=True
3217 # step 2: process pending_tasks, delete not needed tasks
3219 if self
.tasks_to_delete
:
3220 self
._process
_delete
_db
_tasks
()
3223 # Log RO tasks only when loglevel is DEBUG
3224 if self.logger.getEffectiveLevel() == logging.DEBUG:
3225 _ = self._get_db_all_tasks()
3227 ro_task
= self
._get
_db
_task
()
3229 self
.logger
.debug("Task to process: {}".format(ro_task
))
3231 self
._process
_pending
_tasks
(ro_task
)
3235 except Exception as e
:
3236 self
.logger
.critical(
3237 "Unexpected exception at run: " + str(e
), exc_info
=True
3240 self
.logger
.info("Finishing")