Keep vim_details while reporting VM deletion
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target where the results are stored.
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
def deep_get(target_dict, *args, **kwargs):
    """
    Follow a path of nested keys inside target_dict and return what is stored at its end.

    Example target_dict={a: {b: 5}}; keys (a, b) give 5; keys (a, b, c) and (f, h) give the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, one nesting level per key
    :param kwargs: may carry default=value, returned when the path cannot be followed
    :return: the value at the end of the path; otherwise the default (None when no default is given)
    """
    current = target_dict

    for step in args:
        # the path can only continue through a dictionary that owns the next key
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            return kwargs.get("default")

    return current
66
67
class NsWorkerException(Exception):
    """Error raised by the worker itself; the VimInteraction classes catch it together with VIM connector errors."""
70
71
class FailingConnector:
    """
    Connector stand-in whose public methods always fail.

    Every public attribute name of vimconn.VimConnector and sdnconn.SdnConnectorBase is installed on the
    instance as a Mock that raises the matching connector exception carrying error_msg.
    """

    def __init__(self, error_msg):
        self.error_msg = error_msg

        # SDN names are installed last, so they win for any name present in both base classes
        failing_interfaces = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in failing_interfaces:
            for method in dir(connector_class):
                if method[0] == "_":
                    # private and dunder names are left untouched
                    continue

                setattr(self, method, Mock(side_effect=error_class(error_msg)))
87
88
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised when a requested VIM element (e.g. network or image) cannot be found with the given criteria."""
91
92
class VimInteractionBase:
    """Common ancestor of the classes that create, delete and refresh VIM/SDN items (networks, VMs, flavors, ...).

    The methods here never contact the VIM: they just report a fixed result."""

    def __init__(self, db, my_vims, db_vims, logger):
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing; report the item as being built, with no update for vim_info"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Do not ask the VIM for the image/flavor status: it is ok unless already marked as VIM_ERROR"""
        already_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if already_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Do not ask the VIM to delete anything; report it as done"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing; report it as done, with neither vim_info update nor task update"""
        return "DONE", None, None
