Fixes multiattach issues in attaching and deletion
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target, where to store the results
"""
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
def deep_get(target_dict, *args, **kwargs):
    """
    Follow a path of keys through nested dictionaries and return what is found at the end.

    Example: with target_dict={a: {b: 5}}, the keys (a, b) give 5, whereas the keys (a, b, c)
    or (f, h) give the fallback value.
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may carry default=value, returned when the path cannot be followed
    :return: the value at the end of the path; otherwise the default (None when not provided)
    """
    node = target_dict

    for step in args:
        reachable = isinstance(node, dict) and step in node
        if not reachable:
            return kwargs.get("default")

        node = node[step]

    return node
66
67
class NsWorkerException(Exception):
    """Base error type raised by the task handlers of this module."""
70
71
class FailingConnector:
    """Connector stand-in whose public VIM/SDN methods always raise the stored error."""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exception raised on every connector call
        """
        self.error_msg = error_msg

        # (connector interface, exception raised by its methods). The SDN interface goes last, so
        # that a method name present in both interfaces ends up raising the SDN error.
        failing_interfaces = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for interface, error_class in failing_interfaces:
            for name in dir(interface):
                if name.startswith("_"):
                    # private and dunder attributes are left alone
                    continue

                setattr(self, name, Mock(side_effect=error_class(error_msg)))
