Feature 10972: Support of volume multi-attach
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
52 def deep_get(target_dict, *args, **kwargs):
53 """
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
60 """
61 for key in args:
62 if not isinstance(target_dict, dict) or key not in target_dict:
63 return kwargs.get("default")
64 target_dict = target_dict[key]
65 return target_dict
66
67
68 class NsWorkerException(Exception):
69 pass
70
71
72 class FailingConnector:
73 def __init__(self, error_msg):
74 self.error_msg = error_msg
75
76 for method in dir(vimconn.VimConnector):
77 if method[0] != "_":
78 setattr(
79 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80 )
81
82 for method in dir(sdnconn.SdnConnectorBase):
83 if method[0] != "_":
84 setattr(
85 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86 )
87
88
89 class NsWorkerExceptionNotFound(NsWorkerException):
90 pass
91
92
93 class VimInteractionBase:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
96
97 def __init__(self, db, my_vims, db_vims, logger):
98 self.db = db
99 self.logger = logger
100 self.my_vims = my_vims
101 self.db_vims = db_vims
102
103 def new(self, ro_task, task_index, task_depends):
104 return "BUILD", {}
105
106 def refresh(self, ro_task):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 return "FAILED", {}
110
111 return "DONE", {}
112
113 def delete(self, ro_task, task_index):
114 """skip calling VIM to delete image. Assumes ok"""
115 return "DONE", {}
116
117 def exec(self, ro_task, task_index, task_depends):
118 return "DONE", None, None
119
120
121 class VimInteractionNet(VimInteractionBase):
122 def new(self, ro_task, task_index, task_depends):
123 vim_net_id = None
124 task = ro_task["tasks"][task_index]
125 task_id = task["task_id"]
126 created = False
127 created_items = {}
128 target_vim = self.my_vims[ro_task["target_id"]]
129 mgmtnet = False
130 mgmtnet_defined_in_vim = False
131
132 try:
133 # FIND
134 if task.get("find_params"):
135 # if management, get configuration of VIM
136 if task["find_params"].get("filter_dict"):
137 vim_filter = task["find_params"]["filter_dict"]
138 # management network
139 elif task["find_params"].get("mgmt"):
140 mgmtnet = True
141 if deep_get(
142 self.db_vims[ro_task["target_id"]],
143 "config",
144 "management_network_id",
145 ):
146 mgmtnet_defined_in_vim = True
147 vim_filter = {
148 "id": self.db_vims[ro_task["target_id"]]["config"][
149 "management_network_id"
150 ]
151 }
152 elif deep_get(
153 self.db_vims[ro_task["target_id"]],
154 "config",
155 "management_network_name",
156 ):
157 mgmtnet_defined_in_vim = True
158 vim_filter = {
159 "name": self.db_vims[ro_task["target_id"]]["config"][
160 "management_network_name"
161 ]
162 }
163 else:
164 vim_filter = {"name": task["find_params"]["name"]}
165 else:
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task["find_params"])
168 )
169
170 vim_nets = target_vim.get_network_list(vim_filter)
171 if not vim_nets and not task.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet and not mgmtnet_defined_in_vim:
177 net_name = (
178 vim_filter.get("name")
179 if vim_filter.get("name")
180 else vim_filter.get("id")[:16]
181 )
182 vim_net_id, created_items = target_vim.new_network(
183 net_name, None
184 )
185 self.logger.debug(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id)
187 )
188 created = True
189 else:
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task.get("find_params")
193 )
194 )
195 elif len(vim_nets) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
198 task["find_params"]
199 )
200 )
201
202 if vim_nets:
203 vim_net_id = vim_nets[0]["id"]
204 else:
205 # CREATE
206 params = task["params"]
207 vim_net_id, created_items = target_vim.new_network(**params)
208 created = True
209
210 ro_vim_item_update = {
211 "vim_id": vim_net_id,
212 "vim_status": "BUILD",
213 "created": created,
214 "created_items": created_items,
215 "vim_details": None,
216 "vim_message": None,
217 }
218 self.logger.debug(
219 "task={} {} new-net={} created={}".format(
220 task_id, ro_task["target_id"], vim_net_id, created
221 )
222 )
223
224 return "BUILD", ro_vim_item_update
225 except (vimconn.VimConnException, NsWorkerException) as e:
226 self.logger.error(
227 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
228 )
229 ro_vim_item_update = {
230 "vim_status": "VIM_ERROR",
231 "created": created,
232 "vim_message": str(e),
233 }
234
235 return "FAILED", ro_vim_item_update
236
237 def refresh(self, ro_task):
238 """Call VIM to get network status"""
239 ro_task_id = ro_task["_id"]
240 target_vim = self.my_vims[ro_task["target_id"]]
241 vim_id = ro_task["vim_info"]["vim_id"]
242 net_to_refresh_list = [vim_id]
243
244 try:
245 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
246 vim_info = vim_dict[vim_id]
247
248 if vim_info["status"] == "ACTIVE":
249 task_status = "DONE"
250 elif vim_info["status"] == "BUILD":
251 task_status = "BUILD"
252 else:
253 task_status = "FAILED"
254 except vimconn.VimConnException as e:
255 # Mark all tasks at VIM_ERROR status
256 self.logger.error(
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id, ro_task["target_id"], vim_id, e
259 )
260 )
261 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
262 task_status = "FAILED"
263
264 ro_vim_item_update = {}
265 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
266 ro_vim_item_update["vim_status"] = vim_info["status"]
267
268 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
269 ro_vim_item_update["vim_name"] = vim_info.get("name")
270
271 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
273 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
274 elif vim_info["status"] == "DELETED":
275 ro_vim_item_update["vim_id"] = None
276 ro_vim_item_update["vim_message"] = "Deleted externally"
277 else:
278 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
279 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
280
281 if ro_vim_item_update:
282 self.logger.debug(
283 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task_id,
285 ro_task["target_id"],
286 vim_id,
287 ro_vim_item_update.get("vim_status"),
288 ro_vim_item_update.get("vim_message")
289 if ro_vim_item_update.get("vim_status") != "ACTIVE"
290 else "",
291 )
292 )
293
294 return task_status, ro_vim_item_update
295
296 def delete(self, ro_task, task_index):
297 task = ro_task["tasks"][task_index]
298 task_id = task["task_id"]
299 net_vim_id = ro_task["vim_info"]["vim_id"]
300 ro_vim_item_update_ok = {
301 "vim_status": "DELETED",
302 "created": False,
303 "vim_message": "DELETED",
304 "vim_id": None,
305 }
306
307 try:
308 if net_vim_id or ro_task["vim_info"]["created_items"]:
309 target_vim = self.my_vims[ro_task["target_id"]]
310 target_vim.delete_network(
311 net_vim_id, ro_task["vim_info"]["created_items"]
312 )
313 except vimconn.VimConnNotFoundException:
314 ro_vim_item_update_ok["vim_message"] = "already deleted"
315 except vimconn.VimConnException as e:
316 self.logger.error(
317 "ro_task={} vim={} del-net={}: {}".format(
318 ro_task["_id"], ro_task["target_id"], net_vim_id, e
319 )
320 )
321 ro_vim_item_update = {
322 "vim_status": "VIM_ERROR",
323 "vim_message": "Error while deleting: {}".format(e),
324 }
325
326 return "FAILED", ro_vim_item_update
327
328 self.logger.debug(
329 "task={} {} del-net={} {}".format(
330 task_id,
331 ro_task["target_id"],
332 net_vim_id,
333 ro_vim_item_update_ok.get("vim_message", ""),
334 )
335 )
336
337 return "DONE", ro_vim_item_update_ok
338
339
340 class VimInteractionVdu(VimInteractionBase):
341 max_retries_inject_ssh_key = 20 # 20 times
342 time_retries_inject_ssh_key = 30 # wevery 30 seconds
343
344 def new(self, ro_task, task_index, task_depends):
345 task = ro_task["tasks"][task_index]
346 task_id = task["task_id"]
347 created = False
348 created_items = {}
349 target_vim = self.my_vims[ro_task["target_id"]]
350 try:
351 created = True
352 params = task["params"]
353 params_copy = deepcopy(params)
354 net_list = params_copy["net_list"]
355
356 for net in net_list:
357 # change task_id into network_id
358 if "net_id" in net and net["net_id"].startswith("TASK-"):
359 network_id = task_depends[net["net_id"]]
360
361 if not network_id:
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net["net_id"])
365 )
366
367 net["net_id"] = network_id
368
369 if params_copy["image_id"].startswith("TASK-"):
370 params_copy["image_id"] = task_depends[params_copy["image_id"]]
371
372 if params_copy["flavor_id"].startswith("TASK-"):
373 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
374
375 affinity_group_list = params_copy["affinity_group_list"]
376 for affinity_group in affinity_group_list:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group and affinity_group[
379 "affinity_group_id"
380 ].startswith("TASK-"):
381 affinity_group_id = task_depends[
382 affinity_group["affinity_group_id"]
383 ]
384
385 if not affinity_group_id:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group["affinity_group_id"])
388 )
389
390 affinity_group["affinity_group_id"] = affinity_group_id
391 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
392 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
393
394 # add to created items previous_created_volumes (healing)
395 if task.get("previous_created_volumes"):
396 for k, v in task["previous_created_volumes"].items():
397 created_items[k] = v
398
399 ro_vim_item_update = {
400 "vim_id": vim_vm_id,
401 "vim_status": "BUILD",
402 "created": created,
403 "created_items": created_items,
404 "vim_details": None,
405 "vim_message": None,
406 "interfaces_vim_ids": interfaces,
407 "interfaces": [],
408 "interfaces_backup": [],
409 }
410 self.logger.debug(
411 "task={} {} new-vm={} created={}".format(
412 task_id, ro_task["target_id"], vim_vm_id, created
413 )
414 )
415
416 return "BUILD", ro_vim_item_update
417 except (vimconn.VimConnException, NsWorkerException) as e:
418 self.logger.debug(traceback.format_exc())
419 self.logger.error(
420 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
421 )
422 ro_vim_item_update = {
423 "vim_status": "VIM_ERROR",
424 "created": created,
425 "vim_message": str(e),
426 }
427
428 return "FAILED", ro_vim_item_update
429
430 def delete(self, ro_task, task_index):
431 task = ro_task["tasks"][task_index]
432 task_id = task["task_id"]
433 vm_vim_id = ro_task["vim_info"]["vim_id"]
434 ro_vim_item_update_ok = {
435 "vim_status": "DELETED",
436 "created": False,
437 "vim_message": "DELETED",
438 "vim_id": None,
439 }
440
441 try:
442 self.logger.debug(
443 "delete_vminstance: vm_vim_id={} created_items={}".format(
444 vm_vim_id, ro_task["vim_info"]["created_items"]
445 )
446 )
447 if vm_vim_id or ro_task["vim_info"]["created_items"]:
448 target_vim = self.my_vims[ro_task["target_id"]]
449 target_vim.delete_vminstance(
450 vm_vim_id,
451 ro_task["vim_info"]["created_items"],
452 ro_task["vim_info"].get("volumes_to_hold", []),
453 )
454 except vimconn.VimConnNotFoundException:
455 ro_vim_item_update_ok["vim_message"] = "already deleted"
456 except vimconn.VimConnException as e:
457 self.logger.error(
458 "ro_task={} vim={} del-vm={}: {}".format(
459 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
460 )
461 )
462 ro_vim_item_update = {
463 "vim_status": "VIM_ERROR",
464 "vim_message": "Error while deleting: {}".format(e),
465 }
466
467 return "FAILED", ro_vim_item_update
468
469 self.logger.debug(
470 "task={} {} del-vm={} {}".format(
471 task_id,
472 ro_task["target_id"],
473 vm_vim_id,
474 ro_vim_item_update_ok.get("vim_message", ""),
475 )
476 )
477
478 return "DONE", ro_vim_item_update_ok
479
480 def refresh(self, ro_task):
481 """Call VIM to get vm status"""
482 ro_task_id = ro_task["_id"]
483 target_vim = self.my_vims[ro_task["target_id"]]
484 vim_id = ro_task["vim_info"]["vim_id"]
485
486 if not vim_id:
487 return None, None
488
489 vm_to_refresh_list = [vim_id]
490 try:
491 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
492 vim_info = vim_dict[vim_id]
493
494 if vim_info["status"] == "ACTIVE":
495 task_status = "DONE"
496 elif vim_info["status"] == "BUILD":
497 task_status = "BUILD"
498 else:
499 task_status = "FAILED"
500
501 # try to load and parse vim_information
502 try:
503 vim_info_info = yaml.safe_load(vim_info["vim_info"])
504 if vim_info_info.get("name"):
505 vim_info["name"] = vim_info_info["name"]
506 except Exception as vim_info_error:
507 self.logger.exception(
508 f"{vim_info_error} occured while getting the vim_info from yaml"
509 )
510 except vimconn.VimConnException as e:
511 # Mark all tasks at VIM_ERROR status
512 self.logger.error(
513 "ro_task={} vim={} get-vm={}: {}".format(
514 ro_task_id, ro_task["target_id"], vim_id, e
515 )
516 )
517 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
518 task_status = "FAILED"
519
520 ro_vim_item_update = {}
521
522 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
523 vim_interfaces = []
524 if vim_info.get("interfaces"):
525 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
526 iface = next(
527 (
528 iface
529 for iface in vim_info["interfaces"]
530 if vim_iface_id == iface["vim_interface_id"]
531 ),
532 None,
533 )
534 # if iface:
535 # iface.pop("vim_info", None)
536 vim_interfaces.append(iface)
537
538 task_create = next(
539 t
540 for t in ro_task["tasks"]
541 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
542 )
543 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
544 vim_interfaces[task_create["mgmt_vnf_interface"]][
545 "mgmt_vnf_interface"
546 ] = True
547
548 mgmt_vdu_iface = task_create.get(
549 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
550 )
551 if vim_interfaces:
552 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
553
554 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
555 ro_vim_item_update["interfaces"] = vim_interfaces
556
557 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
558 ro_vim_item_update["vim_status"] = vim_info["status"]
559
560 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
561 ro_vim_item_update["vim_name"] = vim_info.get("name")
562
563 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
564 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
565 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
566 elif vim_info["status"] == "DELETED":
567 ro_vim_item_update["vim_id"] = None
568 ro_vim_item_update["vim_message"] = "Deleted externally"
569 else:
570 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
571 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
572
573 if ro_vim_item_update:
574 self.logger.debug(
575 "ro_task={} {} get-vm={}: status={} {}".format(
576 ro_task_id,
577 ro_task["target_id"],
578 vim_id,
579 ro_vim_item_update.get("vim_status"),
580 ro_vim_item_update.get("vim_message")
581 if ro_vim_item_update.get("vim_status") != "ACTIVE"
582 else "",
583 )
584 )
585
586 return task_status, ro_vim_item_update
587
588 def exec(self, ro_task, task_index, task_depends):
589 task = ro_task["tasks"][task_index]
590 task_id = task["task_id"]
591 target_vim = self.my_vims[ro_task["target_id"]]
592 db_task_update = {"retries": 0}
593 retries = task.get("retries", 0)
594
595 try:
596 params = task["params"]
597 params_copy = deepcopy(params)
598 params_copy["ro_key"] = self.db.decrypt(
599 params_copy.pop("private_key"),
600 params_copy.pop("schema_version"),
601 params_copy.pop("salt"),
602 )
603 params_copy["ip_addr"] = params_copy.pop("ip_address")
604 target_vim.inject_user_key(**params_copy)
605 self.logger.debug(
606 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
607 )
608
609 return (
610 "DONE",
611 None,
612 db_task_update,
613 ) # params_copy["key"]
614 except (vimconn.VimConnException, NsWorkerException) as e:
615 retries += 1
616
617 self.logger.debug(traceback.format_exc())
618 if retries < self.max_retries_inject_ssh_key:
619 return (
620 "BUILD",
621 None,
622 {
623 "retries": retries,
624 "next_retry": self.time_retries_inject_ssh_key,
625 },
626 )
627
628 self.logger.error(
629 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
630 )
631 ro_vim_item_update = {"vim_message": str(e)}
632
633 return "FAILED", ro_vim_item_update, db_task_update
634
635
636 class VimInteractionImage(VimInteractionBase):
637 def new(self, ro_task, task_index, task_depends):
638 task = ro_task["tasks"][task_index]
639 task_id = task["task_id"]
640 created = False
641 created_items = {}
642 target_vim = self.my_vims[ro_task["target_id"]]
643
644 try:
645 # FIND
646 if task.get("find_params"):
647 vim_images = target_vim.get_image_list(**task["find_params"])
648
649 if not vim_images:
650 raise NsWorkerExceptionNotFound(
651 "Image not found with this criteria: '{}'".format(
652 task["find_params"]
653 )
654 )
655 elif len(vim_images) > 1:
656 raise NsWorkerException(
657 "More than one image found with this criteria: '{}'".format(
658 task["find_params"]
659 )
660 )
661 else:
662 vim_image_id = vim_images[0]["id"]
663
664 ro_vim_item_update = {
665 "vim_id": vim_image_id,
666 "vim_status": "DONE",
667 "created": created,
668 "created_items": created_items,
669 "vim_details": None,
670 "vim_message": None,
671 }
672 self.logger.debug(
673 "task={} {} new-image={} created={}".format(
674 task_id, ro_task["target_id"], vim_image_id, created
675 )
676 )
677
678 return "DONE", ro_vim_item_update
679 except (NsWorkerException, vimconn.VimConnException) as e:
680 self.logger.error(
681 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
682 )
683 ro_vim_item_update = {
684 "vim_status": "VIM_ERROR",
685 "created": created,
686 "vim_message": str(e),
687 }
688
689 return "FAILED", ro_vim_item_update
690
691
692 class VimInteractionSharedVolume(VimInteractionBase):
693 def delete(self, ro_task, task_index):
694 task = ro_task["tasks"][task_index]
695 task_id = task["task_id"]
696 shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
697 ro_vim_item_update_ok = {
698 "vim_status": "DELETED",
699 "created": False,
700 "vim_message": "DELETED",
701 "vim_id": None,
702 }
703 try:
704 if shared_volume_vim_id:
705 target_vim = self.my_vims[ro_task["target_id"]]
706 target_vim.delete_shared_volumes(shared_volume_vim_id)
707 except vimconn.VimConnNotFoundException:
708 ro_vim_item_update_ok["vim_message"] = "already deleted"
709 except vimconn.VimConnException as e:
710 self.logger.error(
711 "ro_task={} vim={} del-shared-volume={}: {}".format(
712 ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
713 )
714 )
715 ro_vim_item_update = {
716 "vim_status": "VIM_ERROR",
717 "vim_message": "Error while deleting: {}".format(e),
718 }
719
720 return "FAILED", ro_vim_item_update
721
722 self.logger.debug(
723 "task={} {} del-shared-volume={} {}".format(
724 task_id,
725 ro_task["target_id"],
726 shared_volume_vim_id,
727 ro_vim_item_update_ok.get("vim_message", ""),
728 )
729 )
730
731 return "DONE", ro_vim_item_update_ok
732
733 def new(self, ro_task, task_index, task_depends):
734 task = ro_task["tasks"][task_index]
735 task_id = task["task_id"]
736 created = False
737 created_items = {}
738 target_vim = self.my_vims[ro_task["target_id"]]
739
740 try:
741 shared_volume_name = None
742 shared_volume_vim_id = None
743 shared_volume_data = None
744
745 if task.get("params"):
746 shared_volume_data = task["params"]
747
748 if shared_volume_data:
749 self.logger.info(
750 f"Creating the new shared_volume for {shared_volume_data}\n"
751 )
752 (
753 shared_volume_name,
754 shared_volume_vim_id,
755 ) = target_vim.new_shared_volumes(shared_volume_data)
756 created = True
757 created_items[shared_volume_vim_id] = shared_volume_name
758
759 ro_vim_item_update = {
760 "vim_id": shared_volume_vim_id,
761 "vim_status": "DONE",
762 "created": created,
763 "created_items": created_items,
764 "vim_details": None,
765 "vim_message": None,
766 }
767 self.logger.debug(
768 "task={} {} new-shared-volume={} created={}".format(
769 task_id, ro_task["target_id"], shared_volume_vim_id, created
770 )
771 )
772
773 return "DONE", ro_vim_item_update
774 except (vimconn.VimConnException, NsWorkerException) as e:
775 self.logger.error(
776 "task={} vim={} new-shared-volume:"
777 " {}".format(task_id, ro_task["target_id"], e)
778 )
779 ro_vim_item_update = {
780 "vim_status": "VIM_ERROR",
781 "created": created,
782 "vim_message": str(e),
783 }
784
785 return "FAILED", ro_vim_item_update
786
787
788 class VimInteractionFlavor(VimInteractionBase):
789 def delete(self, ro_task, task_index):
790 task = ro_task["tasks"][task_index]
791 task_id = task["task_id"]
792 flavor_vim_id = ro_task["vim_info"]["vim_id"]
793 ro_vim_item_update_ok = {
794 "vim_status": "DELETED",
795 "created": False,
796 "vim_message": "DELETED",
797 "vim_id": None,
798 }
799
800 try:
801 if flavor_vim_id:
802 target_vim = self.my_vims[ro_task["target_id"]]
803 target_vim.delete_flavor(flavor_vim_id)
804 except vimconn.VimConnNotFoundException:
805 ro_vim_item_update_ok["vim_message"] = "already deleted"
806 except vimconn.VimConnException as e:
807 self.logger.error(
808 "ro_task={} vim={} del-flavor={}: {}".format(
809 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
810 )
811 )
812 ro_vim_item_update = {
813 "vim_status": "VIM_ERROR",
814 "vim_message": "Error while deleting: {}".format(e),
815 }
816
817 return "FAILED", ro_vim_item_update
818
819 self.logger.debug(
820 "task={} {} del-flavor={} {}".format(
821 task_id,
822 ro_task["target_id"],
823 flavor_vim_id,
824 ro_vim_item_update_ok.get("vim_message", ""),
825 )
826 )
827
828 return "DONE", ro_vim_item_update_ok
829
830 def new(self, ro_task, task_index, task_depends):
831 task = ro_task["tasks"][task_index]
832 task_id = task["task_id"]
833 created = False
834 created_items = {}
835 target_vim = self.my_vims[ro_task["target_id"]]
836 try:
837 # FIND
838 vim_flavor_id = None
839
840 if task.get("find_params"):
841 try:
842 flavor_data = task["find_params"]["flavor_data"]
843 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
844 except vimconn.VimConnNotFoundException as flavor_not_found_msg:
845 self.logger.warning(
846 f"VimConnNotFoundException occured: {flavor_not_found_msg}"
847 )
848
849 if not vim_flavor_id and task.get("params"):
850 # CREATE
851 flavor_data = task["params"]["flavor_data"]
852 vim_flavor_id = target_vim.new_flavor(flavor_data)
853 created = True
854
855 ro_vim_item_update = {
856 "vim_id": vim_flavor_id,
857 "vim_status": "DONE",
858 "created": created,
859 "created_items": created_items,
860 "vim_details": None,
861 "vim_message": None,
862 }
863 self.logger.debug(
864 "task={} {} new-flavor={} created={}".format(
865 task_id, ro_task["target_id"], vim_flavor_id, created
866 )
867 )
868
869 return "DONE", ro_vim_item_update
870 except (vimconn.VimConnException, NsWorkerException) as e:
871 self.logger.error(
872 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
873 )
874 ro_vim_item_update = {
875 "vim_status": "VIM_ERROR",
876 "created": created,
877 "vim_message": str(e),
878 }
879
880 return "FAILED", ro_vim_item_update
881
882
883 class VimInteractionAffinityGroup(VimInteractionBase):
884 def delete(self, ro_task, task_index):
885 task = ro_task["tasks"][task_index]
886 task_id = task["task_id"]
887 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
888 ro_vim_item_update_ok = {
889 "vim_status": "DELETED",
890 "created": False,
891 "vim_message": "DELETED",
892 "vim_id": None,
893 }
894
895 try:
896 if affinity_group_vim_id:
897 target_vim = self.my_vims[ro_task["target_id"]]
898 target_vim.delete_affinity_group(affinity_group_vim_id)
899 except vimconn.VimConnNotFoundException:
900 ro_vim_item_update_ok["vim_message"] = "already deleted"
901 except vimconn.VimConnException as e:
902 self.logger.error(
903 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
904 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
905 )
906 )
907 ro_vim_item_update = {
908 "vim_status": "VIM_ERROR",
909 "vim_message": "Error while deleting: {}".format(e),
910 }
911
912 return "FAILED", ro_vim_item_update
913
914 self.logger.debug(
915 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
916 task_id,
917 ro_task["target_id"],
918 affinity_group_vim_id,
919 ro_vim_item_update_ok.get("vim_message", ""),
920 )
921 )
922
923 return "DONE", ro_vim_item_update_ok
924
925 def new(self, ro_task, task_index, task_depends):
926 task = ro_task["tasks"][task_index]
927 task_id = task["task_id"]
928 created = False
929 created_items = {}
930 target_vim = self.my_vims[ro_task["target_id"]]
931
932 try:
933 affinity_group_vim_id = None
934 affinity_group_data = None
935
936 if task.get("params"):
937 affinity_group_data = task["params"].get("affinity_group_data")
938
939 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
940 try:
941 param_affinity_group_id = task["params"]["affinity_group_data"].get(
942 "vim-affinity-group-id"
943 )
944 affinity_group_vim_id = target_vim.get_affinity_group(
945 param_affinity_group_id
946 ).get("id")
947 except vimconn.VimConnNotFoundException:
948 self.logger.error(
949 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
950 "could not be found at VIM. Creating a new one.".format(
951 task_id, ro_task["target_id"], param_affinity_group_id
952 )
953 )
954
955 if not affinity_group_vim_id and affinity_group_data:
956 affinity_group_vim_id = target_vim.new_affinity_group(
957 affinity_group_data
958 )
959 created = True
960
961 ro_vim_item_update = {
962 "vim_id": affinity_group_vim_id,
963 "vim_status": "DONE",
964 "created": created,
965 "created_items": created_items,
966 "vim_details": None,
967 "vim_message": None,
968 }
969 self.logger.debug(
970 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
971 task_id, ro_task["target_id"], affinity_group_vim_id, created
972 )
973 )
974
975 return "DONE", ro_vim_item_update
976 except (vimconn.VimConnException, NsWorkerException) as e:
977 self.logger.error(
978 "task={} vim={} new-affinity-or-anti-affinity-group:"
979 " {}".format(task_id, ro_task["target_id"], e)
980 )
981 ro_vim_item_update = {
982 "vim_status": "VIM_ERROR",
983 "created": created,
984 "vim_message": str(e),
985 }
986
987 return "FAILED", ro_vim_item_update
988
989
990 class VimInteractionUpdateVdu(VimInteractionBase):
991 def exec(self, ro_task, task_index, task_depends):
992 task = ro_task["tasks"][task_index]
993 task_id = task["task_id"]
994 db_task_update = {"retries": 0}
995 created = False
996 created_items = {}
997 target_vim = self.my_vims[ro_task["target_id"]]
998
999 try:
1000 if task.get("params"):
1001 vim_vm_id = task["params"].get("vim_vm_id")
1002 action = task["params"].get("action")
1003 context = {action: action}
1004 target_vim.action_vminstance(vim_vm_id, context)
1005 # created = True
1006 ro_vim_item_update = {
1007 "vim_id": vim_vm_id,
1008 "vim_status": "DONE",
1009 "created": created,
1010 "created_items": created_items,
1011 "vim_details": None,
1012 "vim_message": None,
1013 }
1014 self.logger.debug(
1015 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1016 )
1017 return "DONE", ro_vim_item_update, db_task_update
1018 except (vimconn.VimConnException, NsWorkerException) as e:
1019 self.logger.error(
1020 "task={} vim={} VM Migration:"
1021 " {}".format(task_id, ro_task["target_id"], e)
1022 )
1023 ro_vim_item_update = {
1024 "vim_status": "VIM_ERROR",
1025 "created": created,
1026 "vim_message": str(e),
1027 }
1028
1029 return "FAILED", ro_vim_item_update, db_task_update
1030
1031
1032 class VimInteractionSdnNet(VimInteractionBase):
1033 @staticmethod
1034 def _match_pci(port_pci, mapping):
1035 """
1036 Check if port_pci matches with mapping
1037 mapping can have brackets to indicate that several chars are accepted. e.g
1038 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1039 :param port_pci: text
1040 :param mapping: text, can contain brackets to indicate several chars are available
1041 :return: True if matches, False otherwise
1042 """
1043 if not port_pci or not mapping:
1044 return False
1045 if port_pci == mapping:
1046 return True
1047
1048 mapping_index = 0
1049 pci_index = 0
1050 while True:
1051 bracket_start = mapping.find("[", mapping_index)
1052
1053 if bracket_start == -1:
1054 break
1055
1056 bracket_end = mapping.find("]", bracket_start)
1057 if bracket_end == -1:
1058 break
1059
1060 length = bracket_start - mapping_index
1061 if (
1062 length
1063 and port_pci[pci_index : pci_index + length]
1064 != mapping[mapping_index:bracket_start]
1065 ):
1066 return False
1067
1068 if (
1069 port_pci[pci_index + length]
1070 not in mapping[bracket_start + 1 : bracket_end]
1071 ):
1072 return False
1073
1074 pci_index += length + 1
1075 mapping_index = bracket_end + 1
1076
1077 if port_pci[pci_index:] != mapping[mapping_index:]:
1078 return False
1079
1080 return True
1081
1082 def _get_interfaces(self, vlds_to_connect, vim_account_id):
1083 """
1084 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1085 :param vim_account_id:
1086 :return:
1087 """
1088 interfaces = []
1089
1090 for vld in vlds_to_connect:
1091 table, _, db_id = vld.partition(":")
1092 db_id, _, vld = db_id.partition(":")
1093 _, _, vld_id = vld.partition(".")
1094
1095 if table == "vnfrs":
1096 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1097 iface_key = "vnf-vld-id"
1098 else: # table == "nsrs"
1099 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1100 iface_key = "ns-vld-id"
1101
1102 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1103
1104 for db_vnfr in db_vnfrs:
1105 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1106 for iface_index, interface in enumerate(vdur["interfaces"]):
1107 if interface.get(iface_key) == vld_id and interface.get(
1108 "type"
1109 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1110 # only SR-IOV o PT
1111 interface_ = interface.copy()
1112 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1113 db_vnfr["_id"], vdu_index, iface_index
1114 )
1115
1116 if vdur.get("status") == "ERROR":
1117 interface_["status"] = "ERROR"
1118
1119 interfaces.append(interface_)
1120
1121 return interfaces
1122
1123 def refresh(self, ro_task):
1124 # look for task create
1125 task_create_index, _ = next(
1126 i_t
1127 for i_t in enumerate(ro_task["tasks"])
1128 if i_t[1]
1129 and i_t[1]["action"] == "CREATE"
1130 and i_t[1]["status"] != "FINISHED"
1131 )
1132
1133 return self.new(ro_task, task_create_index, None)
1134
1135 def new(self, ro_task, task_index, task_depends):
1136 task = ro_task["tasks"][task_index]
1137 task_id = task["task_id"]
1138 target_vim = self.my_vims[ro_task["target_id"]]
1139
1140 sdn_net_id = ro_task["vim_info"]["vim_id"]
1141
1142 created_items = ro_task["vim_info"].get("created_items")
1143 connected_ports = ro_task["vim_info"].get("connected_ports", [])
1144 new_connected_ports = []
1145 last_update = ro_task["vim_info"].get("last_update", 0)
1146 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1147 error_list = []
1148 created = ro_task["vim_info"].get("created", False)
1149
1150 try:
1151 # CREATE
1152 params = task["params"]
1153 vlds_to_connect = params.get("vlds", [])
1154 associated_vim = params.get("target_vim")
1155 # external additional ports
1156 additional_ports = params.get("sdn-ports") or ()
1157 _, _, vim_account_id = (
1158 (None, None, None)
1159 if associated_vim is None
1160 else associated_vim.partition(":")
1161 )
1162
1163 if associated_vim:
1164 # get associated VIM
1165 if associated_vim not in self.db_vims:
1166 self.db_vims[associated_vim] = self.db.get_one(
1167 "vim_accounts", {"_id": vim_account_id}
1168 )
1169
1170 db_vim = self.db_vims[associated_vim]
1171
1172 # look for ports to connect
1173 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1174 # print(ports)
1175
1176 sdn_ports = []
1177 pending_ports = error_ports = 0
1178 vlan_used = None
1179 sdn_need_update = False
1180
1181 for port in ports:
1182 vlan_used = port.get("vlan") or vlan_used
1183
1184 # TODO. Do not connect if already done
1185 if not port.get("compute_node") or not port.get("pci"):
1186 if port.get("status") == "ERROR":
1187 error_ports += 1
1188 else:
1189 pending_ports += 1
1190 continue
1191
1192 pmap = None
1193 compute_node_mappings = next(
1194 (
1195 c
1196 for c in db_vim["config"].get("sdn-port-mapping", ())
1197 if c and c["compute_node"] == port["compute_node"]
1198 ),
1199 None,
1200 )
1201
1202 if compute_node_mappings:
1203 # process port_mapping pci of type 0000:af:1[01].[1357]
1204 pmap = next(
1205 (
1206 p
1207 for p in compute_node_mappings["ports"]
1208 if self._match_pci(port["pci"], p.get("pci"))
1209 ),
1210 None,
1211 )
1212
1213 if not pmap:
1214 if not db_vim["config"].get("mapping_not_needed"):
1215 error_list.append(
1216 "Port mapping not found for compute_node={} pci={}".format(
1217 port["compute_node"], port["pci"]
1218 )
1219 )
1220 continue
1221
1222 pmap = {}
1223
1224 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1225 new_port = {
1226 "service_endpoint_id": pmap.get("service_endpoint_id")
1227 or service_endpoint_id,
1228 "service_endpoint_encapsulation_type": "dot1q"
1229 if port["type"] == "SR-IOV"
1230 else None,
1231 "service_endpoint_encapsulation_info": {
1232 "vlan": port.get("vlan"),
1233 "mac": port.get("mac-address"),
1234 "device_id": pmap.get("device_id") or port["compute_node"],
1235 "device_interface_id": pmap.get("device_interface_id")
1236 or port["pci"],
1237 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1238 "switch_port": pmap.get("switch_port"),
1239 "service_mapping_info": pmap.get("service_mapping_info"),
1240 },
1241 }
1242
1243 # TODO
1244 # if port["modified_at"] > last_update:
1245 # sdn_need_update = True
1246 new_connected_ports.append(port["id"]) # TODO
1247 sdn_ports.append(new_port)
1248
1249 if error_ports:
1250 error_list.append(
1251 "{} interfaces have not been created as VDU is on ERROR status".format(
1252 error_ports
1253 )
1254 )
1255
1256 # connect external ports
1257 for index, additional_port in enumerate(additional_ports):
1258 additional_port_id = additional_port.get(
1259 "service_endpoint_id"
1260 ) or "external-{}".format(index)
1261 sdn_ports.append(
1262 {
1263 "service_endpoint_id": additional_port_id,
1264 "service_endpoint_encapsulation_type": additional_port.get(
1265 "service_endpoint_encapsulation_type", "dot1q"
1266 ),
1267 "service_endpoint_encapsulation_info": {
1268 "vlan": additional_port.get("vlan") or vlan_used,
1269 "mac": additional_port.get("mac_address"),
1270 "device_id": additional_port.get("device_id"),
1271 "device_interface_id": additional_port.get(
1272 "device_interface_id"
1273 ),
1274 "switch_dpid": additional_port.get("switch_dpid")
1275 or additional_port.get("switch_id"),
1276 "switch_port": additional_port.get("switch_port"),
1277 "service_mapping_info": additional_port.get(
1278 "service_mapping_info"
1279 ),
1280 },
1281 }
1282 )
1283 new_connected_ports.append(additional_port_id)
1284 sdn_info = ""
1285
1286 # if there are more ports to connect or they have been modified, call create/update
1287 if error_list:
1288 sdn_status = "ERROR"
1289 sdn_info = "; ".join(error_list)
1290 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1291 last_update = time.time()
1292
1293 if not sdn_net_id:
1294 if len(sdn_ports) < 2:
1295 sdn_status = "ACTIVE"
1296
1297 if not pending_ports:
1298 self.logger.debug(
1299 "task={} {} new-sdn-net done, less than 2 ports".format(
1300 task_id, ro_task["target_id"]
1301 )
1302 )
1303 else:
1304 net_type = params.get("type") or "ELAN"
1305 (
1306 sdn_net_id,
1307 created_items,
1308 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1309 created = True
1310 self.logger.debug(
1311 "task={} {} new-sdn-net={} created={}".format(
1312 task_id, ro_task["target_id"], sdn_net_id, created
1313 )
1314 )
1315 else:
1316 created_items = target_vim.edit_connectivity_service(
1317 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1318 )
1319 created = True
1320 self.logger.debug(
1321 "task={} {} update-sdn-net={} created={}".format(
1322 task_id, ro_task["target_id"], sdn_net_id, created
1323 )
1324 )
1325
1326 connected_ports = new_connected_ports
1327 elif sdn_net_id:
1328 wim_status_dict = target_vim.get_connectivity_service_status(
1329 sdn_net_id, conn_info=created_items
1330 )
1331 sdn_status = wim_status_dict["sdn_status"]
1332
1333 if wim_status_dict.get("sdn_info"):
1334 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1335
1336 if wim_status_dict.get("error_msg"):
1337 sdn_info = wim_status_dict.get("error_msg") or ""
1338
1339 if pending_ports:
1340 if sdn_status != "ERROR":
1341 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1342 len(ports) - pending_ports, len(ports)
1343 )
1344
1345 if sdn_status == "ACTIVE":
1346 sdn_status = "BUILD"
1347
1348 ro_vim_item_update = {
1349 "vim_id": sdn_net_id,
1350 "vim_status": sdn_status,
1351 "created": created,
1352 "created_items": created_items,
1353 "connected_ports": connected_ports,
1354 "vim_details": sdn_info,
1355 "vim_message": None,
1356 "last_update": last_update,
1357 }
1358
1359 return sdn_status, ro_vim_item_update
1360 except Exception as e:
1361 self.logger.error(
1362 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1363 exc_info=not isinstance(
1364 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1365 ),
1366 )
1367 ro_vim_item_update = {
1368 "vim_status": "VIM_ERROR",
1369 "created": created,
1370 "vim_message": str(e),
1371 }
1372
1373 return "FAILED", ro_vim_item_update
1374
1375 def delete(self, ro_task, task_index):
1376 task = ro_task["tasks"][task_index]
1377 task_id = task["task_id"]
1378 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1379 ro_vim_item_update_ok = {
1380 "vim_status": "DELETED",
1381 "created": False,
1382 "vim_message": "DELETED",
1383 "vim_id": None,
1384 }
1385
1386 try:
1387 if sdn_vim_id:
1388 target_vim = self.my_vims[ro_task["target_id"]]
1389 target_vim.delete_connectivity_service(
1390 sdn_vim_id, ro_task["vim_info"].get("created_items")
1391 )
1392
1393 except Exception as e:
1394 if (
1395 isinstance(e, sdnconn.SdnConnectorError)
1396 and e.http_code == HTTPStatus.NOT_FOUND.value
1397 ):
1398 ro_vim_item_update_ok["vim_message"] = "already deleted"
1399 else:
1400 self.logger.error(
1401 "ro_task={} vim={} del-sdn-net={}: {}".format(
1402 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1403 ),
1404 exc_info=not isinstance(
1405 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1406 ),
1407 )
1408 ro_vim_item_update = {
1409 "vim_status": "VIM_ERROR",
1410 "vim_message": "Error while deleting: {}".format(e),
1411 }
1412
1413 return "FAILED", ro_vim_item_update
1414
1415 self.logger.debug(
1416 "task={} {} del-sdn-net={} {}".format(
1417 task_id,
1418 ro_task["target_id"],
1419 sdn_vim_id,
1420 ro_vim_item_update_ok.get("vim_message", ""),
1421 )
1422 )
1423
1424 return "DONE", ro_vim_item_update_ok
1425
1426
1427 class VimInteractionMigration(VimInteractionBase):
1428 def exec(self, ro_task, task_index, task_depends):
1429 task = ro_task["tasks"][task_index]
1430 task_id = task["task_id"]
1431 db_task_update = {"retries": 0}
1432 target_vim = self.my_vims[ro_task["target_id"]]
1433 vim_interfaces = []
1434 created = False
1435 created_items = {}
1436 refreshed_vim_info = {}
1437
1438 try:
1439 if task.get("params"):
1440 vim_vm_id = task["params"].get("vim_vm_id")
1441 migrate_host = task["params"].get("migrate_host")
1442 _, migrated_compute_node = target_vim.migrate_instance(
1443 vim_vm_id, migrate_host
1444 )
1445
1446 if migrated_compute_node:
1447 # When VM is migrated, vdu["vim_info"] needs to be updated
1448 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1449 ro_task["target_id"]
1450 )
1451
1452 # Refresh VM to get new vim_info
1453 vm_to_refresh_list = [vim_vm_id]
1454 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1455 refreshed_vim_info = vim_dict[vim_vm_id]
1456
1457 if refreshed_vim_info.get("interfaces"):
1458 for old_iface in vdu_old_vim_info.get("interfaces"):
1459 iface = next(
1460 (
1461 iface
1462 for iface in refreshed_vim_info["interfaces"]
1463 if old_iface["vim_interface_id"]
1464 == iface["vim_interface_id"]
1465 ),
1466 None,
1467 )
1468 vim_interfaces.append(iface)
1469
1470 ro_vim_item_update = {
1471 "vim_id": vim_vm_id,
1472 "vim_status": "ACTIVE",
1473 "created": created,
1474 "created_items": created_items,
1475 "vim_details": None,
1476 "vim_message": None,
1477 }
1478
1479 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1480 "ERROR",
1481 "VIM_ERROR",
1482 ):
1483 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1484
1485 if vim_interfaces:
1486 ro_vim_item_update["interfaces"] = vim_interfaces
1487
1488 self.logger.debug(
1489 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1490 )
1491
1492 return "DONE", ro_vim_item_update, db_task_update
1493
1494 except (vimconn.VimConnException, NsWorkerException) as e:
1495 self.logger.error(
1496 "task={} vim={} VM Migration:"
1497 " {}".format(task_id, ro_task["target_id"], e)
1498 )
1499 ro_vim_item_update = {
1500 "vim_status": "VIM_ERROR",
1501 "created": created,
1502 "vim_message": str(e),
1503 }
1504
1505 return "FAILED", ro_vim_item_update, db_task_update
1506
1507
1508 class VimInteractionResize(VimInteractionBase):
1509 def exec(self, ro_task, task_index, task_depends):
1510 task = ro_task["tasks"][task_index]
1511 task_id = task["task_id"]
1512 db_task_update = {"retries": 0}
1513 created = False
1514 target_flavor_uuid = None
1515 created_items = {}
1516 refreshed_vim_info = {}
1517 target_vim = self.my_vims[ro_task["target_id"]]
1518
1519 try:
1520 if task.get("params"):
1521 vim_vm_id = task["params"].get("vim_vm_id")
1522 flavor_dict = task["params"].get("flavor_dict")
1523 self.logger.info("flavor_dict %s", flavor_dict)
1524
1525 try:
1526 target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
1527 except Exception as e:
1528 self.logger.info("Cannot find any flavor matching %s.", str(e))
1529 try:
1530 target_flavor_uuid = target_vim.new_flavor(flavor_dict)
1531 except Exception as e:
1532 self.logger.error("Error creating flavor at VIM %s.", str(e))
1533
1534 if target_flavor_uuid is not None:
1535 resized_status = target_vim.resize_instance(
1536 vim_vm_id, target_flavor_uuid
1537 )
1538
1539 if resized_status:
1540 # Refresh VM to get new vim_info
1541 vm_to_refresh_list = [vim_vm_id]
1542 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1543 refreshed_vim_info = vim_dict[vim_vm_id]
1544
1545 ro_vim_item_update = {
1546 "vim_id": vim_vm_id,
1547 "vim_status": "DONE",
1548 "created": created,
1549 "created_items": created_items,
1550 "vim_details": None,
1551 "vim_message": None,
1552 }
1553
1554 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1555 "ERROR",
1556 "VIM_ERROR",
1557 ):
1558 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1559
1560 self.logger.debug(
1561 "task={} {} resize done".format(task_id, ro_task["target_id"])
1562 )
1563 return "DONE", ro_vim_item_update, db_task_update
1564 except (vimconn.VimConnException, NsWorkerException) as e:
1565 self.logger.error(
1566 "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
1567 )
1568 ro_vim_item_update = {
1569 "vim_status": "VIM_ERROR",
1570 "created": created,
1571 "vim_message": str(e),
1572 }
1573
1574 return "FAILED", ro_vim_item_update, db_task_update
1575
1576
1577 class ConfigValidate:
1578 def __init__(self, config: Dict):
1579 self.conf = config
1580
1581 @property
1582 def active(self):
1583 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1584 if (
1585 self.conf["period"]["refresh_active"] >= 60
1586 or self.conf["period"]["refresh_active"] == -1
1587 ):
1588 return self.conf["period"]["refresh_active"]
1589
1590 return 60
1591
1592 @property
1593 def build(self):
1594 return self.conf["period"]["refresh_build"]
1595
1596 @property
1597 def image(self):
1598 return self.conf["period"]["refresh_image"]
1599
1600 @property
1601 def error(self):
1602 return self.conf["period"]["refresh_error"]
1603
1604 @property
1605 def queue_size(self):
1606 return self.conf["period"]["queue_size"]
1607
1608
1609 class NsWorker(threading.Thread):
1610 def __init__(self, worker_index, config, plugins, db):
1611 """
1612 :param worker_index: thread index
1613 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1614 :param plugins: global shared dict with the loaded plugins
1615 :param db: database class instance to use
1616 """
1617 threading.Thread.__init__(self)
1618 self.config = config
1619 self.plugins = plugins
1620 self.plugin_name = "unknown"
1621 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1622 self.worker_index = worker_index
1623 # refresh periods for created items
1624 self.refresh_config = ConfigValidate(config)
1625 self.task_queue = queue.Queue(self.refresh_config.queue_size)
1626 # targetvim: vimplugin class
1627 self.my_vims = {}
1628 # targetvim: vim information from database
1629 self.db_vims = {}
1630 # targetvim list
1631 self.vim_targets = []
1632 self.my_id = config["process_id"] + ":" + str(worker_index)
1633 self.db = db
1634 self.item2class = {
1635 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1636 "shared-volumes": VimInteractionSharedVolume(
1637 self.db, self.my_vims, self.db_vims, self.logger
1638 ),
1639 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1640 "image": VimInteractionImage(
1641 self.db, self.my_vims, self.db_vims, self.logger
1642 ),
1643 "flavor": VimInteractionFlavor(
1644 self.db, self.my_vims, self.db_vims, self.logger
1645 ),
1646 "sdn_net": VimInteractionSdnNet(
1647 self.db, self.my_vims, self.db_vims, self.logger
1648 ),
1649 "update": VimInteractionUpdateVdu(
1650 self.db, self.my_vims, self.db_vims, self.logger
1651 ),
1652 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1653 self.db, self.my_vims, self.db_vims, self.logger
1654 ),
1655 "migrate": VimInteractionMigration(
1656 self.db, self.my_vims, self.db_vims, self.logger
1657 ),
1658 "verticalscale": VimInteractionResize(
1659 self.db, self.my_vims, self.db_vims, self.logger
1660 ),
1661 }
1662 self.time_last_task_processed = None
1663 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1664 self.tasks_to_delete = []
1665 # it is idle when there are not vim_targets associated
1666 self.idle = True
1667 self.task_locked_time = config["global"]["task_locked_time"]
1668
1669 def insert_task(self, task):
1670 try:
1671 self.task_queue.put(task, False)
1672 return None
1673 except queue.Full:
1674 raise NsWorkerException("timeout inserting a task")
1675
1676 def terminate(self):
1677 self.insert_task("exit")
1678
1679 def del_task(self, task):
1680 with self.task_lock:
1681 if task["status"] == "SCHEDULED":
1682 task["status"] = "SUPERSEDED"
1683 return True
1684 else: # task["status"] == "processing"
1685 self.task_lock.release()
1686 return False
1687
1688 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1689 """
1690 Process vim config, creating vim configuration files as ca_cert
1691 :param target_id: vim/sdn/wim + id
1692 :param db_vim: Vim dictionary obtained from database
1693 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1694 """
1695 if not db_vim.get("config"):
1696 return
1697
1698 file_name = ""
1699 work_dir = "/app/osm_ro/certs"
1700
1701 try:
1702 if db_vim["config"].get("ca_cert_content"):
1703 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1704
1705 if not path.isdir(file_name):
1706 makedirs(file_name)
1707
1708 file_name = file_name + "/ca_cert"
1709
1710 with open(file_name, "w") as f:
1711 f.write(db_vim["config"]["ca_cert_content"])
1712 del db_vim["config"]["ca_cert_content"]
1713 db_vim["config"]["ca_cert"] = file_name
1714 except Exception as e:
1715 raise NsWorkerException(
1716 "Error writing to file '{}': {}".format(file_name, e)
1717 )
1718
1719 def _load_plugin(self, name, type="vim"):
1720 # type can be vim or sdn
1721 if "rovim_dummy" not in self.plugins:
1722 self.plugins["rovim_dummy"] = VimDummyConnector
1723
1724 if "rosdn_dummy" not in self.plugins:
1725 self.plugins["rosdn_dummy"] = SdnDummyConnector
1726
1727 if name in self.plugins:
1728 return self.plugins[name]
1729
1730 try:
1731 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1732 self.plugins[name] = ep.load()
1733 except Exception as e:
1734 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1735
1736 if name and name not in self.plugins:
1737 raise NsWorkerException(
1738 "Plugin 'osm_{n}' has not been installed".format(n=name)
1739 )
1740
1741 return self.plugins[name]
1742
1743 def _unload_vim(self, target_id):
1744 """
1745 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1746 :param target_id: Contains type:_id; where type can be 'vim', ...
1747 :return: None.
1748 """
1749 try:
1750 self.db_vims.pop(target_id, None)
1751 self.my_vims.pop(target_id, None)
1752
1753 if target_id in self.vim_targets:
1754 self.vim_targets.remove(target_id)
1755
1756 self.logger.info("Unloaded {}".format(target_id))
1757 except Exception as e:
1758 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1759
1760 def _check_vim(self, target_id):
1761 """
1762 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1763 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1764 :return: None.
1765 """
1766 target, _, _id = target_id.partition(":")
1767 now = time.time()
1768 update_dict = {}
1769 unset_dict = {}
1770 op_text = ""
1771 step = ""
1772 loaded = target_id in self.vim_targets
1773 target_database = (
1774 "vim_accounts"
1775 if target == "vim"
1776 else "wim_accounts"
1777 if target == "wim"
1778 else "sdns"
1779 )
1780
1781 try:
1782 step = "Getting {} from db".format(target_id)
1783 db_vim = self.db.get_one(target_database, {"_id": _id})
1784
1785 for op_index, operation in enumerate(
1786 db_vim["_admin"].get("operations", ())
1787 ):
1788 if operation["operationState"] != "PROCESSING":
1789 continue
1790
1791 locked_at = operation.get("locked_at")
1792
1793 if locked_at is not None and locked_at >= now - self.task_locked_time:
1794 # some other thread is doing this operation
1795 return
1796
1797 # lock
1798 op_text = "_admin.operations.{}.".format(op_index)
1799
1800 if not self.db.set_one(
1801 target_database,
1802 q_filter={
1803 "_id": _id,
1804 op_text + "operationState": "PROCESSING",
1805 op_text + "locked_at": locked_at,
1806 },
1807 update_dict={
1808 op_text + "locked_at": now,
1809 "admin.current_operation": op_index,
1810 },
1811 fail_on_empty=False,
1812 ):
1813 return
1814
1815 unset_dict[op_text + "locked_at"] = None
1816 unset_dict["current_operation"] = None
1817 step = "Loading " + target_id
1818 error_text = self._load_vim(target_id)
1819
1820 if not error_text:
1821 step = "Checking connectivity"
1822
1823 if target == "vim":
1824 self.my_vims[target_id].check_vim_connectivity()
1825 else:
1826 self.my_vims[target_id].check_credentials()
1827
1828 update_dict["_admin.operationalState"] = "ENABLED"
1829 update_dict["_admin.detailed-status"] = ""
1830 unset_dict[op_text + "detailed-status"] = None
1831 update_dict[op_text + "operationState"] = "COMPLETED"
1832
1833 return
1834
1835 except Exception as e:
1836 error_text = "{}: {}".format(step, e)
1837 self.logger.error("{} for {}: {}".format(step, target_id, e))
1838
1839 finally:
1840 if update_dict or unset_dict:
1841 if error_text:
1842 update_dict[op_text + "operationState"] = "FAILED"
1843 update_dict[op_text + "detailed-status"] = error_text
1844 unset_dict.pop(op_text + "detailed-status", None)
1845 update_dict["_admin.operationalState"] = "ERROR"
1846 update_dict["_admin.detailed-status"] = error_text
1847
1848 if op_text:
1849 update_dict[op_text + "statusEnteredTime"] = now
1850
1851 self.db.set_one(
1852 target_database,
1853 q_filter={"_id": _id},
1854 update_dict=update_dict,
1855 unset=unset_dict,
1856 fail_on_empty=False,
1857 )
1858
1859 if not loaded:
1860 self._unload_vim(target_id)
1861
1862 def _reload_vim(self, target_id):
1863 if target_id in self.vim_targets:
1864 self._load_vim(target_id)
1865 else:
1866 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1867 # just remove it to force load again next time it is needed
1868 self.db_vims.pop(target_id, None)
1869
1870 def _load_vim(self, target_id):
1871 """
1872 Load or reload a vim_account, sdn_controller or wim_account.
1873 Read content from database, load the plugin if not loaded.
1874 In case of error loading the plugin, it load a failing VIM_connector
1875 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1876 :param target_id: Contains type:_id; where type can be 'vim', ...
1877 :return: None if ok, descriptive text if error
1878 """
1879 target, _, _id = target_id.partition(":")
1880 target_database = (
1881 "vim_accounts"
1882 if target == "vim"
1883 else "wim_accounts"
1884 if target == "wim"
1885 else "sdns"
1886 )
1887 plugin_name = ""
1888 vim = None
1889 step = "Getting {}={} from db".format(target, _id)
1890
1891 try:
1892 # TODO process for wim, sdnc, ...
1893 vim = self.db.get_one(target_database, {"_id": _id})
1894
1895 # if deep_get(vim, "config", "sdn-controller"):
1896 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1897 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1898
1899 step = "Decrypting password"
1900 schema_version = vim.get("schema_version")
1901 self.db.encrypt_decrypt_fields(
1902 vim,
1903 "decrypt",
1904 fields=("password", "secret"),
1905 schema_version=schema_version,
1906 salt=_id,
1907 )
1908 self._process_vim_config(target_id, vim)
1909
1910 if target == "vim":
1911 plugin_name = "rovim_" + vim["vim_type"]
1912 step = "Loading plugin '{}'".format(plugin_name)
1913 vim_module_conn = self._load_plugin(plugin_name)
1914 step = "Loading {}'".format(target_id)
1915 self.my_vims[target_id] = vim_module_conn(
1916 uuid=vim["_id"],
1917 name=vim["name"],
1918 tenant_id=vim.get("vim_tenant_id"),
1919 tenant_name=vim.get("vim_tenant_name"),
1920 url=vim["vim_url"],
1921 url_admin=None,
1922 user=vim["vim_user"],
1923 passwd=vim["vim_password"],
1924 config=vim.get("config") or {},
1925 persistent_info={},
1926 )
1927 else: # sdn
1928 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1929 step = "Loading plugin '{}'".format(plugin_name)
1930 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1931 step = "Loading {}'".format(target_id)
1932 wim = deepcopy(vim)
1933 wim_config = wim.pop("config", {}) or {}
1934 wim["uuid"] = wim["_id"]
1935 if "url" in wim and "wim_url" not in wim:
1936 wim["wim_url"] = wim["url"]
1937 elif "url" not in wim and "wim_url" in wim:
1938 wim["url"] = wim["wim_url"]
1939
1940 if wim.get("dpid"):
1941 wim_config["dpid"] = wim.pop("dpid")
1942
1943 if wim.get("switch_id"):
1944 wim_config["switch_id"] = wim.pop("switch_id")
1945
1946 # wim, wim_account, config
1947 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1948 self.db_vims[target_id] = vim
1949 self.error_status = None
1950
1951 self.logger.info(
1952 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1953 )
1954 except Exception as e:
1955 self.logger.error(
1956 "Cannot load {} plugin={}: {} {}".format(
1957 target_id, plugin_name, step, e
1958 )
1959 )
1960
1961 self.db_vims[target_id] = vim or {}
1962 self.db_vims[target_id] = FailingConnector(str(e))
1963 error_status = "{} Error: {}".format(step, e)
1964
1965 return error_status
1966 finally:
1967 if target_id not in self.vim_targets:
1968 self.vim_targets.append(target_id)
1969
1970 def _get_db_task(self):
1971 """
1972 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1973 :return: None
1974 """
1975 now = time.time()
1976
1977 if not self.time_last_task_processed:
1978 self.time_last_task_processed = now
1979
1980 try:
1981 while True:
1982 """
1983 # Log RO tasks only when loglevel is DEBUG
1984 if self.logger.getEffectiveLevel() == logging.DEBUG:
1985 self._log_ro_task(
1986 None,
1987 None,
1988 None,
1989 "TASK_WF",
1990 "task_locked_time="
1991 + str(self.task_locked_time)
1992 + " "
1993 + "time_last_task_processed="
1994 + str(self.time_last_task_processed)
1995 + " "
1996 + "now="
1997 + str(now),
1998 )
1999 """
2000 locked = self.db.set_one(
2001 "ro_tasks",
2002 q_filter={
2003 "target_id": self.vim_targets,
2004 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2005 "locked_at.lt": now - self.task_locked_time,
2006 "to_check_at.lt": self.time_last_task_processed,
2007 "to_check_at.gt": -1,
2008 },
2009 update_dict={"locked_by": self.my_id, "locked_at": now},
2010 fail_on_empty=False,
2011 )
2012
2013 if locked:
2014 # read and return
2015 ro_task = self.db.get_one(
2016 "ro_tasks",
2017 q_filter={
2018 "target_id": self.vim_targets,
2019 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2020 "locked_at": now,
2021 },
2022 )
2023 return ro_task
2024
2025 if self.time_last_task_processed == now:
2026 self.time_last_task_processed = None
2027 return None
2028 else:
2029 self.time_last_task_processed = now
2030 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2031
2032 except DbException as e:
2033 self.logger.error("Database exception at _get_db_task: {}".format(e))
2034 except Exception as e:
2035 self.logger.critical(
2036 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
2037 )
2038
2039 return None
2040
2041 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2042 """
2043 Determine if this task need to be done or superseded
2044 :return: None
2045 """
2046 my_task = ro_task["tasks"][task_index]
2047 task_id = my_task["task_id"]
2048 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2049 "created_items", False
2050 )
2051
2052 self.logger.debug("Needed delete: {}".format(needed_delete))
2053 if my_task["status"] == "FAILED":
2054 return None, None # TODO need to be retry??
2055
2056 try:
2057 for index, task in enumerate(ro_task["tasks"]):
2058 if index == task_index or not task:
2059 continue # own task
2060
2061 if (
2062 my_task["target_record"] == task["target_record"]
2063 and task["action"] == "CREATE"
2064 ):
2065 # set to finished
2066 db_update["tasks.{}.status".format(index)] = task[
2067 "status"
2068 ] = "FINISHED"
2069 elif task["action"] == "CREATE" and task["status"] not in (
2070 "FINISHED",
2071 "SUPERSEDED",
2072 ):
2073 needed_delete = False
2074
2075 if needed_delete:
2076 self.logger.debug(
2077 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2078 )
2079 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2080 else:
2081 return "SUPERSEDED", None
2082 except Exception as e:
2083 if not isinstance(e, NsWorkerException):
2084 self.logger.critical(
2085 "Unexpected exception at _delete_task task={}: {}".format(
2086 task_id, e
2087 ),
2088 exc_info=True,
2089 )
2090
2091 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2092
2093 def _create_task(self, ro_task, task_index, task_depends, db_update):
2094 """
2095 Determine if this task need to create something at VIM
2096 :return: None
2097 """
2098 my_task = ro_task["tasks"][task_index]
2099 task_id = my_task["task_id"]
2100 task_status = None
2101
2102 if my_task["status"] == "FAILED":
2103 return None, None # TODO need to be retry??
2104 elif my_task["status"] == "SCHEDULED":
2105 # check if already created by another task
2106 for index, task in enumerate(ro_task["tasks"]):
2107 if index == task_index or not task:
2108 continue # own task
2109
2110 if task["action"] == "CREATE" and task["status"] not in (
2111 "SCHEDULED",
2112 "FINISHED",
2113 "SUPERSEDED",
2114 ):
2115 return task["status"], "COPY_VIM_INFO"
2116
2117 try:
2118 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2119 ro_task, task_index, task_depends
2120 )
2121 # TODO update other CREATE tasks
2122 except Exception as e:
2123 if not isinstance(e, NsWorkerException):
2124 self.logger.error(
2125 "Error executing task={}: {}".format(task_id, e), exc_info=True
2126 )
2127
2128 task_status = "FAILED"
2129 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2130 # TODO update ro_vim_item_update
2131
2132 return task_status, ro_vim_item_update
2133 else:
2134 return None, None
2135
2136 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2137 """
2138 Look for dependency task
2139 :param task_id: Can be one of
2140 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2141 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2142 3. task.task_id: "<action_id>:number"
2143 :param ro_task:
2144 :param target_id:
2145 :return: database ro_task plus index of task
2146 """
2147 if (
2148 task_id.startswith("vim:")
2149 or task_id.startswith("sdn:")
2150 or task_id.startswith("wim:")
2151 ):
2152 target_id, _, task_id = task_id.partition(" ")
2153
2154 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2155 ro_task_dependency = self.db.get_one(
2156 "ro_tasks",
2157 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2158 fail_on_empty=False,
2159 )
2160
2161 if ro_task_dependency:
2162 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2163 if task["target_record_id"] == task_id:
2164 return ro_task_dependency, task_index
2165
2166 else:
2167 if ro_task:
2168 for task_index, task in enumerate(ro_task["tasks"]):
2169 if task and task["task_id"] == task_id:
2170 return ro_task, task_index
2171
2172 ro_task_dependency = self.db.get_one(
2173 "ro_tasks",
2174 q_filter={
2175 "tasks.ANYINDEX.task_id": task_id,
2176 "tasks.ANYINDEX.target_record.ne": None,
2177 },
2178 fail_on_empty=False,
2179 )
2180
2181 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2182 if ro_task_dependency:
2183 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2184 if task["task_id"] == task_id:
2185 return ro_task_dependency, task_index
2186 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2187
2188 def update_vm_refresh(self, ro_task):
2189 """Enables the VM status updates if self.refresh_config.active parameter
2190 is not -1 and then updates the DB accordingly
2191
2192 """
2193 try:
2194 self.logger.debug("Checking if VM status update config")
2195 next_refresh = time.time()
2196 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2197
2198 if next_refresh != -1:
2199 db_ro_task_update = {}
2200 now = time.time()
2201 next_check_at = now + (24 * 60 * 60)
2202 next_check_at = min(next_check_at, next_refresh)
2203 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2204 db_ro_task_update["to_check_at"] = next_check_at
2205
2206 self.logger.debug(
2207 "Finding tasks which to be updated to enable VM status updates"
2208 )
2209 refresh_tasks = self.db.get_list(
2210 "ro_tasks",
2211 q_filter={
2212 "tasks.status": "DONE",
2213 "to_check_at.lt": 0,
2214 },
2215 )
2216 self.logger.debug("Updating tasks to change the to_check_at status")
2217 for task in refresh_tasks:
2218 q_filter = {
2219 "_id": task["_id"],
2220 }
2221 self.db.set_one(
2222 "ro_tasks",
2223 q_filter=q_filter,
2224 update_dict=db_ro_task_update,
2225 fail_on_empty=True,
2226 )
2227
2228 except Exception as e:
2229 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2230
2231 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2232 """Decide the next_refresh according to vim type and refresh config period.
2233 Args:
2234 ro_task (dict): ro_task details
2235 next_refresh (float): next refresh time as epoch format
2236
2237 Returns:
2238 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2239 """
2240 target_vim = ro_task["target_id"]
2241 vim_type = self.db_vims[target_vim]["vim_type"]
2242 if self.refresh_config.active == -1 or vim_type == "openstack":
2243 next_refresh = -1
2244 else:
2245 next_refresh += self.refresh_config.active
2246 return next_refresh
2247
2248 def _process_pending_tasks(self, ro_task):
2249 ro_task_id = ro_task["_id"]
2250 now = time.time()
2251 # one day
2252 next_check_at = now + (24 * 60 * 60)
2253 db_ro_task_update = {}
2254
2255 def _update_refresh(new_status):
2256 # compute next_refresh
2257 nonlocal task
2258 nonlocal next_check_at
2259 nonlocal db_ro_task_update
2260 nonlocal ro_task
2261
2262 next_refresh = time.time()
2263
2264 if task["item"] in ("image", "flavor"):
2265 next_refresh += self.refresh_config.image
2266 elif new_status == "BUILD":
2267 next_refresh += self.refresh_config.build
2268 elif new_status == "DONE":
2269 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2270 else:
2271 next_refresh += self.refresh_config.error
2272
2273 next_check_at = min(next_check_at, next_refresh)
2274 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2275 ro_task["vim_info"]["refresh_at"] = next_refresh
2276
2277 try:
2278 """
2279 # Log RO tasks only when loglevel is DEBUG
2280 if self.logger.getEffectiveLevel() == logging.DEBUG:
2281 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2282 """
2283 # Check if vim status refresh is enabled again
2284 self.update_vm_refresh(ro_task)
2285 # 0: get task_status_create
2286 lock_object = None
2287 task_status_create = None
2288 task_create = next(
2289 (
2290 t
2291 for t in ro_task["tasks"]
2292 if t
2293 and t["action"] == "CREATE"
2294 and t["status"] in ("BUILD", "DONE")
2295 ),
2296 None,
2297 )
2298
2299 if task_create:
2300 task_status_create = task_create["status"]
2301
2302 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2303 for task_action in ("DELETE", "CREATE", "EXEC"):
2304 db_vim_update = None
2305 new_status = None
2306
2307 for task_index, task in enumerate(ro_task["tasks"]):
2308 if not task:
2309 continue # task deleted
2310
2311 task_depends = {}
2312 target_update = None
2313
2314 if (
2315 (
2316 task_action in ("DELETE", "EXEC")
2317 and task["status"] not in ("SCHEDULED", "BUILD")
2318 )
2319 or task["action"] != task_action
2320 or (
2321 task_action == "CREATE"
2322 and task["status"] in ("FINISHED", "SUPERSEDED")
2323 )
2324 ):
2325 continue
2326
2327 task_path = "tasks.{}.status".format(task_index)
2328 try:
2329 db_vim_info_update = None
2330
2331 if task["status"] == "SCHEDULED":
2332 # check if tasks that this depends on have been completed
2333 dependency_not_completed = False
2334
2335 for dependency_task_id in task.get("depends_on") or ():
2336 (
2337 dependency_ro_task,
2338 dependency_task_index,
2339 ) = self._get_dependency(
2340 dependency_task_id, target_id=ro_task["target_id"]
2341 )
2342 dependency_task = dependency_ro_task["tasks"][
2343 dependency_task_index
2344 ]
2345 self.logger.debug(
2346 "dependency_ro_task={} dependency_task_index={}".format(
2347 dependency_ro_task, dependency_task_index
2348 )
2349 )
2350
2351 if dependency_task["status"] == "SCHEDULED":
2352 dependency_not_completed = True
2353 next_check_at = min(
2354 next_check_at, dependency_ro_task["to_check_at"]
2355 )
2356 # must allow dependent task to be processed first
2357 # to do this set time after last_task_processed
2358 next_check_at = max(
2359 self.time_last_task_processed, next_check_at
2360 )
2361 break
2362 elif dependency_task["status"] == "FAILED":
2363 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2364 task["action"],
2365 task["item"],
2366 dependency_task["action"],
2367 dependency_task["item"],
2368 dependency_task_id,
2369 dependency_ro_task["vim_info"].get(
2370 "vim_message"
2371 ),
2372 )
2373 self.logger.error(
2374 "task={} {}".format(task["task_id"], error_text)
2375 )
2376 raise NsWorkerException(error_text)
2377
2378 task_depends[dependency_task_id] = dependency_ro_task[
2379 "vim_info"
2380 ]["vim_id"]
2381 task_depends[
2382 "TASK-{}".format(dependency_task_id)
2383 ] = dependency_ro_task["vim_info"]["vim_id"]
2384
2385 if dependency_not_completed:
2386 self.logger.warning(
2387 "DEPENDENCY NOT COMPLETED {}".format(
2388 dependency_ro_task["vim_info"]["vim_id"]
2389 )
2390 )
2391 # TODO set at vim_info.vim_details that it is waiting
2392 continue
2393
2394 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2395 # the task of renew this locking. It will update database locket_at periodically
2396 if not lock_object:
2397 lock_object = LockRenew.add_lock_object(
2398 "ro_tasks", ro_task, self
2399 )
2400 if task["action"] == "DELETE":
2401 (
2402 new_status,
2403 db_vim_info_update,
2404 ) = self._delete_task(
2405 ro_task, task_index, task_depends, db_ro_task_update
2406 )
2407 new_status = (
2408 "FINISHED" if new_status == "DONE" else new_status
2409 )
2410 # ^with FINISHED instead of DONE it will not be refreshing
2411
2412 if new_status in ("FINISHED", "SUPERSEDED"):
2413 target_update = "DELETE"
2414 elif task["action"] == "EXEC":
2415 (
2416 new_status,
2417 db_vim_info_update,
2418 db_task_update,
2419 ) = self.item2class[task["item"]].exec(
2420 ro_task, task_index, task_depends
2421 )
2422 new_status = (
2423 "FINISHED" if new_status == "DONE" else new_status
2424 )
2425 # ^with FINISHED instead of DONE it will not be refreshing
2426
2427 if db_task_update:
2428 # load into database the modified db_task_update "retries" and "next_retry"
2429 if db_task_update.get("retries"):
2430 db_ro_task_update[
2431 "tasks.{}.retries".format(task_index)
2432 ] = db_task_update["retries"]
2433
2434 next_check_at = time.time() + db_task_update.get(
2435 "next_retry", 60
2436 )
2437 target_update = None
2438 elif task["action"] == "CREATE":
2439 if task["status"] == "SCHEDULED":
2440 if task_status_create:
2441 new_status = task_status_create
2442 target_update = "COPY_VIM_INFO"
2443 else:
2444 new_status, db_vim_info_update = self.item2class[
2445 task["item"]
2446 ].new(ro_task, task_index, task_depends)
2447 _update_refresh(new_status)
2448 else:
2449 refresh_at = ro_task["vim_info"]["refresh_at"]
2450 if refresh_at and refresh_at != -1 and now > refresh_at:
2451 (
2452 new_status,
2453 db_vim_info_update,
2454 ) = self.item2class[
2455 task["item"]
2456 ].refresh(ro_task)
2457 _update_refresh(new_status)
2458 else:
2459 # The refresh is updated to avoid set the value of "refresh_at" to
2460 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2461 # because it can happen that in this case the task is never processed
2462 _update_refresh(task["status"])
2463
2464 except Exception as e:
2465 new_status = "FAILED"
2466 db_vim_info_update = {
2467 "vim_status": "VIM_ERROR",
2468 "vim_message": str(e),
2469 }
2470
2471 if not isinstance(
2472 e, (NsWorkerException, vimconn.VimConnException)
2473 ):
2474 self.logger.error(
2475 "Unexpected exception at _delete_task task={}: {}".format(
2476 task["task_id"], e
2477 ),
2478 exc_info=True,
2479 )
2480
2481 try:
2482 if db_vim_info_update:
2483 db_vim_update = db_vim_info_update.copy()
2484 db_ro_task_update.update(
2485 {
2486 "vim_info." + k: v
2487 for k, v in db_vim_info_update.items()
2488 }
2489 )
2490 ro_task["vim_info"].update(db_vim_info_update)
2491
2492 if new_status:
2493 if task_action == "CREATE":
2494 task_status_create = new_status
2495 db_ro_task_update[task_path] = new_status
2496
2497 if target_update or db_vim_update:
2498 if target_update == "DELETE":
2499 self._update_target(task, None)
2500 elif target_update == "COPY_VIM_INFO":
2501 self._update_target(task, ro_task["vim_info"])
2502 else:
2503 self._update_target(task, db_vim_update)
2504
2505 except Exception as e:
2506 if (
2507 isinstance(e, DbException)
2508 and e.http_code == HTTPStatus.NOT_FOUND
2509 ):
2510 # if the vnfrs or nsrs has been removed from database, this task must be removed
2511 self.logger.debug(
2512 "marking to delete task={}".format(task["task_id"])
2513 )
2514 self.tasks_to_delete.append(task)
2515 else:
2516 self.logger.error(
2517 "Unexpected exception at _update_target task={}: {}".format(
2518 task["task_id"], e
2519 ),
2520 exc_info=True,
2521 )
2522
2523 locked_at = ro_task["locked_at"]
2524
2525 if lock_object:
2526 locked_at = [
2527 lock_object["locked_at"],
2528 lock_object["locked_at"] + self.task_locked_time,
2529 ]
2530 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2531 # contain exactly locked_at + self.task_locked_time
2532 LockRenew.remove_lock_object(lock_object)
2533
2534 q_filter = {
2535 "_id": ro_task["_id"],
2536 "to_check_at": ro_task["to_check_at"],
2537 "locked_at": locked_at,
2538 }
2539 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2540 # outside this task (by ro_nbi) do not update it
2541 db_ro_task_update["locked_by"] = None
2542 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2543 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2544 db_ro_task_update["modified_at"] = now
2545 db_ro_task_update["to_check_at"] = next_check_at
2546
2547 """
2548 # Log RO tasks only when loglevel is DEBUG
2549 if self.logger.getEffectiveLevel() == logging.DEBUG:
2550 db_ro_task_update_log = db_ro_task_update.copy()
2551 db_ro_task_update_log["_id"] = q_filter["_id"]
2552 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2553 """
2554
2555 if not self.db.set_one(
2556 "ro_tasks",
2557 update_dict=db_ro_task_update,
2558 q_filter=q_filter,
2559 fail_on_empty=False,
2560 ):
2561 del db_ro_task_update["to_check_at"]
2562 del q_filter["to_check_at"]
2563 """
2564 # Log RO tasks only when loglevel is DEBUG
2565 if self.logger.getEffectiveLevel() == logging.DEBUG:
2566 self._log_ro_task(
2567 None,
2568 db_ro_task_update_log,
2569 None,
2570 "TASK_WF",
2571 "SET_TASK " + str(q_filter),
2572 )
2573 """
2574 self.db.set_one(
2575 "ro_tasks",
2576 q_filter=q_filter,
2577 update_dict=db_ro_task_update,
2578 fail_on_empty=True,
2579 )
2580 except DbException as e:
2581 self.logger.error(
2582 "ro_task={} Error updating database {}".format(ro_task_id, e)
2583 )
2584 except Exception as e:
2585 self.logger.error(
2586 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2587 )
2588
2589 def _update_target(self, task, ro_vim_item_update):
2590 table, _, temp = task["target_record"].partition(":")
2591 _id, _, path_vim_status = temp.partition(":")
2592 path_item = path_vim_status[: path_vim_status.rfind(".")]
2593 path_item = path_item[: path_item.rfind(".")]
2594 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2595 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2596
2597 if ro_vim_item_update:
2598 update_dict = {
2599 path_vim_status + "." + k: v
2600 for k, v in ro_vim_item_update.items()
2601 if k
2602 in (
2603 "vim_id",
2604 "vim_details",
2605 "vim_message",
2606 "vim_name",
2607 "vim_status",
2608 "interfaces",
2609 "interfaces_backup",
2610 )
2611 }
2612
2613 if path_vim_status.startswith("vdur."):
2614 # for backward compatibility, add vdur.name apart from vdur.vim_name
2615 if ro_vim_item_update.get("vim_name"):
2616 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2617
2618 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2619 if ro_vim_item_update.get("vim_id"):
2620 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2621
2622 # update general status
2623 if ro_vim_item_update.get("vim_status"):
2624 update_dict[path_item + ".status"] = ro_vim_item_update[
2625 "vim_status"
2626 ]
2627
2628 if ro_vim_item_update.get("interfaces"):
2629 path_interfaces = path_item + ".interfaces"
2630
2631 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2632 if iface:
2633 update_dict.update(
2634 {
2635 path_interfaces + ".{}.".format(i) + k: v
2636 for k, v in iface.items()
2637 if k in ("vlan", "compute_node", "pci")
2638 }
2639 )
2640
2641 # put ip_address and mac_address with ip-address and mac-address
2642 if iface.get("ip_address"):
2643 update_dict[
2644 path_interfaces + ".{}.".format(i) + "ip-address"
2645 ] = iface["ip_address"]
2646
2647 if iface.get("mac_address"):
2648 update_dict[
2649 path_interfaces + ".{}.".format(i) + "mac-address"
2650 ] = iface["mac_address"]
2651
2652 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2653 update_dict["ip-address"] = iface.get("ip_address").split(
2654 ";"
2655 )[0]
2656
2657 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2658 update_dict[path_item + ".ip-address"] = iface.get(
2659 "ip_address"
2660 ).split(";")[0]
2661
2662 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2663
2664 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2665 if ro_vim_item_update.get("interfaces"):
2666 search_key = path_vim_status + ".interfaces"
2667 if update_dict.get(search_key):
2668 interfaces_backup_update = {
2669 path_vim_status + ".interfaces_backup": update_dict[search_key]
2670 }
2671
2672 self.db.set_one(
2673 table,
2674 q_filter={"_id": _id},
2675 update_dict=interfaces_backup_update,
2676 )
2677
2678 else:
2679 update_dict = {path_item + ".status": "DELETED"}
2680 self.db.set_one(
2681 table,
2682 q_filter={"_id": _id},
2683 update_dict=update_dict,
2684 unset={path_vim_status: None},
2685 )
2686
2687 def _process_delete_db_tasks(self):
2688 """
2689 Delete task from database because vnfrs or nsrs or both have been deleted
2690 :return: None. Uses and modify self.tasks_to_delete
2691 """
2692 while self.tasks_to_delete:
2693 task = self.tasks_to_delete[0]
2694 vnfrs_deleted = None
2695 nsr_id = task["nsr_id"]
2696
2697 if task["target_record"].startswith("vnfrs:"):
2698 # check if nsrs is present
2699 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2700 vnfrs_deleted = task["target_record"].split(":")[1]
2701
2702 try:
2703 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2704 except Exception as e:
2705 self.logger.error(
2706 "Error deleting task={}: {}".format(task["task_id"], e)
2707 )
2708 self.tasks_to_delete.pop(0)
2709
2710 @staticmethod
2711 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2712 """
2713 Static method because it is called from osm_ng_ro.ns
2714 :param db: instance of database to use
2715 :param nsr_id: affected nsrs id
2716 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2717 :return: None, exception is fails
2718 """
2719 retries = 5
2720 for retry in range(retries):
2721 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2722 now = time.time()
2723 conflict = False
2724
2725 for ro_task in ro_tasks:
2726 db_update = {}
2727 to_delete_ro_task = True
2728
2729 for index, task in enumerate(ro_task["tasks"]):
2730 if not task:
2731 pass
2732 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2733 vnfrs_deleted
2734 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2735 ):
2736 db_update["tasks.{}".format(index)] = None
2737 else:
2738 # used by other nsr, ro_task cannot be deleted
2739 to_delete_ro_task = False
2740
2741 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2742 if to_delete_ro_task:
2743 if not db.del_one(
2744 "ro_tasks",
2745 q_filter={
2746 "_id": ro_task["_id"],
2747 "modified_at": ro_task["modified_at"],
2748 },
2749 fail_on_empty=False,
2750 ):
2751 conflict = True
2752 elif db_update:
2753 db_update["modified_at"] = now
2754 if not db.set_one(
2755 "ro_tasks",
2756 q_filter={
2757 "_id": ro_task["_id"],
2758 "modified_at": ro_task["modified_at"],
2759 },
2760 update_dict=db_update,
2761 fail_on_empty=False,
2762 ):
2763 conflict = True
2764 if not conflict:
2765 return
2766 else:
2767 raise NsWorkerException("Exceeded {} retries".format(retries))
2768
2769 def run(self):
2770 # load database
2771 self.logger.info("Starting")
2772 while True:
2773 # step 1: get commands from queue
2774 try:
2775 if self.vim_targets:
2776 task = self.task_queue.get(block=False)
2777 else:
2778 if not self.idle:
2779 self.logger.debug("enters in idle state")
2780 self.idle = True
2781 task = self.task_queue.get(block=True)
2782 self.idle = False
2783
2784 if task[0] == "terminate":
2785 break
2786 elif task[0] == "load_vim":
2787 self.logger.info("order to load vim {}".format(task[1]))
2788 self._load_vim(task[1])
2789 elif task[0] == "unload_vim":
2790 self.logger.info("order to unload vim {}".format(task[1]))
2791 self._unload_vim(task[1])
2792 elif task[0] == "reload_vim":
2793 self._reload_vim(task[1])
2794 elif task[0] == "check_vim":
2795 self.logger.info("order to check vim {}".format(task[1]))
2796 self._check_vim(task[1])
2797 continue
2798 except Exception as e:
2799 if isinstance(e, queue.Empty):
2800 pass
2801 else:
2802 self.logger.critical(
2803 "Error processing task: {}".format(e), exc_info=True
2804 )
2805
2806 # step 2: process pending_tasks, delete not needed tasks
2807 try:
2808 if self.tasks_to_delete:
2809 self._process_delete_db_tasks()
2810 busy = False
2811 """
2812 # Log RO tasks only when loglevel is DEBUG
2813 if self.logger.getEffectiveLevel() == logging.DEBUG:
2814 _ = self._get_db_all_tasks()
2815 """
2816 ro_task = self._get_db_task()
2817 if ro_task:
2818 self.logger.debug("Task to process: {}".format(ro_task))
2819 time.sleep(1)
2820 self._process_pending_tasks(ro_task)
2821 busy = True
2822 if not busy:
2823 time.sleep(5)
2824 except Exception as e:
2825 self.logger.critical(
2826 "Unexpected exception at run: " + str(e), exc_info=True
2827 )
2828
2829 self.logger.info("Finishing")