119
120
class VimInteractionNet(VimInteractionBase):
    """Find/create, refresh and delete networks at the VIM referenced by ro_task["target_id"]."""

    def new(self, ro_task, task_index, task_depends):
        """
        Locate an existing VIM network from the task "find_params", or create one from the task "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("BUILD", vim_info update) on success, ("FAILED", vim_info update) when the VIM connector
            or the worker raises a handled error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]

        net_id = None
        was_created = False
        new_items = {}
        is_mgmt = False
        mgmt_mapped_at_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: work out the filter to look the network up at the VIM
                if find_params.get("filter_dict"):
                    net_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration takes precedence, id first and then name
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]
                    mgmt_id = deep_get(db_vim, "config", "management_network_id")

                    if mgmt_id:
                        mgmt_mapped_at_vim = True
                        net_filter = {"id": mgmt_id}
                    else:
                        mgmt_name = deep_get(
                            db_vim, "config", "management_network_name"
                        )

                        if mgmt_name:
                            mgmt_mapped_at_vim = True
                            net_filter = {"name": mgmt_name}
                        else:
                            net_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                found_nets = target_vim.get_network_list(net_filter)

                if not found_nets and not task.get("params"):
                    # Only a management network that has no mapping at the descriptor, at the "--config"
                    # parameter or at VIM creation is created here; anything else is reported as missing
                    if not is_mgmt or mgmt_mapped_at_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                find_params
                            )
                        )

                    net_name = net_filter.get("name") or net_filter.get("id")[:16]
                    net_id, new_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(net_id)
                    )
                    was_created = True
                elif len(found_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if found_nets:
                    net_id = found_nets[0]["id"]
            else:
                # CREATE
                net_id, new_items = target_vim.new_network(**task["params"])
                was_created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, net_id, was_created
                )
            )

            return "BUILD", {
                "vim_id": net_id,
                "vim_status": "BUILD",
                "created": was_created,
                "created_items": new_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """
        Ask the VIM for the network status and compute which vim_info fields changed.

        :param ro_task: ro_task content; "_id", "target_id" and "vim_info" are read
        :return: (task status, dictionary with only the vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            reported = vim_info["status"]
            task_status = (
                "DONE"
                if reported == "ACTIVE"
                else "BUILD"
                if reported == "BUILD"
                else "FAILED"
            )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        stored = ro_task["vim_info"]
        status = vim_info["status"]
        changes = {}

        if stored["vim_status"] != status:
            changes["vim_status"] = status

        if stored["vim_name"] != vim_info.get("name"):
            changes["vim_name"] = vim_info.get("name")

        if status in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                changes["vim_message"] = vim_info.get("error_msg")
        elif status == "DELETED":
            changes["vim_id"] = None
            changes["vim_message"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            changes["vim_details"] = vim_info["vim_info"]

        if changes:
            new_status = changes.get("vim_status")
            # the message is only of interest when the new status is not ACTIVE
            shown_message = "" if new_status == "ACTIVE" else changes.get("vim_message")
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    new_status,
                    shown_message,
                )
            )

        return task_status, changes

    def delete(self, ro_task, task_index):
        """
        Delete the network (and its created items) at the VIM.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim_info update) also when the VIM reports it as not found;
            ("FAILED", vim_info update) on any other VIM connector error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        stored = ro_task["vim_info"]
        net_vim_id = stored["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to do at the VIM when there is neither an id nor created items
            if net_vim_id or stored["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(net_vim_id, stored["created_items"])
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update
338
339
class VimInteractionVdu(VimInteractionBase):
    """Create, delete and refresh VMs at the VIM referenced by ro_task["target_id"], and inject SSH keys."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at the VIM from the task "params".

        References of the form "TASK-..." found at net_list[].net_id, image_id, flavor_id and
        affinity_group_list[].affinity_group_id are replaced by the values stored at task_depends.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary mapping "TASK-..." references to the resolved VIM identifiers
        :return: ("BUILD", vim_info update) on success, ("FAILED", vim_info update) on a handled error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # NOTE(review): flagged as created before calling the VIM, so a failure below is
            # reported with created=True; kept as it is, presumably to allow a later cleanup
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # complete message, in line with the one used for networks above
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created or "
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # presumably the connector stores "vim_id" at every net_list entry - verify against connector
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM (and its created items) at the VIM.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim_info update) also when the VIM reports it as not found;
            ("FAILED", vim_info update) on any other VIM connector error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def _flag_interface(self, vim_interfaces, iface_index, flag, ro_task_id):
        """Set vim_interfaces[iface_index][flag] = True; only log a warning when that entry is not usable."""
        try:
            vim_interfaces[iface_index][flag] = True
        except (IndexError, TypeError):
            # the entry is a None placeholder (interface not reported by the VIM), the index is
            # out of range or it is not a valid index: there is nothing to flag
            self.logger.warning(
                "ro_task={} cannot set {}: no VIM information for interface index {}".format(
                    ro_task_id, flag, iface_index
                )
            )

    def refresh(self, ro_task):
        """
        Call VIM to get vm status and compute which vim_info fields changed.

        :param ro_task: ro_task content; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no vim_id; otherwise (task status, dictionary with only the
            vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # best effort: a missing or unparsable vim_info only prevents getting the name
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # a None placeholder is appended when the VIM does not report the interface,
                # so positions keep matching those of interfaces_vim_ids
                vim_interfaces.append(iface)

        # raises StopIteration when the ro_task holds no CREATE task pending to be finished
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            self._flag_interface(
                vim_interfaces,
                task_create["mgmt_vnf_interface"],
                "mgmt_vnf_interface",
                ro_task_id,
            )

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            self._flag_interface(
                vim_interfaces, mgmt_vdu_iface, "mgmt_vdu_interface", ro_task_id
            )

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            # vim_details is deliberately not touched here, so the last known details are kept
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject an SSH key into the VM through the VIM connector, retrying on failure.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used here
        :return: (task status, vim_info update or None, task update). While the number of failed attempts
            stays below max_retries_inject_ssh_key the status is "BUILD" and the task update carries the
            retry counter and "next_retry"; after that the status is "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # the stored private key is decrypted and handed to the connector as "ro_key"
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
636
637
class VimInteractionImage(VimInteractionBase):
    """Look up images at the VIM referenced by ro_task["target_id"]."""

    def new(self, ro_task, task_index, task_depends):
        """
        Find the VIM image matching the task "find_params"; images are never created here.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) when exactly one image matches, or when the task carries no
            "find_params" (vim_id is then None); ("FAILED", vim_info update) when none or several images
            match, or on a VIM connector error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        # Bound up front: a task without "find_params" skips the search below and used to fail
        # with an unbound local name when building the update
        vim_image_id = None

        try:
            # FIND
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
692
693
class VimInteractionFlavor(VimInteractionBase):
    """Find/create and delete flavors at the VIM referenced by ro_task["target_id"]."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at the VIM, if there is a VIM identifier stored for it.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim_info update) also when the VIM reports it as not found;
            ("FAILED", vim_info update) on any other VIM connector error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor matching the task "find_params"; when none is found and the task has "params",
        create it.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) on success, ("FAILED", vim_info update) on a handled error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        was_created = False
        new_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            flavor_id = None

            # FIND: a flavor not found at the VIM is not an error, it only leads to the creation below
            if task.get("find_params"):
                try:
                    wanted = task["find_params"]["flavor_data"]
                    flavor_id = target_vim.get_flavor_id_from_data(wanted)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            # CREATE
            if not flavor_id and task.get("params"):
                flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                was_created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], flavor_id, was_created
                )
            )

            return "DONE", {
                "vim_id": flavor_id,
                "vim_status": "DONE",
                "created": was_created,
                "created_items": new_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(e),
            }
788
789
class VimInteractionAffinityGroup(VimInteractionBase):
    """Find/create and delete affinity or anti-affinity groups at the VIM referenced by ro_task["target_id"]."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group at the VIM, if there is a VIM identifier stored for it.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim_info update) also when the VIM reports it as not found;
            ("FAILED", vim_info update) on any other VIM connector error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Reuse the affinity group given by "vim-affinity-group-id" when it exists at the VIM; otherwise
        create a new one from the task "affinity_group_data".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) on success, ("FAILED", vim_info update) on a handled error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # not fatal: a new group is created below. The trailing space after the
                    # placeholder keeps the id apart from the rest of the sentence
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
895
896
class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action over an existing VM through the VIM connector method action_vminstance."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Execute the action given at the task "params" over the VM given at the same place.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update, task update) on success, also when the task has no "params"
            (nothing is then requested to the VIM and vim_id is None);
            ("FAILED", vim_info update, task update) on a handled error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        # Bound up front: a task without "params" skips the VIM call below and used to fail
        # with an unbound local name when building the update
        vim_vm_id = None

        try:
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                action = task["params"].get("action")
                # the action is used both as key and as value of the dictionary sent to the connector
                context = {action: action}
                target_vim.action_vminstance(vim_vm_id, context)
                # created = True
            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