87
88
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a requested element cannot be found."""
91
92
class VimInteractionBase:
    """Common parent of the handlers that create, delete and refresh VIM/SDN items.

    Every operation implemented here is a no-op that reports a successful result."""

    def __init__(self, db, my_vims, db_vims, logger):
        """Keep the references shared by all the handlers."""
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing; report the item as being built, with no changes to store."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Do not call the VIM: the item is ok unless its stored status is VIM_ERROR."""
        stored_status = ro_task["vim_info"]["vim_status"]
        task_status = "FAILED" if stored_status == "VIM_ERROR" else "DONE"

        return task_status, {}

    def delete(self, ro_task, task_index):
        """Do not call the VIM: assume the deletion went ok."""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing; report the action as done, with nothing to store."""
        return "DONE", None, None
119
120
class VimInteractionNet(VimInteractionBase):
    """Handler of network tasks: find or create, poll the status and delete."""

    def _get_vim_filter(self, find_params, target_id):
        """
        Choose the filter used to look for an existing network at the VIM.

        :param find_params: content of the 'find_params' entry of the task
        :param target_id: key of the target VIM at self.db_vims
        :return: tuple (vim_filter, is_mgmt, mgmt_defined_in_vim)
        :raises NsWorkerExceptionNotFound: when find_params has neither 'filter_dict' nor 'mgmt'
        """
        if find_params.get("filter_dict"):
            return find_params["filter_dict"], False, False

        if not find_params.get("mgmt"):
            raise NsWorkerExceptionNotFound(
                "Invalid find_params for new_net {}".format(find_params)
            )

        # management network: what is configured at the VIM (id first, then name) is preferred
        # over the name coming in find_params
        for config_key, filter_key in (
            ("management_network_id", "id"),
            ("management_network_name", "name"),
        ):
            if deep_get(self.db_vims[target_id], "config", config_key):
                vim_filter = {filter_key: self.db_vims[target_id]["config"][config_key]}
                return vim_filter, True, True

        return {"name": find_params["name"]}, True, False

    def new(self, ro_task, task_index, task_depends):
        """
        Find the network at the VIM (task with 'find_params') or create it (task 'params').

        A management network that is not found, is not defined at the VIM configuration and comes
        without creation params is created with the searched name.
        :return: tuple ("BUILD", update of the vim item) on success;
            ("FAILED", update of the vim item) on VimConnException or NsWorkerException
        """
        net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND
                vim_filter, is_mgmt, mgmt_in_vim = self._get_vim_filter(
                    find_params, ro_task["target_id"]
                )
                found_nets = target_vim.get_network_list(vim_filter)

                if not found_nets and not task.get("params"):
                    if not is_mgmt or mgmt_in_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(net_id)
                    )
                    created = True
                elif len(found_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if found_nets:
                    net_id = found_nets[0]["id"]
            else:
                # CREATE
                net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], net_id, created
                )
            )

            return "BUILD", {
                "vim_id": net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """
        Call VIM to get the network status.

        :return: tuple (task status, dictionary with the vim_info fields that have changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            reported = vim_info["status"]
            task_status = (
                "DONE"
                if reported == "ACTIVE"
                else ("BUILD" if reported == "BUILD" else "FAILED")
            )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # only the fields that differ from what is stored are reported back
        stored = ro_task["vim_info"]
        changes = {}

        if stored["vim_status"] != vim_info["status"]:
            changes["vim_status"] = vim_info["status"]

        if stored["vim_name"] != vim_info.get("name"):
            changes["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                changes["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            changes["vim_id"] = None
            changes["vim_message"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            changes["vim_details"] = vim_info["vim_info"]

        if changes:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    changes.get("vim_status"),
                    changes.get("vim_message")
                    if changes.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, changes

    def delete(self, ro_task, task_index):
        """
        Delete the network (and its created items) from the VIM.

        :return: tuple ("DONE", update of the vim item), also when the VIM does not find it;
            ("FAILED", update of the vim item) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_id = ro_task["vim_info"]["vim_id"]
        outcome_message = "DELETED"

        try:
            if net_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            outcome_message = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id, ro_task["target_id"], net_id, outcome_message
            )
        )

        return "DONE", {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": outcome_message,
            "vim_id": None,
        }
338
339
class VimInteractionVdu(VimInteractionBase):
    """Handler of VDU (VM) tasks: create, delete, poll the status and inject ssh keys."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create the VM at the VIM.

        The "TASK-..." references found at the networks, image, flavor and affinity groups of the
        task params are replaced by the ids available at task_depends before calling the VIM.
        :param ro_task: ro_task being processed
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: dictionary that maps "TASK-..." references to VIM ids
        :return: tuple ("BUILD", update of the vim item) on success;
            ("FAILED", update of the vim item) on VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            params = task["params"]
            # work on a copy, so that the references stored at the task are not overwritten
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # message completed: it used to be just "found for <id>"
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" is read from the net_list entries after the call to the VIM
            # NOTE(review): presumably filled in by new_vminstance - confirm against the connector
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM and its created items from the VIM.

        :return: tuple ("DONE", update of the vim item), also when the VIM does not find it;
            ("FAILED", update of the vim item) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status.

        :return: tuple (task status, dictionary with the vim_info fields that have changed);
            (None, None) when the ro_task has no vim_id
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            # best effort: a failure here is logged and the refresh goes on without the name
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of interfaces_vim_ids; None is stored for an id the VIM does not report
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the vdu management interface defaults to the vnf management one, and then to the first
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject a ssh key into the VM, using the key decrypted from the task params.

        :return: tuple (task status, update of the vim item or None, update of the task):
            "DONE" on success; "BUILD" with the retry counters while the attempts that failed with
            VimConnException or NsWorkerException are below max_retries_inject_ssh_key;
            "FAILED" afterwards
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # the three encryption related params are consumed here and replaced by "ro_key"
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
634
635
class VimInteractionImage(VimInteractionBase):
    """Handler of image tasks: images are only looked for at the VIM."""

    def new(self, ro_task, task_index, task_depends):
        """
        Find at the VIM the single image that matches the 'find_params' of the task.

        :param ro_task: ro_task being processed
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: tuple ("DONE", update of the vim item) when exactly one image is found;
            ("FAILED", update of the vim item) when the task has no find_params, when no image or
            more than one image is found, or on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            find_params = task.get("find_params")

            if not find_params:
                # without this check vim_image_id would be unbound below, and an UnboundLocalError
                # would escape instead of a FAILED result
                raise NsWorkerException(
                    "Cannot look for an image because the task has no find_params"
                )

            vim_images = target_vim.get_image_list(**find_params)

            if not vim_images:
                raise NsWorkerExceptionNotFound(
                    "Image not found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )
            elif len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )

            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
690
691
class VimInteractionSharedVolume(VimInteractionBase):
    """Handler of shared volume tasks: create and delete."""

    def delete(self, ro_task, task_index):
        """
        Delete the shared volume from the VIM, unless it is flagged to be kept.

        :return: tuple ("DONE", update of the vim item) when the volume is deleted, kept or not
            found at the VIM; ("FAILED", update of the vim item) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        # created_items may lack an entry for this volume id: fall back to an empty dictionary
        # instead of calling .get() on None
        volume_info = (created_items or {}).get(shared_volume_vim_id) or {}
        if isinstance(volume_info, dict) and volume_info.get("keep"):
            # the volume has to be kept: report it as active, without calling the VIM
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok
        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Create the shared volume described at the task params.

        The name and the 'keep' flag of the new volume are stored at created_items, under the
        VIM id of the volume. Nothing is created when the task has no params.
        :return: tuple ("DONE", update of the vim item) on success;
            ("FAILED", update of the vim item) on VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
796
797
class VimInteractionFlavor(VimInteractionBase):
    """Handler of flavor tasks: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor from the VIM.

        :return: tuple ("DONE", update of the vim item), also when the VIM does not find it;
            ("FAILED", update of the vim item) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_id = ro_task["vim_info"]["vim_id"]
        outcome_message = "DELETED"

        try:
            if flavor_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_id)
        except vimconn.VimConnNotFoundException:
            outcome_message = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id, ro_task["target_id"], flavor_id, outcome_message
            )
        )

        return "DONE", {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": outcome_message,
            "vim_id": None,
        }

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor matching 'find_params'; create it from 'params' when none is obtained.

        :return: tuple ("DONE", update of the vim item) on success;
            ("FAILED", update of the vim item) on VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            flavor_id = None

            # FIND
            if task.get("find_params"):
                try:
                    wanted_flavor = task["find_params"]["flavor_data"]
                    flavor_id = target_vim.get_flavor_id_from_data(wanted_flavor)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # not finding it is not an error: the flavor may be created below
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            # CREATE
            if not flavor_id and task.get("params"):
                flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
891
892
class VimInteractionAffinityGroup(VimInteractionBase):
    """Handler of affinity or anti-affinity group tasks: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group from the VIM.

        :return: tuple ("DONE", update of the vim item), also when the VIM does not find it;
            ("FAILED", update of the vim item) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        group_id = ro_task["vim_info"]["vim_id"]
        outcome_message = "DELETED"

        try:
            if group_id:
                self.my_vims[ro_task["target_id"]].delete_affinity_group(group_id)
        except vimconn.VimConnNotFoundException:
            outcome_message = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], group_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id, ro_task["target_id"], group_id, outcome_message
            )
        )

        return "DONE", {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": outcome_message,
            "vim_id": None,
        }

    def new(self, ro_task, task_index, task_depends):
        """
        Reuse the affinity group whose VIM id is provided at the params, or create a new one.

        A new group is created when no VIM id is provided or when the VIM does not find it.
        :return: tuple ("DONE", update of the vim item) on success;
            ("FAILED", update of the vim item) on VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            group_id = None
            group_data = (
                task["params"].get("affinity_group_data")
                if task.get("params")
                else None
            )

            if group_data and group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = group_data.get("vim-affinity-group-id")
                    existing_group = target_vim.get_affinity_group(
                        param_affinity_group_id
                    )
                    group_id = existing_group.get("id")
                except vimconn.VimConnNotFoundException:
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if group_data and not group_id:
                group_id = target_vim.new_affinity_group(group_data)
                created = True

            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], group_id, created
                )
            )

            return "DONE", {
                "vim_id": group_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
998
999
class VimInteractionUpdateVdu(VimInteractionBase):
    """Handler of the tasks that run an action over an existing VM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Ask the VIM to run the action of the task params over the VM of the task params.

        :param ro_task: ro_task being processed
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: tuple ("DONE", update of the vim item, update of the task) on success;
            ("FAILED", update of the vim item, update of the task) when the task has no params,
            or on VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")

            if not params:
                # without this check vim_vm_id would be unbound below, and an UnboundLocalError
                # would escape instead of a FAILED result
                raise NsWorkerException(
                    "Cannot run the action over the VM because the task has no params"
                )

            vim_vm_id = params.get("vim_vm_id")
            action = params.get("action")
            # the VIM receives the action as a dictionary, where the action is key and value
            context = {action: action}
            target_vim.action_vminstance(vim_vm_id, context)
            # created = True
            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1040
1041
class VimInteractionSdnNet(VimInteractionBase):
    """
    Handles the "sdn_net" items: creates, updates, refreshes and deletes connectivity services through
    the connector stored at self.my_vims for the ro_task target, joining the SR-IOV / PCI-PASSTHROUGH
    interfaces found at the vnfrs of the database.
    """

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping
        mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise (also when port_pci is shorter than the mapping)
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)
            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                # unbalanced bracket: the remaining mapping is compared literally below
                break

            # literal text placed between the previous bracket group and this one
            length = bracket_start - mapping_index
            if (
                length
                and port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # the char at this position must be one of the options listed inside the brackets.
            # A port_pci that finishes before this position cannot match (it used to raise IndexError)
            char_index = pci_index + length
            if (
                char_index >= len(port_pci)
                or port_pci[char_index] not in mapping[bracket_start + 1 : bracket_end]
            ):
                return False

            pci_index = char_index + 1
            mapping_index = bracket_end + 1

        # whatever follows the last bracket group must be identical
        return port_pci[pci_index:] == mapping[mapping_index:]

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Get from the "vnfrs" database table the interfaces that must be connected by the SDN net
        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: used as "vim-account-id" filter when reading the vnfrs
        :return: list with a copy of every interface of type SR-IOV or PCI-PASSTHROUGH attached to any of
            the vlds. Each copy gets an "id" (vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index>) and
            a "status"="ERROR" when its vdur is on ERROR status
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV or PCI-PASSTHROUGH
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Refresh the SDN net by running again new() over the first CREATE task that is not FINISHED
        :param ro_task: ro_task dictionary
        :return: the same tuple returned by new()
        NOTE: next() is called without default, so StopIteration is raised when there is no such task
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create the connectivity service of a ro_task; or edit it / get its status when it already exists
        :param ro_task: ro_task dictionary. The previous state is read from ro_task["vim_info"]
        :param task_index: index of the CREATE task inside ro_task["tasks"]; its "params" are used
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update). On any exception it is logged and the returned status
            is "FAILED", with a ro_vim_item_update that contains the "VIM_ERROR" vim_status
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM, reading it from database only the first time
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                # NOTE: db_vim is only bound here; it is needed below to look for the port mapping
                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location of the interface is still unknown
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    # mapping is not needed: values are taken from the port itself
                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": "dot1q"
                    if port["type"] == "SR-IOV"
                    else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing is created at the connector with less than 2 ports
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                # same ports as before: just get the current status
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # error_msg, when present, takes precedence over sdn_info
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # it cannot be considered ACTIVE while there are ports without location
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # traceback is only logged for exceptions not raised by the connectors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the connectivity service of a ro_task, if it has a vim_id
        :param ro_task: ro_task dictionary
        :param task_index: index of the DELETE task inside ro_task["tasks"]
        :return: tuple (status, ro_vim_item_update). A NOT_FOUND error of the SDN connector is considered
            as "already deleted". Any other exception is logged and "FAILED" is returned
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
1435
1436
class VimInteractionMigration(VimInteractionBase):
    """Handles the "migrate" items: migrates a VM and collects its refreshed information."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM indicated at the task params and refresh its information
        :param ro_task: ro_task dictionary
        :param task_index: index of the task inside ro_task["tasks"]. Its "params" must contain "vim_vm_id";
            "migrate_host" and "vdu_vim_info" (old vim_info of the vdu, indexed by target_id) are also read
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update). Status is "DONE"; or "FAILED" when
            the params are missing or a VimConnException/NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params")
            if not params:
                # without params there is not a VM to migrate. Report it as a failed task instead of
                # failing later with an unbound 'vim_vm_id'
                raise NsWorkerException("Missing params at VM migration task")

            vim_vm_id = params.get("vim_vm_id")
            migrate_host = params.get("migrate_host")
            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                vdu_old_vim_info = (params.get("vdu_vim_info") or {}).get(
                    ro_task["target_id"]
                )
                # the VM is already migrated at this point, so a missing old vim_info or a missing
                # list of interfaces must not abort the update of the status
                old_interfaces = (vdu_old_vim_info or {}).get("interfaces") or ()

                # Refresh VM to get new vim_info
                vm_to_refresh_list = [vim_vm_id]
                vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    # keep the order of the old interfaces. None is stored for an old interface
                    # that is not found at the refreshed information
                    for old_iface in old_interfaces:
                        iface = next(
                            (
                                iface
                                for iface in refreshed_vim_info["interfaces"]
                                if old_iface["vim_interface_id"]
                                == iface["vim_interface_id"]
                            ),
                            None,
                        )
                        vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1516
1517
class VimInteractionResize(VimInteractionBase):
    """Handles the "verticalscale" items: resizes a VM to a flavor that is looked for, or created."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM indicated at the task params to the flavor described at "flavor_dict"
        :param ro_task: ro_task dictionary
        :param task_index: index of the task inside ro_task["tasks"]. Its "params" must contain
            "vim_vm_id" and "flavor_dict"
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update). Status is "DONE"; or "FAILED" when
            the params are missing, the flavor can be neither found nor created, or a
            VimConnException/NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")
            if not params:
                # without params there is not a VM to resize. Report it as a failed task instead of
                # failing later with an unbound 'vim_vm_id'
                raise NsWorkerException("Missing params at VM resize task")

            vim_vm_id = params.get("vim_vm_id")
            flavor_dict = params.get("flavor_dict")
            self.logger.info("flavor_dict %s", flavor_dict)
            flavor_error = "no flavor id obtained from VIM"

            try:
                target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
            except Exception as e:
                self.logger.info("Cannot find any flavor matching %s.", str(e))
                # there is not a matching flavor: try to create it
                try:
                    target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                except Exception as e:
                    self.logger.error("Error creating flavor at VIM %s.", str(e))
                    flavor_error = str(e)

            if target_flavor_uuid is None:
                # nothing can be resized without a flavor; it must not be reported as DONE
                raise NsWorkerException(
                    "Cannot get a flavor to resize the VM: {}".format(flavor_error)
                )

            resized_status = target_vim.resize_instance(vim_vm_id, target_flavor_uuid)

            if resized_status:
                # Refresh VM to get new vim_info
                vm_to_refresh_list = [vim_vm_id]
                vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1585