937
938
939 class VimInteractionSdnNet(VimInteractionBase):
940 @staticmethod
941 def _match_pci(port_pci, mapping):
942 """
943 Check if port_pci matches with mapping
944 mapping can have brackets to indicate that several chars are accepted. e.g
945 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
946 :param port_pci: text
947 :param mapping: text, can contain brackets to indicate several chars are available
948 :return: True if matches, False otherwise
949 """
950 if not port_pci or not mapping:
951 return False
952 if port_pci == mapping:
953 return True
954
955 mapping_index = 0
956 pci_index = 0
957 while True:
958 bracket_start = mapping.find("[", mapping_index)
959
960 if bracket_start == -1:
961 break
962
963 bracket_end = mapping.find("]", bracket_start)
964 if bracket_end == -1:
965 break
966
967 length = bracket_start - mapping_index
968 if (
969 length
970 and port_pci[pci_index : pci_index + length]
971 != mapping[mapping_index:bracket_start]
972 ):
973 return False
974
975 if (
976 port_pci[pci_index + length]
977 not in mapping[bracket_start + 1 : bracket_end]
978 ):
979 return False
980
981 pci_index += length + 1
982 mapping_index = bracket_end + 1
983
984 if port_pci[pci_index:] != mapping[mapping_index:]:
985 return False
986
987 return True
988
989 def _get_interfaces(self, vlds_to_connect, vim_account_id):
990 """
991 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
992 :param vim_account_id:
993 :return:
994 """
995 interfaces = []
996
997 for vld in vlds_to_connect:
998 table, _, db_id = vld.partition(":")
999 db_id, _, vld = db_id.partition(":")
1000 _, _, vld_id = vld.partition(".")
1001
1002 if table == "vnfrs":
1003 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1004 iface_key = "vnf-vld-id"
1005 else: # table == "nsrs"
1006 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1007 iface_key = "ns-vld-id"
1008
1009 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1010
1011 for db_vnfr in db_vnfrs:
1012 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1013 for iface_index, interface in enumerate(vdur["interfaces"]):
1014 if interface.get(iface_key) == vld_id and interface.get(
1015 "type"
1016 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1017 # only SR-IOV o PT
1018 interface_ = interface.copy()
1019 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1020 db_vnfr["_id"], vdu_index, iface_index
1021 )
1022
1023 if vdur.get("status") == "ERROR":
1024 interface_["status"] = "ERROR"
1025
1026 interfaces.append(interface_)
1027
1028 return interfaces
1029
def refresh(self, ro_task):
    """
    Refresh by running again new() over the first CREATE task that is not FINISHED.

    :param ro_task: ro_task content; "tasks" is read here
    :return: whatever new() returns for that task
    """
    # look for task create; next() raises StopIteration when there is none
    pending_create_indexes = (
        index
        for index, task in enumerate(ro_task["tasks"])
        if task and task["action"] == "CREATE" and task["status"] != "FINISHED"
    )
    task_create_index = next(pending_create_indexes)

    return self.new(ro_task, task_create_index, None)