1586
class ConfigValidate:
    """Gives access, as properties, to the values stored at the "period" section of a configuration."""

    def __init__(self, config: Dict):
        """
        :param config: configuration dictionary; only its "period" entry is read by the properties
        """
        self.conf = config

    def _period(self, key):
        """Return the value stored at config["period"][key]."""
        return self.conf["period"][key]

    @property
    def active(self):
        """
        Value of "refresh_active". Default 1 min, allowed >= 60 or -1, -1 disables periodic checks.
        Any other configured value is replaced by 60.
        """
        is_allowed = (
            self._period("refresh_active") >= 60
            or self._period("refresh_active") == -1
        )
        return self._period("refresh_active") if is_allowed else 60

    @property
    def build(self):
        """Value of "refresh_build"."""
        return self._period("refresh_build")

    @property
    def image(self):
        """Value of "refresh_image"."""
        return self._period("refresh_image")

    @property
    def error(self):
        """Value of "refresh_error"."""
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value of "queue_size"."""
        return self._period("queue_size")
1617
1618
1619 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs.
        Also config["global"]["task_locked_time"] and the "period" section (through ConfigValidate) are read
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # lock acquired by del_task; it was used there but never created
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # task item: class that knows how to process it. All the instances share the same db,
    # my_vims, db_vims and logger of this worker
    interaction_classes = {
        "net": VimInteractionNet,
        "shared-volumes": VimInteractionSharedVolume,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1678
def insert_task(self, task):
    """
    Put a task at the queue of this worker, without waiting when the queue is full
    :param task: item to be added to self.task_queue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1685
def terminate(self):
    """
    Insert the "exit" item at the task queue (presumably read by the thread loop as the order to finish)
    :return: None
    """
    exit_order = "exit"
    self.insert_task(exit_order)
1688
def del_task(self, task):
    """
    Supersede a task that has not been started yet
    :param task: task dictionary. Its "status" is changed to "SUPERSEDED" when it is "SCHEDULED"
    :return: True if the task has been superseded; False if not, because it is not "SCHEDULED"
        (e.g. task["status"] == "processing")
    """
    # The 'with' statement releases the lock at every exit. The lock was also released explicitly
    # when returning False, so the implicit release raised RuntimeError over an unlocked lock
    with self.task_lock:
        if task["status"] != "SCHEDULED":
            return False

        task["status"] = "SUPERSEDED"
        return True
1697
1698 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1699 """
1700 Process vim config, creating vim configuration files as ca_cert
1701 :param target_id: vim/sdn/wim + id
1702 :param db_vim: Vim dictionary obtained from database
1703 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1704 """
1705 if not db_vim.get("config"):
1706 return
1707
1708 file_name = ""
1709 work_dir = "/app/osm_ro/certs"
1710
1711 try:
1712 if db_vim["config"].get("ca_cert_content"):
1713 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1714
1715 if not path.isdir(file_name):
1716 makedirs(file_name)
1717
1718 file_name = file_name + "/ca_cert"
1719
1720 with open(file_name, "w") as f:
1721 f.write(db_vim["config"]["ca_cert_content"])
1722 del db_vim["config"]["ca_cert_content"]
1723 db_vim["config"]["ca_cert"] = file_name
1724 except Exception as e:
1725 raise NsWorkerException(
1726 "Error writing to file '{}': {}".format(file_name, e)
1727 )
1728
1729 def _load_plugin(self, name, type="vim"):
1730 # type can be vim or sdn
1731 if "rovim_dummy" not in self.plugins:
1732 self.plugins["rovim_dummy"] = VimDummyConnector
1733
1734 if "rosdn_dummy" not in self.plugins:
1735 self.plugins["rosdn_dummy"] = SdnDummyConnector
1736
1737 if name in self.plugins:
1738 return self.plugins[name]
1739
1740 try:
1741 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1742 self.plugins[name] = ep.load()
1743 except Exception as e:
1744 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1745
1746 if name and name not in self.plugins:
1747 raise NsWorkerException(
1748 "Plugin 'osm_{n}' has not been installed".format(n=name)
1749 )
1750
1751 return self.plugins[name]
1752
1753 def _unload_vim(self, target_id):
1754 """
1755 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1756 :param target_id: Contains type:_id; where type can be 'vim', ...
1757 :return: None.
1758 """
1759 try:
1760 self.db_vims.pop(target_id, None)
1761 self.my_vims.pop(target_id, None)
1762
1763 if target_id in self.vim_targets:
1764 self.vim_targets.remove(target_id)
1765
1766 self.logger.info("Unloaded {}".format(target_id))
1767 except Exception as e:
1768 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1769
1770 def _check_vim(self, target_id):
1771 """
1772 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1773 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1774 :return: None.
1775 """
1776 target, _, _id = target_id.partition(":")
1777 now = time.time()
1778 update_dict = {}
1779 unset_dict = {}
1780 op_text = ""
1781 step = ""
1782 loaded = target_id in self.vim_targets
1783 target_database = (
1784 "vim_accounts"
1785 if target == "vim"
1786 else "wim_accounts"
1787 if target == "wim"
1788 else "sdns"
1789 )
1790
1791 try:
1792 step = "Getting {} from db".format(target_id)
1793 db_vim = self.db.get_one(target_database, {"_id": _id})
1794
1795 for op_index, operation in enumerate(
1796 db_vim["_admin"].get("operations", ())
1797 ):
1798 if operation["operationState"] != "PROCESSING":
1799 continue
1800
1801 locked_at = operation.get("locked_at")
1802
1803 if locked_at is not None and locked_at >= now - self.task_locked_time:
1804 # some other thread is doing this operation
1805 return
1806
1807 # lock
1808 op_text = "_admin.operations.{}.".format(op_index)
1809
1810 if not self.db.set_one(
1811 target_database,
1812 q_filter={
1813 "_id": _id,
1814 op_text + "operationState": "PROCESSING",
1815 op_text + "locked_at": locked_at,
1816 },
1817 update_dict={
1818 op_text + "locked_at": now,
1819 "admin.current_operation": op_index,
1820 },
1821 fail_on_empty=False,
1822 ):
1823 return
1824
1825 unset_dict[op_text + "locked_at"] = None
1826 unset_dict["current_operation"] = None
1827 step = "Loading " + target_id
1828 error_text = self._load_vim(target_id)
1829
1830 if not error_text:
1831 step = "Checking connectivity"
1832
1833 if target == "vim":
1834 self.my_vims[target_id].check_vim_connectivity()
1835 else:
1836 self.my_vims[target_id].check_credentials()
1837
1838 update_dict["_admin.operationalState"] = "ENABLED"
1839 update_dict["_admin.detailed-status"] = ""
1840 unset_dict[op_text + "detailed-status"] = None
1841 update_dict[op_text + "operationState"] = "COMPLETED"
1842
1843 return
1844
1845 except Exception as e:
1846 error_text = "{}: {}".format(step, e)
1847 self.logger.error("{} for {}: {}".format(step, target_id, e))
1848
1849 finally:
1850 if update_dict or unset_dict:
1851 if error_text:
1852 update_dict[op_text + "operationState"] = "FAILED"
1853 update_dict[op_text + "detailed-status"] = error_text
1854 unset_dict.pop(op_text + "detailed-status", None)
1855 update_dict["_admin.operationalState"] = "ERROR"
1856 update_dict["_admin.detailed-status"] = error_text
1857
1858 if op_text:
1859 update_dict[op_text + "statusEnteredTime"] = now
1860
1861 self.db.set_one(
1862 target_database,
1863 q_filter={"_id": _id},
1864 update_dict=update_dict,
1865 unset=unset_dict,
1866 fail_on_empty=False,
1867 )
1868
1869 if not loaded:
1870 self._unload_vim(target_id)
1871
1872 def _reload_vim(self, target_id):
1873 if target_id in self.vim_targets:
1874 self._load_vim(target_id)
1875 else:
1876 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1877 # just remove it to force load again next time it is needed
1878 self.db_vims.pop(target_id, None)
1879
1880 def _load_vim(self, target_id):
1881 """
1882 Load or reload a vim_account, sdn_controller or wim_account.
1883 Read content from database, load the plugin if not loaded.
1884 In case of error loading the plugin, it load a failing VIM_connector
1885 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1886 :param target_id: Contains type:_id; where type can be 'vim', ...
1887 :return: None if ok, descriptive text if error
1888 """
1889 target, _, _id = target_id.partition(":")
1890 target_database = (
1891 "vim_accounts"
1892 if target == "vim"
1893 else "wim_accounts"
1894 if target == "wim"
1895 else "sdns"
1896 )
1897 plugin_name = ""
1898 vim = None
1899 step = "Getting {}={} from db".format(target, _id)
1900
1901 try:
1902 # TODO process for wim, sdnc, ...
1903 vim = self.db.get_one(target_database, {"_id": _id})
1904
1905 # if deep_get(vim, "config", "sdn-controller"):
1906 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1907 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1908
1909 step = "Decrypting password"
1910 schema_version = vim.get("schema_version")
1911 self.db.encrypt_decrypt_fields(
1912 vim,
1913 "decrypt",
1914 fields=("password", "secret"),
1915 schema_version=schema_version,
1916 salt=_id,
1917 )
1918 self._process_vim_config(target_id, vim)
1919
1920 if target == "vim":
1921 plugin_name = "rovim_" + vim["vim_type"]
1922 step = "Loading plugin '{}'".format(plugin_name)
1923 vim_module_conn = self._load_plugin(plugin_name)
1924 step = "Loading {}'".format(target_id)
1925 self.my_vims[target_id] = vim_module_conn(
1926 uuid=vim["_id"],
1927 name=vim["name"],
1928 tenant_id=vim.get("vim_tenant_id"),
1929 tenant_name=vim.get("vim_tenant_name"),
1930 url=vim["vim_url"],
1931 url_admin=None,
1932 user=vim["vim_user"],
1933 passwd=vim["vim_password"],
1934 config=vim.get("config") or {},
1935 persistent_info={},
1936 )
1937 else: # sdn
1938 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1939 step = "Loading plugin '{}'".format(plugin_name)
1940 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1941 step = "Loading {}'".format(target_id)
1942 wim = deepcopy(vim)
1943 wim_config = wim.pop("config", {}) or {}
1944 wim["uuid"] = wim["_id"]
1945 if "url" in wim and "wim_url" not in wim:
1946 wim["wim_url"] = wim["url"]
1947 elif "url" not in wim and "wim_url" in wim:
1948 wim["url"] = wim["wim_url"]
1949
1950 if wim.get("dpid"):
1951 wim_config["dpid"] = wim.pop("dpid")
1952
1953 if wim.get("switch_id"):
1954 wim_config["switch_id"] = wim.pop("switch_id")
1955
1956 # wim, wim_account, config
1957 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1958 self.db_vims[target_id] = vim
1959 self.error_status = None
1960
1961 self.logger.info(
1962 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1963 )
1964 except Exception as e:
1965 self.logger.error(
1966 "Cannot load {} plugin={}: {} {}".format(
1967 target_id, plugin_name, step, e
1968 )
1969 )
1970
1971 self.db_vims[target_id] = vim or {}
1972 self.db_vims[target_id] = FailingConnector(str(e))
1973 error_status = "{} Error: {}".format(step, e)
1974
1975 return error_status
1976 finally:
1977 if target_id not in self.vim_targets:
1978 self.vim_targets.append(target_id)
1979
1980 def _get_db_task(self):
1981 """
1982 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1983 :return: None
1984 """
1985 now = time.time()
1986
1987 if not self.time_last_task_processed:
1988 self.time_last_task_processed = now
1989
1990 try:
1991 while True:
1992 """
1993 # Log RO tasks only when loglevel is DEBUG
1994 if self.logger.getEffectiveLevel() == logging.DEBUG:
1995 self._log_ro_task(
1996 None,
1997 None,
1998 None,
1999 "TASK_WF",
2000 "task_locked_time="
2001 + str(self.task_locked_time)
2002 + " "
2003 + "time_last_task_processed="
2004 + str(self.time_last_task_processed)
2005 + " "
2006 + "now="
2007 + str(now),
2008 )
2009 """
2010 locked = self.db.set_one(
2011 "ro_tasks",
2012 q_filter={
2013 "target_id": self.vim_targets,
2014 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2015 "locked_at.lt": now - self.task_locked_time,
2016 "to_check_at.lt": self.time_last_task_processed,
2017 "to_check_at.gt": -1,
2018 },
2019 update_dict={"locked_by": self.my_id, "locked_at": now},
2020 fail_on_empty=False,
2021 )
2022
2023 if locked:
2024 # read and return
2025 ro_task = self.db.get_one(
2026 "ro_tasks",
2027 q_filter={
2028 "target_id": self.vim_targets,
2029 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2030 "locked_at": now,
2031 },
2032 )
2033 return ro_task
2034
2035 if self.time_last_task_processed == now:
2036 self.time_last_task_processed = None
2037 return None
2038 else:
2039 self.time_last_task_processed = now
2040 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2041
2042 except DbException as e:
2043 self.logger.error("Database exception at _get_db_task: {}".format(e))
2044 except Exception as e:
2045 self.logger.critical(
2046 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
2047 )
2048
2049 return None
2050
2051 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2052 """
2053 Determine if this task need to be done or superseded
2054 :return: None
2055 """
2056 my_task = ro_task["tasks"][task_index]
2057 task_id = my_task["task_id"]
2058 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2059 "created_items", False
2060 )
2061
2062 self.logger.debug("Needed delete: {}".format(needed_delete))
2063 if my_task["status"] == "FAILED":
2064 return None, None # TODO need to be retry??
2065
2066 try:
2067 for index, task in enumerate(ro_task["tasks"]):
2068 if index == task_index or not task:
2069 continue # own task
2070
2071 if (
2072 my_task["target_record"] == task["target_record"]
2073 and task["action"] == "CREATE"
2074 ):
2075 # set to finished
2076 db_update["tasks.{}.status".format(index)] = task[
2077 "status"
2078 ] = "FINISHED"
2079 elif task["action"] == "CREATE" and task["status"] not in (
2080 "FINISHED",
2081 "SUPERSEDED",
2082 ):
2083 needed_delete = False
2084
2085 if needed_delete:
2086 self.logger.debug(
2087 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2088 )
2089 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2090 else:
2091 return "SUPERSEDED", None
2092 except Exception as e:
2093 if not isinstance(e, NsWorkerException):
2094 self.logger.critical(
2095 "Unexpected exception at _delete_task task={}: {}".format(
2096 task_id, e
2097 ),
2098 exc_info=True,
2099 )
2100
2101 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2102
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Decide whether the own task must create its item at the VIM and, if so, do it.

    Only an own task in status "SCHEDULED" triggers any work.
    :param ro_task: ro_task record containing the "tasks" list
    :param task_index: position of the own task inside ro_task["tasks"]
    :param task_depends: passed as it is to the "new" method of the item class
    :param db_update: not used by this method
    :return: tuple (task_status, ro_vim_item_update):
        - (None, None) if the own task is not in status "SCHEDULED"
        - (<status>, "COPY_VIM_INFO") if another CREATE task of this ro_task has a
          status different from "SCHEDULED", "FINISHED" and "SUPERSEDED"
        - what the "new" method of the item class returns, or
          ("FAILED", {"vim_status": "VIM_ERROR", "vim_message": <text>}) if it raises
    """
    own_task = ro_task["tasks"][task_index]
    own_task_id = own_task["task_id"]

    # FAILED tasks are not retried; any other non-SCHEDULED status needs no action
    if own_task["status"] != "SCHEDULED":
        return None, None

    # look for a sibling CREATE task that has already started the creation
    idle_statuses = ("SCHEDULED", "FINISHED", "SUPERSEDED")
    sibling = next(
        (
            other
            for position, other in enumerate(ro_task["tasks"])
            if other
            and position != task_index
            and other["action"] == "CREATE"
            and other["status"] not in idle_statuses
        ),
        None,
    )
    if sibling is not None:
        return sibling["status"], "COPY_VIM_INFO"

    try:
        creator = self.item2class[own_task["item"]]
        return creator.new(ro_task, task_index, task_depends)
    except Exception as e:
        # NsWorkerException is an expected failure, anything else is logged with trace
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(own_task_id, e), exc_info=True
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2145
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: optional ro_task searched first (only for format 3) before asking the database
    :param target_id: target used for the database lookup of format 2. It is overwritten
        by the target contained in task_id when format 1 is used
    :return: database ro_task plus index of task
    :raises NsWorkerException: if no task matching task_id is found
    """
    # format 1: split the leading target from the record id
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # deleted tasks are stored as None, they must be skipped
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # deleted tasks are stored as None, they must be skipped
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index

    raise NsWorkerException("Cannot get depending task {}".format(task_id))