1041
def new(self, ro_task, task_index, task_depends):
    """
    Create, update or poll the SDN connectivity service that joins the ports of a network
    :param ro_task: ro_task document; ro_task["vim_info"] holds the state left by previous runs
    :param task_index: index inside ro_task["tasks"] of the task to execute
    :param task_depends: not used by this method
    :return: (status, ro_vim_item_update). status is "FAILED" when an exception is caught,
        otherwise the computed sdn status
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    previous_info = ro_task["vim_info"]
    sdn_net_id = previous_info["vim_id"]
    created_items = previous_info.get("created_items")
    connected_ports = previous_info.get("connected_ports", [])
    new_connected_ports = []
    last_update = previous_info.get("last_update", 0)
    sdn_status = previous_info.get("vim_status", "BUILD") or "BUILD"
    error_list = []
    created = previous_info.get("created", False)

    try:
        # CREATE
        params = task["params"]
        vlds_to_connect = params.get("vlds", [])
        associated_vim = params.get("target_vim")
        # external additional ports
        additional_ports = params.get("sdn-ports") or ()

        if associated_vim is None:
            vim_account_id = None
        else:
            vim_account_id = associated_vim.partition(":")[2]

        if associated_vim:
            # get associated VIM, reading it from database only the first time
            if associated_vim not in self.db_vims:
                self.db_vims[associated_vim] = self.db.get_one(
                    "vim_accounts", {"_id": vim_account_id}
                )

            db_vim = self.db_vims[associated_vim]

        # look for ports to connect
        ports = self._get_interfaces(vlds_to_connect, vim_account_id)

        sdn_ports = []
        pending_ports = error_ports = 0
        vlan_used = None
        sdn_need_update = False

        for port in ports:
            vlan_used = port.get("vlan") or vlan_used

            # TODO. Do not connect if already done
            # a port without compute_node or pci cannot be connected yet
            if not (port.get("compute_node") and port.get("pci")):
                if port.get("status") == "ERROR":
                    error_ports += 1
                else:
                    pending_ports += 1

                continue

            node_mapping = next(
                (
                    mapping
                    for mapping in db_vim["config"].get("sdn-port-mapping", ())
                    if mapping and mapping["compute_node"] == port["compute_node"]
                ),
                None,
            )

            pmap = None
            if node_mapping:
                # process port_mapping pci of type 0000:af:1[01].[1357]
                pmap = next(
                    (
                        candidate
                        for candidate in node_mapping["ports"]
                        if self._match_pci(port["pci"], candidate.get("pci"))
                    ),
                    None,
                )

            if not pmap:
                if not db_vim["config"].get("mapping_not_needed"):
                    error_list.append(
                        "Port mapping not found for compute_node={} pci={}".format(
                            port["compute_node"], port["pci"]
                        )
                    )
                    continue

                pmap = {}

            service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
            new_port = {
                "service_endpoint_id": pmap.get("service_endpoint_id")
                or service_endpoint_id,
                "service_endpoint_encapsulation_type": "dot1q"
                if port["type"] == "SR-IOV"
                else None,
                "service_endpoint_encapsulation_info": {
                    "vlan": port.get("vlan"),
                    "mac": port.get("mac-address"),
                    "device_id": pmap.get("device_id") or port["compute_node"],
                    "device_interface_id": pmap.get("device_interface_id")
                    or port["pci"],
                    "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                    "switch_port": pmap.get("switch_port"),
                    "service_mapping_info": pmap.get("service_mapping_info"),
                },
            }

            # TODO
            # if port["modified_at"] > last_update:
            #     sdn_need_update = True
            new_connected_ports.append(port["id"])  # TODO
            sdn_ports.append(new_port)

        if error_ports:
            error_list.append(
                "{} interfaces have not been created as VDU is on ERROR status".format(
                    error_ports
                )
            )

        # connect external ports
        for index, additional_port in enumerate(additional_ports):
            additional_port_id = additional_port.get(
                "service_endpoint_id"
            ) or "external-{}".format(index)
            external_port = {
                "service_endpoint_id": additional_port_id,
                "service_endpoint_encapsulation_type": additional_port.get(
                    "service_endpoint_encapsulation_type", "dot1q"
                ),
                "service_endpoint_encapsulation_info": {
                    "vlan": additional_port.get("vlan") or vlan_used,
                    "mac": additional_port.get("mac_address"),
                    "device_id": additional_port.get("device_id"),
                    "device_interface_id": additional_port.get(
                        "device_interface_id"
                    ),
                    "switch_dpid": additional_port.get("switch_dpid")
                    or additional_port.get("switch_id"),
                    "switch_port": additional_port.get("switch_port"),
                    "service_mapping_info": additional_port.get(
                        "service_mapping_info"
                    ),
                },
            }
            sdn_ports.append(external_port)
            new_connected_ports.append(additional_port_id)

        sdn_info = ""

        # if there are more ports to connect or they have been modified, call create/update
        if error_list:
            sdn_status = "ERROR"
            sdn_info = "; ".join(error_list)
        elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
            last_update = time.time()

            if sdn_net_id:
                # the connectivity service already exists: update its connection points
                created_items = target_vim.edit_connectivity_service(
                    sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} update-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )
            elif len(sdn_ports) < 2:
                # nothing is created at the SDN controller with less than two ports
                sdn_status = "ACTIVE"

                if not pending_ports:
                    self.logger.debug(
                        "task={} {} new-sdn-net done, less than 2 ports".format(
                            task_id, ro_task["target_id"]
                        )
                    )
            else:
                net_type = params.get("type") or "ELAN"
                sdn_net_id, created_items = target_vim.create_connectivity_service(
                    net_type, sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} new-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )

            connected_ports = new_connected_ports
        elif sdn_net_id:
            # no change in the ports: just poll the status of the existing service
            wim_status_dict = target_vim.get_connectivity_service_status(
                sdn_net_id, conn_info=created_items
            )
            sdn_status = wim_status_dict["sdn_status"]

            if wim_status_dict.get("sdn_info"):
                sdn_info = str(wim_status_dict.get("sdn_info")) or ""

            if wim_status_dict.get("error_msg"):
                sdn_info = wim_status_dict.get("error_msg") or ""

        if pending_ports:
            if sdn_status != "ERROR":
                sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                    len(ports) - pending_ports, len(ports)
                )

            # while ports are pending the net is reported as BUILD instead of ACTIVE
            if sdn_status == "ACTIVE":
                sdn_status = "BUILD"

        ro_vim_item_update = {
            "vim_id": sdn_net_id,
            "vim_status": sdn_status,
            "created": created,
            "created_items": created_items,
            "connected_ports": connected_ports,
            "vim_details": sdn_info,
            "vim_message": None,
            "last_update": last_update,
        }

        return sdn_status, ro_vim_item_update
    except Exception as e:
        # connector errors are expected, so the traceback is logged only for other exceptions
        is_connector_error = isinstance(
            e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
        )
        self.logger.error(
            "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
            exc_info=not is_connector_error,
        )
        ro_vim_item_update = {
            "vim_status": "VIM_ERROR",
            "created": created,
            "vim_message": str(e),
        }

        return "FAILED", ro_vim_item_update
1281
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service referenced by ro_task["vim_info"]["vim_id"], if any
    :param ro_task: ro_task document
    :param task_index: index inside ro_task["tasks"] of the task being executed
    :return: ("DONE", update) on success or when the service was already deleted,
        ("FAILED", update) on any other error
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    sdn_vim_id = ro_task["vim_info"].get("vim_id")
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, ro_task["vim_info"].get("created_items")
            )

    except Exception as e:
        already_deleted = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_deleted:
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        # a NOT_FOUND answer from the connector is treated as a successful deletion
        ro_vim_item_update_ok["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_message", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
1332
1333
class VimInteractionMigration(VimInteractionBase):
    """Executes "migrate" tasks: moves a VM to another compute host through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate a VM and collect its refreshed information
        :param ro_task: ro_task document
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this method
        :return: (status, ro_vim_item_update, db_task_update); status is "DONE", or "FAILED" when
            the connector raises VimConnException / NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}
        # defined up-front so that a task without params does not hit an unbound variable below
        vim_vm_id = None

        try:
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                migrate_host = params.get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated.
                    # The entry for this target may be missing; treat it as empty
                    vdu_old_vim_info = (
                        params["vdu_vim_info"].get(ro_task["target_id"]) or {}
                    )

                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        # keep the order of the old interfaces; an old interface that is not
                        # reported any more yields None, as before
                        for old_iface in vdu_old_vim_info.get("interfaces") or ():
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration: {}".format(
                    task_id, ro_task["target_id"], e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1413