2197
def update_vm_refresh(self, ro_task):
    """Re-enable the periodic VM status refresh when it is allowed.

    The next refresh time is obtained from self._get_next_refresh(). Unless it is
    -1, every ro_task of the database having "tasks.status" DONE and a negative
    "to_check_at" gets "vim_info.refresh_at" set to that time and "to_check_at"
    set to the earliest of that time and one day from now.

    Any exception is logged as an error and not propagated.

    Args:
        ro_task (dict): ro_task passed to self._get_next_refresh()
    """
    try:
        self.logger.debug("Checking if VM status update config")
        refresh_at = self._get_next_refresh(ro_task, time.time())
        if refresh_at == -1:
            # nothing to re-enable
            return

        one_day_ahead = time.time() + (24 * 60 * 60)
        changes = {
            "vim_info.refresh_at": refresh_at,
            "to_check_at": min(one_day_ahead, refresh_at),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        disabled_ro_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={"tasks.status": "DONE", "to_check_at.lt": 0},
        )

        self.logger.debug("Updating tasks to change the to_check_at status")
        for disabled in disabled_ro_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": disabled["_id"]},
                update_dict=changes,
                fail_on_empty=True,
            )
    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2240
def _get_next_refresh(self, ro_task: dict, next_refresh: float):
    """Compute the next refresh time from the vim type and the refresh period.

    Args:
        ro_task (dict): ro_task details; its "target_id" selects the entry of self.db_vims
        next_refresh (float): base time, as epoch, the active period is added to

    Returns:
        -1 if self.refresh_config.active is -1 or the vim type is "openstack",
        otherwise next_refresh plus self.refresh_config.active.
    """
    vim_type = self.db_vims[ro_task["target_id"]]["vim_type"]
    active_period = self.refresh_config.active

    if active_period == -1 or vim_type == "openstack":
        return -1

    return next_refresh + active_period