1414
class VimInteractionResize(VimInteractionBase):
    """Executes "verticalscale" tasks: resizes a VM to another flavor through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize a VM to the flavor described at task["params"]["flavor_dict"]
        The flavor is looked up at the VIM and created when the lookup fails.
        :param ro_task: ro_task document
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this method
        :return: (status, ro_vim_item_update, db_task_update); status is "DONE", or "FAILED" when
            no flavor can be obtained or the connector raises VimConnException / NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        # defined up-front so that a task without params does not hit an unbound variable below
        vim_vm_id = None

        try:
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                flavor_dict = params.get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as e:
                    self.logger.info("Cannot find any flavor matching %s.", str(e))
                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as create_error:
                        self.logger.error(
                            "Error creating flavor at VIM %s.", str(create_error)
                        )
                        # without a flavor the resize cannot happen: report the task as FAILED
                        # instead of silently reporting it as done
                        raise NsWorkerException(
                            "Error creating flavor at VIM: {}".format(create_error)
                        ) from create_error

                if target_flavor_uuid is None:
                    raise NsWorkerException(
                        "Cannot obtain a flavor to resize VM {}".format(vim_vm_id)
                    )

                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1482
1483
class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        self.conf = config

    def _period(self, key):
        # every value exposed by this class lives under config["period"]
        return self.conf["period"][key]

    @property
    def active(self):
        """Refresh period for active items: the configured value when it is >= 60 or -1
        (-1 disables periodic checks); any other value is replaced by 60 (1 min)."""
        configured = self._period("refresh_active")

        if configured >= 60 or configured == -1:
            return configured

        return 60

    @property
    def build(self):
        return self._period("refresh_build")

    @property
    def image(self):
        return self._period("refresh_image")

    @property
    def error(self):
        return self._period("refresh_error")

    @property
    def queue_size(self):
        return self._period("queue_size")
1514
1515
1516 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # lock used by del_task() to change the status of a task; it was not created anywhere
    # in this constructor, so del_task() failed with AttributeError
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # task item name: class that knows how to create/delete/refresh that item.
    # All of them share the same db, connectors cache, vim cache and logger
    interaction_classes = {
        "net": VimInteractionNet,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1572
def insert_task(self, task):
    """
    Put a task in the worker queue without blocking
    :param task: element to enqueue
    :return: None. Raises NsWorkerException when the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1579
def terminate(self):
    """Queue the "exit" task through insert_task(); returns None."""
    self.insert_task("exit")
1582
def del_task(self, task):
    """
    Mark a task as SUPERSEDED if it has not started yet
    :param task: task dictionary; its "status" is modified in place
    :return: True if the task was SCHEDULED and is now SUPERSEDED, False otherwise
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

    # task["status"] == "processing". The lock is released by the with statement; releasing it
    # by hand inside the block made the context manager release it twice (RuntimeError)
    return False
1591
1592 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1593 """
1594 Process vim config, creating vim configuration files as ca_cert
1595 :param target_id: vim/sdn/wim + id
1596 :param db_vim: Vim dictionary obtained from database
1597 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1598 """
1599 if not db_vim.get("config"):
1600 return
1601
1602 file_name = ""
1603 work_dir = "/app/osm_ro/certs"
1604
1605 try:
1606 if db_vim["config"].get("ca_cert_content"):
1607 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1608
1609 if not path.isdir(file_name):
1610 makedirs(file_name)
1611
1612 file_name = file_name + "/ca_cert"
1613
1614 with open(file_name, "w") as f:
1615 f.write(db_vim["config"]["ca_cert_content"])
1616 del db_vim["config"]["ca_cert_content"]
1617 db_vim["config"]["ca_cert"] = file_name
1618 except Exception as e:
1619 raise NsWorkerException(
1620 "Error writing to file '{}': {}".format(file_name, e)
1621 )
1622
def _load_plugin(self, name, type="vim"):
    """
    Get a connector class by plugin name, loading it from the entry points the first time
    :param name: plugin name, used as key of self.plugins
    :param type: can be vim or sdn; selects the entry point group "osm_ro<type>.plugins"
    :return: the plugin stored at self.plugins[name]. Raises NsWorkerException if it cannot be
        loaded or it is not installed
    """
    # the dummy connectors are always available
    self.plugins.setdefault("rovim_dummy", VimDummyConnector)
    self.plugins.setdefault("rosdn_dummy", SdnDummyConnector)

    if name in self.plugins:
        return self.plugins[name]

    group = "osm_ro{}.plugins".format(type)

    try:
        for ep in entry_points(group=group, name=name):
            self.plugins[name] = ep.load()
    except Exception as e:
        raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

    if name and name not in self.plugins:
        raise NsWorkerException(
            "Plugin 'osm_{n}' has not been installed".format(n=name)
        )

    return self.plugins[name]
1646
1647 def _unload_vim(self, target_id):
1648 """
1649 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1650 :param target_id: Contains type:_id; where type can be 'vim', ...
1651 :return: None.
1652 """
1653 try:
1654 self.db_vims.pop(target_id, None)
1655 self.my_vims.pop(target_id, None)
1656
1657 if target_id in self.vim_targets:
1658 self.vim_targets.remove(target_id)
1659
1660 self.logger.info("Unloaded {}".format(target_id))
1661 except Exception as e:
1662 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1663
1664 def _check_vim(self, target_id):
1665 """
1666 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1667 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1668 :return: None.
1669 """
1670 target, _, _id = target_id.partition(":")
1671 now = time.time()
1672 update_dict = {}
1673 unset_dict = {}
1674 op_text = ""
1675 step = ""
1676 loaded = target_id in self.vim_targets
1677 target_database = (
1678 "vim_accounts"
1679 if target == "vim"
1680 else "wim_accounts"
1681 if target == "wim"
1682 else "sdns"
1683 )
1684
1685 try:
1686 step = "Getting {} from db".format(target_id)
1687 db_vim = self.db.get_one(target_database, {"_id": _id})
1688
1689 for op_index, operation in enumerate(
1690 db_vim["_admin"].get("operations", ())
1691 ):
1692 if operation["operationState"] != "PROCESSING":
1693 continue
1694
1695 locked_at = operation.get("locked_at")
1696
1697 if locked_at is not None and locked_at >= now - self.task_locked_time:
1698 # some other thread is doing this operation
1699 return
1700
1701 # lock
1702 op_text = "_admin.operations.{}.".format(op_index)
1703
1704 if not self.db.set_one(
1705 target_database,
1706 q_filter={
1707 "_id": _id,
1708 op_text + "operationState": "PROCESSING",
1709 op_text + "locked_at": locked_at,
1710 },
1711 update_dict={
1712 op_text + "locked_at": now,
1713 "admin.current_operation": op_index,
1714 },
1715 fail_on_empty=False,
1716 ):
1717 return
1718
1719 unset_dict[op_text + "locked_at"] = None
1720 unset_dict["current_operation"] = None
1721 step = "Loading " + target_id
1722 error_text = self._load_vim(target_id)
1723
1724 if not error_text:
1725 step = "Checking connectivity"
1726
1727 if target == "vim":
1728 self.my_vims[target_id].check_vim_connectivity()
1729 else:
1730 self.my_vims[target_id].check_credentials()
1731
1732 update_dict["_admin.operationalState"] = "ENABLED"
1733 update_dict["_admin.detailed-status"] = ""
1734 unset_dict[op_text + "detailed-status"] = None
1735 update_dict[op_text + "operationState"] = "COMPLETED"
1736
1737 return
1738
1739 except Exception as e:
1740 error_text = "{}: {}".format(step, e)
1741 self.logger.error("{} for {}: {}".format(step, target_id, e))
1742
1743 finally:
1744 if update_dict or unset_dict:
1745 if error_text:
1746 update_dict[op_text + "operationState"] = "FAILED"
1747 update_dict[op_text + "detailed-status"] = error_text
1748 unset_dict.pop(op_text + "detailed-status", None)
1749 update_dict["_admin.operationalState"] = "ERROR"
1750 update_dict["_admin.detailed-status"] = error_text
1751
1752 if op_text:
1753 update_dict[op_text + "statusEnteredTime"] = now
1754
1755 self.db.set_one(
1756 target_database,
1757 q_filter={"_id": _id},
1758 update_dict=update_dict,
1759 unset=unset_dict,
1760 fail_on_empty=False,
1761 )
1762
1763 if not loaded:
1764 self._unload_vim(target_id)
1765
1766 def _reload_vim(self, target_id):
1767 if target_id in self.vim_targets:
1768 self._load_vim(target_id)
1769 else:
1770 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1771 # just remove it to force load again next time it is needed
1772 self.db_vims.pop(target_id, None)
1773
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None
    # step describes the action in progress; it is used in the error text
    step = "Getting {}={} from db".format(target, _id)

    try:
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # the connector receives a copy, so the cached database record is not altered
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            # make both "url" and "wim_url" available, whichever one is stored
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        self.db_vims[target_id] = vim or {}
        # the failing connector replaces the connector, not the database record: it was
        # stored at db_vims, overwriting the line above and leaving my_vims without entry
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered even on error
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1873
1874 def _get_db_task(self):
1875 """
1876 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1877 :return: None
1878 """
1879 now = time.time()
1880
1881 if not self.time_last_task_processed:
1882 self.time_last_task_processed = now
1883
1884 try:
1885 while True:
1886 """
1887 # Log RO tasks only when loglevel is DEBUG
1888 if self.logger.getEffectiveLevel() == logging.DEBUG:
1889 self._log_ro_task(
1890 None,
1891 None,
1892 None,
1893 "TASK_WF",
1894 "task_locked_time="
1895 + str(self.task_locked_time)
1896 + " "
1897 + "time_last_task_processed="
1898 + str(self.time_last_task_processed)
1899 + " "
1900 + "now="
1901 + str(now),
1902 )
1903 """
1904 locked = self.db.set_one(
1905 "ro_tasks",
1906 q_filter={
1907 "target_id": self.vim_targets,
1908 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1909 "locked_at.lt": now - self.task_locked_time,
1910 "to_check_at.lt": self.time_last_task_processed,
1911 "to_check_at.gt": -1,
1912 },
1913 update_dict={"locked_by": self.my_id, "locked_at": now},
1914 fail_on_empty=False,
1915 )
1916
1917 if locked:
1918 # read and return
1919 ro_task = self.db.get_one(
1920 "ro_tasks",
1921 q_filter={
1922 "target_id": self.vim_targets,
1923 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1924 "locked_at": now,
1925 },
1926 )
1927 return ro_task
1928
1929 if self.time_last_task_processed == now:
1930 self.time_last_task_processed = None
1931 return None
1932 else:
1933 self.time_last_task_processed = now
1934 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1935
1936 except DbException as e:
1937 self.logger.error("Database exception at _get_db_task: {}".format(e))
1938 except Exception as e:
1939 self.logger.critical(
1940 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1941 )
1942
1943 return None
1944
1945 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1946 """
1947 Determine if this task need to be done or superseded
1948 :return: None
1949 """
1950 my_task = ro_task["tasks"][task_index]
1951 task_id = my_task["task_id"]
1952 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1953 "created_items", False
1954 )
1955
1956 self.logger.debug("Needed delete: {}".format(needed_delete))
1957 if my_task["status"] == "FAILED":
1958 return None, None # TODO need to be retry??
1959
1960 try:
1961 for index, task in enumerate(ro_task["tasks"]):
1962 if index == task_index or not task:
1963 continue # own task
1964
1965 if (
1966 my_task["target_record"] == task["target_record"]
1967 and task["action"] == "CREATE"
1968 ):
1969 # set to finished
1970 db_update["tasks.{}.status".format(index)] = task[
1971 "status"
1972 ] = "FINISHED"
1973 elif task["action"] == "CREATE" and task["status"] not in (
1974 "FINISHED",
1975 "SUPERSEDED",
1976 ):
1977 needed_delete = False
1978
1979 if needed_delete:
1980 self.logger.debug(
1981 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
1982 )
1983 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1984 else:
1985 return "SUPERSEDED", None
1986 except Exception as e:
1987 if not isinstance(e, NsWorkerException):
1988 self.logger.critical(
1989 "Unexpected exception at _delete_task task={}: {}".format(
1990 task_id, e
1991 ),
1992 exc_info=True,
1993 )
1994
1995 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
1996
1997 def _create_task(self, ro_task, task_index, task_depends, db_update):
1998 """
1999 Determine if this task need to create something at VIM
2000 :return: None
2001 """
2002 my_task = ro_task["tasks"][task_index]
2003 task_id = my_task["task_id"]
2004 task_status = None
2005
2006 if my_task["status"] == "FAILED":
2007 return None, None # TODO need to be retry??
2008 elif my_task["status"] == "SCHEDULED":
2009 # check if already created by another task
2010 for index, task in enumerate(ro_task["tasks"]):
2011 if index == task_index or not task:
2012 continue # own task
2013
2014 if task["action"] == "CREATE" and task["status"] not in (
2015 "SCHEDULED",
2016 "FINISHED",
2017 "SUPERSEDED",
2018 ):
2019 return task["status"], "COPY_VIM_INFO"
2020
2021 try:
2022 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2023 ro_task, task_index, task_depends
2024 )
2025 # TODO update other CREATE tasks
2026 except Exception as e:
2027 if not isinstance(e, NsWorkerException):
2028 self.logger.error(
2029 "Error executing task={}: {}".format(task_id, e), exc_info=True
2030 )
2031
2032 task_status = "FAILED"
2033 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2034 # TODO update ro_vim_item_update
2035
2036 return task_status, ro_vim_item_update
2037 else:
2038 return None, None
2039
2040 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2041 """
2042 Look for dependency task
2043 :param task_id: Can be one of
2044 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2045 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2046 3. task.task_id: "<action_id>:number"
2047 :param ro_task:
2048 :param target_id:
2049 :return: database ro_task plus index of task
2050 """
2051 if (
2052 task_id.startswith("vim:")
2053 or task_id.startswith("sdn:")
2054 or task_id.startswith("wim:")
2055 ):
2056 target_id, _, task_id = task_id.partition(" ")
2057
2058 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2059 ro_task_dependency = self.db.get_one(
2060 "ro_tasks",
2061 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2062 fail_on_empty=False,
2063 )
2064
2065 if ro_task_dependency:
2066 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2067 if task["target_record_id"] == task_id:
2068 return ro_task_dependency, task_index
2069
2070 else:
2071 if ro_task:
2072 for task_index, task in enumerate(ro_task["tasks"]):
2073 if task and task["task_id"] == task_id:
2074 return ro_task, task_index
2075
2076 ro_task_dependency = self.db.get_one(
2077 "ro_tasks",
2078 q_filter={
2079 "tasks.ANYINDEX.task_id": task_id,
2080 "tasks.ANYINDEX.target_record.ne": None,
2081 },
2082 fail_on_empty=False,
2083 )
2084
2085 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2086 if ro_task_dependency:
2087 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2088 if task["task_id"] == task_id:
2089 return ro_task_dependency, task_index
2090 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2091
def update_vm_refresh(self, ro_task):
    """Enables the VM status updates if self.refresh_config.active parameter
    is not -1 and then updates the DB accordingly

    Every ro_task with a task in DONE status and a negative to_check_at gets a new
    vim_info.refresh_at and to_check_at. Errors are logged, not raised.
    """
    try:
        self.logger.debug("Checking if VM status update config")
        next_refresh = self._get_next_refresh(ro_task, time.time())

        if next_refresh == -1:
            return

        # check again in one day at most
        one_day_later = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_later, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for task in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2134
2135 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2136 """Decide the next_refresh according to vim type and refresh config period.
2137 Args:
2138 ro_task (dict): ro_task details
2139 next_refresh (float): next refresh time as epoch format
2140
2141 Returns:
2142 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2143 """
2144 target_vim = ro_task["target_id"]
2145 vim_type = self.db_vims[target_vim]["vim_type"]
2146 if self.refresh_config.active == -1 or vim_type == "openstack":
2147 next_refresh = -1
2148 else:
2149 next_refresh += self.refresh_config.active
2150 return next_refresh
2151
def _process_pending_tasks(self, ro_task):
    """Process the tasks of one ro_task and store the outcome in the database.

    The task list is walked once per action, in the order DELETE, CREATE, EXEC.
    For each task selected in a pass:
      - if it is SCHEDULED, each id in its "depends_on" is resolved with
        self._get_dependency(); a dependency still SCHEDULED postpones the task,
        a FAILED one makes this task fail;
      - the action is executed through self._delete_task() or the
        self.item2class[task["item"]] handler (exec / new / refresh);
      - any exception raised while doing so sets the status to "FAILED" and
        vim_info to vim_status "VIM_ERROR" with the exception text;
      - the result is accumulated in db_ro_task_update and mirrored to the
        target record with self._update_target().
    Finally the "ro_tasks" record is updated with the accumulated changes,
    locked_by is cleared and to_check_at is set to the computed next check time.

    Exceptions are logged and never propagated to the caller.

    :param ro_task: ro_task record; this method reads "_id", "tasks",
        "target_id", "vim_info", "locked_at" and "to_check_at", and modifies
        "vim_info" in place
    :return: None
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh
        # Closure over the enclosing loop variable "task": it uses whichever task
        # is being iterated at the moment of the call.
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        # next_refresh may be -1 here (see _get_next_refresh), in which case
        # next_check_at becomes -1 as well
        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        # first CREATE task already in BUILD or DONE, if any
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE(review): these two are reset once per action pass, not once per
            # task, so a value set while processing one task is still visible when
            # the next task of the same pass is processed - confirm this is intended
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of another action, DELETE/EXEC tasks that are neither
                # SCHEDULED nor BUILD, and CREATE tasks already FINISHED/SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is stored under both the
                            # plain task id and the "TASK-<id>" key
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # overwrites (does not min()) the next check time;
                            # 60 seconds when "next_retry" is not provided
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task of this ro_task is already in
                                # BUILD/DONE: reuse its status and copy vim_info
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    # NOTE(review): this handler covers DELETE, EXEC and CREATE
                    # alike, although the message always mentions _delete_task
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        # first attempt: update only if to_check_at is unchanged; if nothing was
        # updated, retry without touching nor filtering by to_check_at
        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2494