2257
def _process_pending_tasks(self, ro_task):
    """
    Process the tasks of one ro_task and store the result at database.

    Tasks are visited grouped by action, in the order "DELETE", "CREATE", "EXEC".
    For each processed task the new status and the VIM information are accumulated
    in a single update of the ro_task record and the target record of the task is
    updated through self._update_target(). Finally the ro_task is unlocked and its
    "to_check_at" is set to the next time it has to be looked at.

    :param ro_task: ro_task record to process. Its "vim_info" is modified in place
    :return: None. Exceptions raised while processing are logged, not propagated
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh for the task currently being processed ("task" is the
        # loop variable of the enclosing function). Only next_check_at is rebound here
        nonlocal next_check_at

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        # NOTE: debug tracing of the ro_task with self._log_ro_task is disabled here

        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # both are reset per action, not per task: the result obtained for one
            # task is also applied to the following tasks of the same action
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            # the task is left untouched, it is retried at next_check_at
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locked_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # already created by another task: reuse its result
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    # NsWorkerException and VimConnException are expected failures;
                    # anything else is logged with its traceback
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _process_pending_tasks task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # to_check_at was changed meanwhile: keep that value and store the rest
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2598
def _update_target(self, task, ro_vim_item_update):
    """
    Copy the VIM information of an item into the record targeted by the task.

    task["target_record"] has the form "<table>:<_id>:<path_vim_status>".
    :param task: task whose "target_record" selects the record and path to update
    :param ro_vim_item_update: dictionary with the VIM information to store. If it
        is empty or None the item is marked with status "DELETED" and its VIM
        information is unset
    :return: None
    """
    table, _, remainder = task["target_record"].partition(":")
    record_id, _, path_vim_status = remainder.partition(":")
    # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
    # path_item: dot separated list targeting record information, e.g. "vdur.10"
    path_item = path_vim_status[: path_vim_status.rfind(".")]
    path_item = path_item[: path_item.rfind(".")]

    if not ro_vim_item_update:
        # nothing at VIM: flag the item as deleted and drop its vim information
        self.db.set_one(
            table,
            q_filter={"_id": record_id},
            update_dict={path_item + ".status": "DELETED"},
            unset={path_vim_status: None},
        )
        return

    copied_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
        "interfaces_backup",
    )
    update_dict = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            update_dict[path_vim_status + "." + key] = value

    if path_vim_status.startswith("vdur."):
        # for backward compatibility, add vdur.name apart from vdur.vim_name and
        # vdur.vim-id apart from vdur.vim_id; vdur.status holds the general status
        for source_key, suffix in (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        ):
            if ro_vim_item_update.get(source_key):
                update_dict[path_item + suffix] = ro_vim_item_update[source_key]

    interfaces = ro_vim_item_update.get("interfaces")
    if interfaces:
        for position, iface in enumerate(interfaces):
            if not iface:
                continue

            iface_prefix = path_item + ".interfaces" + ".{}.".format(position)
            for key, value in iface.items():
                if key in ("vlan", "compute_node", "pci"):
                    update_dict[iface_prefix + key] = value

            # put ip_address and mac_address with ip-address and mac-address
            ip_address = iface.get("ip_address")
            if ip_address:
                update_dict[iface_prefix + "ip-address"] = ip_address

            if iface.get("mac_address"):
                update_dict[iface_prefix + "mac-address"] = iface["mac_address"]

            # management interfaces also publish their first address at upper levels
            if iface.get("mgmt_vnf_interface") and ip_address:
                update_dict["ip-address"] = ip_address.split(";")[0]

            if iface.get("mgmt_vdu_interface") and ip_address:
                update_dict[path_item + ".ip-address"] = ip_address.split(";")[0]

    self.db.set_one(table, q_filter={"_id": record_id}, update_dict=update_dict)

    # If interfaces exists, it backups VDU interfaces in the DB for healing operations
    if interfaces:
        stored_interfaces = update_dict.get(path_vim_status + ".interfaces")
        if stored_interfaces:
            self.db.set_one(
                table,
                q_filter={"_id": record_id},
                update_dict={path_vim_status + ".interfaces_backup": stored_interfaces},
            )
2696
def _process_delete_db_tasks(self):
    """
    Remove from database the tasks queued at self.tasks_to_delete, because their
    vnfrs or nsrs or both have been deleted.

    When the target of a task is a vnfr and its nsr still exists, only the tasks of
    that vnfr are removed; otherwise all the tasks of the nsr are removed.
    :return: None. Uses and modify self.tasks_to_delete
    """
    while self.tasks_to_delete:
        pending = self.tasks_to_delete[0]
        nsr_id = pending["nsr_id"]
        target_record = pending["target_record"]

        # restrict the deletion to one vnfr only while its nsr is still present
        vnfr_id = None
        if target_record.startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": nsr_id}, fail_on_empty=False
        ):
            vnfr_id = target_record.split(":")[1]

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfr_id)
        except Exception as e:
            self.logger.error(
                "Error deleting task={}: {}".format(pending["task_id"], e)
            )

        # the entry is discarded even if its deletion failed
        self.tasks_to_delete.pop(0)
2719
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from the "ro_tasks" table the tasks that belong to a deleted nsr or vnfr.

    Static method because it is called from osm_ng_ro.ns
    A ro_task whose remaining tasks are all affected is deleted; otherwise only the
    affected tasks are set to None. Every write is conditioned to "modified_at" being
    unchanged; if any write is not applied the whole pass is repeated, up to 5 times.
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None
    :raises NsWorkerException: if the writes still conflict after all the retries
    """
    retries = 5

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        all_applied = True

        for ro_task in ro_tasks:
            nulled_tasks = {}
            removable = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # already deleted

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    nulled_tasks["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    removable = False

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            if removable:
                applied = db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                )
            elif nulled_tasks:
                nulled_tasks["modified_at"] = now
                applied = db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=nulled_tasks,
                    fail_on_empty=False,
                )
            else:
                applied = True  # nothing to change at this ro_task

            if not applied:
                all_applied = False

        if all_applied:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2778
def run(self):
    """
    Main loop of the thread. Each iteration:
      1. takes one order from self.task_queue, if any, and executes it. The order
         "terminate" ends the loop; after any order the loop starts again
      2. when there is no order, removes the tasks queued for deletion and processes
         one pending ro_task obtained with self._get_db_task()
    :return: None
    """
    # orders handled by a method of this object: (order, info message or None, method)
    vim_orders = (
        ("load_vim", "order to load vim {}", "_load_vim"),
        ("unload_vim", "order to unload vim {}", "_unload_vim"),
        ("reload_vim", None, "_reload_vim"),
        ("check_vim", "order to check vim {}", "_check_vim"),
    )

    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                # nothing to poll at database: wait until an order arrives
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]
            if command == "terminate":
                break

            for name, message, method_name in vim_orders:
                if command == name:
                    if message:
                        self.logger.info(message.format(order[1]))
                    getattr(self, method_name)(order[1])
                    break
            continue
        except queue.Empty:
            # no order available, go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before polling again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")