2495 def _update_target(self, task, ro_vim_item_update):
2496 table, _, temp = task["target_record"].partition(":")
2497 _id, _, path_vim_status = temp.partition(":")
2498 path_item = path_vim_status[: path_vim_status.rfind(".")]
2499 path_item = path_item[: path_item.rfind(".")]
2500 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2501 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2502
2503 if ro_vim_item_update:
2504 update_dict = {
2505 path_vim_status + "." + k: v
2506 for k, v in ro_vim_item_update.items()
2507 if k
2508 in (
2509 "vim_id",
2510 "vim_details",
2511 "vim_message",
2512 "vim_name",
2513 "vim_status",
2514 "interfaces",
2515 "interfaces_backup",
2516 )
2517 }
2518
2519 if path_vim_status.startswith("vdur."):
2520 # for backward compatibility, add vdur.name apart from vdur.vim_name
2521 if ro_vim_item_update.get("vim_name"):
2522 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2523
2524 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2525 if ro_vim_item_update.get("vim_id"):
2526 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2527
2528 # update general status
2529 if ro_vim_item_update.get("vim_status"):
2530 update_dict[path_item + ".status"] = ro_vim_item_update[
2531 "vim_status"
2532 ]
2533
2534 if ro_vim_item_update.get("interfaces"):
2535 path_interfaces = path_item + ".interfaces"
2536
2537 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2538 if iface:
2539 update_dict.update(
2540 {
2541 path_interfaces + ".{}.".format(i) + k: v
2542 for k, v in iface.items()
2543 if k in ("vlan", "compute_node", "pci")
2544 }
2545 )
2546
2547 # put ip_address and mac_address with ip-address and mac-address
2548 if iface.get("ip_address"):
2549 update_dict[
2550 path_interfaces + ".{}.".format(i) + "ip-address"
2551 ] = iface["ip_address"]
2552
2553 if iface.get("mac_address"):
2554 update_dict[
2555 path_interfaces + ".{}.".format(i) + "mac-address"
2556 ] = iface["mac_address"]
2557
2558 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2559 update_dict["ip-address"] = iface.get("ip_address").split(
2560 ";"
2561 )[0]
2562
2563 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2564 update_dict[path_item + ".ip-address"] = iface.get(
2565 "ip_address"
2566 ).split(";")[0]
2567
2568 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2569
2570 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2571 if ro_vim_item_update.get("interfaces"):
2572 search_key = path_vim_status + ".interfaces"
2573 if update_dict.get(search_key):
2574 interfaces_backup_update = {
2575 path_vim_status + ".interfaces_backup": update_dict[search_key]
2576 }
2577
2578 self.db.set_one(
2579 table,
2580 q_filter={"_id": _id},
2581 update_dict=interfaces_backup_update,
2582 )
2583
2584 else:
2585 update_dict = {path_item + ".status": "DELETED"}
2586 self.db.set_one(
2587 table,
2588 q_filter={"_id": _id},
2589 update_dict=update_dict,
2590 unset={path_vim_status: None},
2591 )
2592
2593 def _process_delete_db_tasks(self):
2594 """
2595 Delete task from database because vnfrs or nsrs or both have been deleted
2596 :return: None. Uses and modify self.tasks_to_delete
2597 """
2598 while self.tasks_to_delete:
2599 task = self.tasks_to_delete[0]
2600 vnfrs_deleted = None
2601 nsr_id = task["nsr_id"]
2602
2603 if task["target_record"].startswith("vnfrs:"):
2604 # check if nsrs is present
2605 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2606 vnfrs_deleted = task["target_record"].split(":")[1]
2607
2608 try:
2609 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2610 except Exception as e:
2611 self.logger.error(
2612 "Error deleting task={}: {}".format(task["task_id"], e)
2613 )
2614 self.tasks_to_delete.pop(0)
2615
2616 @staticmethod
2617 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2618 """
2619 Static method because it is called from osm_ng_ro.ns
2620 :param db: instance of database to use
2621 :param nsr_id: affected nsrs id
2622 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2623 :return: None, exception is fails
2624 """
2625 retries = 5
2626 for retry in range(retries):
2627 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2628 now = time.time()
2629 conflict = False
2630
2631 for ro_task in ro_tasks:
2632 db_update = {}
2633 to_delete_ro_task = True
2634
2635 for index, task in enumerate(ro_task["tasks"]):
2636 if not task:
2637 pass
2638 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2639 vnfrs_deleted
2640 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2641 ):
2642 db_update["tasks.{}".format(index)] = None
2643 else:
2644 # used by other nsr, ro_task cannot be deleted
2645 to_delete_ro_task = False
2646
2647 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2648 if to_delete_ro_task:
2649 if not db.del_one(
2650 "ro_tasks",
2651 q_filter={
2652 "_id": ro_task["_id"],
2653 "modified_at": ro_task["modified_at"],
2654 },
2655 fail_on_empty=False,
2656 ):
2657 conflict = True
2658 elif db_update:
2659 db_update["modified_at"] = now
2660 if not db.set_one(
2661 "ro_tasks",
2662 q_filter={
2663 "_id": ro_task["_id"],
2664 "modified_at": ro_task["modified_at"],
2665 },
2666 update_dict=db_update,
2667 fail_on_empty=False,
2668 ):
2669 conflict = True
2670 if not conflict:
2671 return
2672 else:
2673 raise NsWorkerException("Exceeded {} retries".format(retries))
2674
def run(self):
    """Main loop of the worker.

    Each iteration first tries to get one command from self.task_queue (without
    blocking while self.vim_targets is not empty, blocking in idle state
    otherwise). "terminate" leaves the loop; "load_vim", "unload_vim",
    "reload_vim" and "check_vim" are dispatched to the matching method and the
    loop restarts. When no command is available, or getting/processing it fails,
    pending deletions and one database ro_task are processed.
    """
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            command = task[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif command == "reload_vim":
                self._reload_vim(task[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            # a command has been attended: look for the next one first
            continue
        except queue.Empty:
            # nothing queued: go on with the database tasks
            pass
        except Exception as err:
            self.logger.critical(
                "Error processing task: {}".format(err), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled debug aid, kept for reference:
            #     Log RO tasks only when loglevel is DEBUG
            #     if self.logger.getEffectiveLevel() == logging.DEBUG:
            #         _ = self._get_db_all_tasks()
            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do: wait before polling again
                time.sleep(5)
        except Exception as err:
            self.logger.critical(
                "Unexpected exception at run: " + str(err), exc_info=True
            )

    self.logger.info("Finishing")