80f911d8250359f95ae3f1a7aba432866ccb35c7
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target where the results are stored.
"""
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
def deep_get(target_dict, *args, **kwargs):
    """
    Walk down a nested dictionary following the given keys and return what is found.

    Example: with target_dict={a: {b: 5}}, the keys (a, b) give 5, while both
    (a, b, c) and (f, h) give the default.

    :param target_dict: dictionary to read from
    :param args: successive keys, outermost first
    :param kwargs: may carry default=value, returned when the path cannot be followed
    :return: the value at the end of the path; otherwise the default (None if not given)
    """
    current = target_dict
    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # Path is broken: either not a dict at this level or the key is missing
            return kwargs.get("default")
    return current
66
67
class NsWorkerException(Exception):
    """Error raised by this module while processing a VIM/SDN task."""
70
71
class FailingConnector:
    """Placeholder connector whose public VIM and SDN methods always raise.

    Every public name of vimconn.VimConnector is replaced by a Mock raising
    VimConnException, then every public name of sdnconn.SdnConnectorBase by a Mock
    raising SdnConnectorError (the SDN stub wins when a name exists in both).
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exceptions raised by the stub methods
        """
        self.error_msg = error_msg

        # VIM stubs are installed first so the SDN ones override shared names
        stub_plan = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )
        for connector_class, error_class in stub_plan:
            for attr_name in dir(connector_class):
                if attr_name.startswith("_"):
                    continue
                setattr(self, attr_name, Mock(side_effect=error_class(error_msg)))
87
88
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a requested element cannot be found."""
91
92
class VimInteractionBase:
    """Base class for the VIM/SDN interactions (create, delete, refresh, exec).

    The methods here do not contact any VIM; they just report a successful result.
    Subclasses override the ones they need.
    """

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database object
        :param my_vims: mapping target_id -> connector
        :param db_vims: mapping target_id -> VIM database content
        :param logger: logger used by the interaction
        """
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing. :return: ("BUILD", {})"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling the VIM (e.g. for image or flavor status) and assume it is ok.

        :return: ("FAILED", {}) when the stored vim_status is "VIM_ERROR", else ("DONE", {})
        """
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if in_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling the VIM to delete the element and assume it is ok.

        :return: ("DONE", {})
        """
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing. :return: ("DONE", None, None)"""
        return "DONE", None, None
119
120
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create, refresh status and delete."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing network at the VIM, or create it.

        When the task has "find_params" the network is searched with a filter taken
        from "filter_dict" or, for management networks ("mgmt"), from the VIM config
        ("management_network_id" / "management_network_name") or the given name.
        A management network that is not found, is not configured at the VIM and has
        no "params" is created. Without "find_params" the network is created from
        the task "params".

        :param ro_task: ro_task content; "tasks", "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("BUILD", vim_info update) on success, ("FAILED", vim_info update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt = False
        mgmt_set_at_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: build the filter used to look for the network
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration has priority
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]

                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_set_at_vim = True
                        vim_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_set_at_vim = True
                        vim_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    if not is_mgmt or mgmt_set_at_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                find_params
                            )
                        )

                    # There is a mgmt-network in the descriptor without any mapping
                    # to a VIM network, neither in the descriptor nor in the VIM
                    # configuration: that mgmt-network is created.
                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                create_kwargs = task["params"]
                vim_net_id, created_items = target_vim.new_network(**create_kwargs)
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """Call VIM to get the network status and compute what changed.

        :param ro_task: ro_task content; "_id", "target_id" and "vim_info" are read
        :return: (task status, dict with the vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored = ro_task["vim_info"]
        vim_id = stored["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            reported = vim_info["status"]

            if reported == "ACTIVE":
                task_status = "DONE"
            elif reported == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        if stored["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            # the message is only logged when the new status is not ACTIVE
            if ro_vim_item_update.get("vim_status") != "ACTIVE":
                shown_message = ro_vim_item_update.get("vim_message")
            else:
                shown_message = ""

            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    shown_message,
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network (and its created items) from the VIM.

        A network that the VIM reports as not found is considered deleted.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) or ("FAILED", vim_info update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = dict(
            vim_status="DELETED",
            created=False,
            vim_message="DELETED",
            vim_id=None,
        )

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                connector = self.my_vims[ro_task["target_id"]]
                connector.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
338
339
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDUs (VMs): create, delete, refresh and ssh key injection."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM.

        References of the form "TASK-..." found in the net_list "net_id", "image_id",
        "flavor_id" and affinity groups "affinity_group_id" are replaced by the
        values stored in task_depends before calling the VIM.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: mapping "TASK-..." reference -> VIM identifier
        :return: ("BUILD", vim_info update) on success, ("FAILED", vim_info update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # NOTE(review): set before calling the VIM, so a failure is also
            # reported with created=True - presumably on purpose; confirm
            created = True
            params = task["params"]
            # work on a copy so the stored task params keep the TASK- references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # Full message, in line with the network dependency error above
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created or "
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # read after the call: the per-interface "vim_id" is taken from net_list
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM (and its created items) from the VIM.

        A VM that the VIM reports as not found is considered deleted.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) or ("FAILED", vim_info update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status and compute what changed.

        :param ro_task: ro_task content; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no vim_id; otherwise
            (task status, dict with the vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; best effort, a failure here
            # is only logged and does not change the task status
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of the interfaces requested at creation time
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # NOTE(review): iface is None when the VIM does not report this
                # interface; it is appended anyway to keep the positions
                vim_interfaces.append(iface)

        # NOTE(review): raises StopIteration when there is no pending CREATE task
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the vdu management interface defaults to the vnf one, and then to the first
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an ssh key into the VM, asking for a retry while attempts remain.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: (task status, vim_info update or None, task update):
            "DONE" on success; "BUILD" with the new "retries" and "next_retry" while
            fewer than max_retries_inject_ssh_key attempts failed; "FAILED" afterwards
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # the stored private key is decrypted and passed to the VIM as "ro_key"
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
636
637
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images: only looks up an existing image."""

    def new(self, ro_task, task_index, task_depends):
        """Find the image at the VIM using the task "find_params".

        Exactly one image must match. When the task has no "find_params" no search
        is done and the reported vim_id is None.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", vim_info update) on success, ("FAILED", vim_info update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Defined up front so a task without find_params does not hit an
            # unbound local below
            vim_image_id = None

            # FIND
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
692
693
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """Delete the flavor from the VIM.

        A flavor that the VIM reports as not found is considered deleted.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) or ("FAILED", vim_info update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = dict(
            vim_status="DELETED",
            created=False,
            vim_message="DELETED",
            vim_id=None,
        )

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Find a flavor matching "find_params", or create it from "params".

        A not-found answer from the VIM during the search is only logged; the
        flavor is then created when the task has "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", vim_info update) on success, ("FAILED", vim_info update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        created = False
        created_items = {}

        try:
            vim_flavor_id = None

            # FIND
            if task.get("find_params"):
                try:
                    wanted_flavor = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(wanted_flavor)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            # CREATE
            if not vim_flavor_id and task.get("params"):
                new_flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(new_flavor_data)
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
788
789
class VimInteractionAffinityGroup(VimInteractionBase):
    """VIM interaction for affinity / anti-affinity groups: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group from the VIM.

        A group that the VIM reports as not found is considered deleted.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) or ("FAILED", vim_info update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group given by "vim-affinity-group-id", or create one.

        The group data is read from task["params"]["affinity_group_data"]. When it
        carries a "vim-affinity-group-id" that the VIM cannot find, a new group is
        created instead.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", vim_info update) on success, ("FAILED", vim_info update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = affinity_group_data.get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # Not fatal: a new group is created below.
                    # A separating space was missing between the two message parts.
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
895
896
class VimInteractionUpdateVdu(VimInteractionBase):
    """VIM interaction that runs an action over an existing VM."""

    def exec(self, ro_task, task_index, task_depends):
        """Call the VIM action_vminstance with the action given in the task params.

        The task "params" provide "vim_vm_id" and "action"; the VIM is called with
        the context {action: action}.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", vim_info update, task update) on success,
            ("FAILED", vim_info update, task update) on error or when params are missing
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")
            if not params:
                # Without params there is nothing to execute; report it through the
                # handled error path instead of failing later on an unbound local
                raise NsWorkerException("Task without params, no action to execute")

            vim_vm_id = params.get("vim_vm_id")
            action = params.get("action")
            context = {action: action}
            target_vim.action_vminstance(vim_vm_id, context)
            # created = True
            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
937
938
939 class VimInteractionSdnNet(VimInteractionBase):
940 @staticmethod
941 def _match_pci(port_pci, mapping):
942 """
943 Check if port_pci matches with mapping
944 mapping can have brackets to indicate that several chars are accepted. e.g
945 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
946 :param port_pci: text
947 :param mapping: text, can contain brackets to indicate several chars are available
948 :return: True if matches, False otherwise
949 """
950 if not port_pci or not mapping:
951 return False
952 if port_pci == mapping:
953 return True
954
955 mapping_index = 0
956 pci_index = 0
957 while True:
958 bracket_start = mapping.find("[", mapping_index)
959
960 if bracket_start == -1:
961 break
962
963 bracket_end = mapping.find("]", bracket_start)
964 if bracket_end == -1:
965 break
966
967 length = bracket_start - mapping_index
968 if (
969 length
970 and port_pci[pci_index : pci_index + length]
971 != mapping[mapping_index:bracket_start]
972 ):
973 return False
974
975 if (
976 port_pci[pci_index + length]
977 not in mapping[bracket_start + 1 : bracket_end]
978 ):
979 return False
980
981 pci_index += length + 1
982 mapping_index = bracket_end + 1
983
984 if port_pci[pci_index:] != mapping[mapping_index:]:
985 return False
986
987 return True
988
989 def _get_interfaces(self, vlds_to_connect, vim_account_id):
990 """
991 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
992 :param vim_account_id:
993 :return:
994 """
995 interfaces = []
996
997 for vld in vlds_to_connect:
998 table, _, db_id = vld.partition(":")
999 db_id, _, vld = db_id.partition(":")
1000 _, _, vld_id = vld.partition(".")
1001
1002 if table == "vnfrs":
1003 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1004 iface_key = "vnf-vld-id"
1005 else: # table == "nsrs"
1006 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1007 iface_key = "ns-vld-id"
1008
1009 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1010
1011 for db_vnfr in db_vnfrs:
1012 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1013 for iface_index, interface in enumerate(vdur["interfaces"]):
1014 if interface.get(iface_key) == vld_id and interface.get(
1015 "type"
1016 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1017 # only SR-IOV o PT
1018 interface_ = interface.copy()
1019 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1020 db_vnfr["_id"], vdu_index, iface_index
1021 )
1022
1023 if vdur.get("status") == "ERROR":
1024 interface_["status"] = "ERROR"
1025
1026 interfaces.append(interface_)
1027
1028 return interfaces
1029
def refresh(self, ro_task):
    """Refresh by running new() again over the pending CREATE task.

    The first task with action "CREATE" and a status other than "FINISHED" is
    used; new() is called with its index and no task_depends.

    :param ro_task: ro_task content; "tasks" is read
    :return: whatever new() returns
    """
    # look for task create
    pending_create_indexes = (
        position
        for position, candidate in enumerate(ro_task["tasks"])
        if candidate
        and candidate["action"] == "CREATE"
        and candidate["status"] != "FINISHED"
    )
    task_create_index = next(pending_create_indexes)

    return self.new(ro_task, task_create_index, None)
1041
def new(self, ro_task, task_index, task_depends):
    """
    Create or update the SDN connectivity service of an sdn_net item.
    The ports to connect are the interfaces returned by self._get_interfaces plus the external
    ports received at params "sdn-ports". When the resulting set of ports differs from the one
    stored at vim_info "connected_ports", create_connectivity_service (no vim_id yet and at
    least two ports) or edit_connectivity_service (vim_id already present) is called at the
    target connector. When nothing changes and the service exists, its status is read with
    get_connectivity_service_status.
    :param ro_task: ro_task database content; "vim_info" and "target_id" are read
    :param task_index: index of the task to process inside ro_task["tasks"]
    :param task_depends: not used by this method
    :return: tuple (sdn_status, dictionary with the vim_info fields to update).
        ("FAILED", {...}) with vim_status "VIM_ERROR" if any exception is raised
    """

    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    sdn_net_id = ro_task["vim_info"]["vim_id"]

    # previous state stored at database, used as default for the values returned
    created_items = ro_task["vim_info"].get("created_items")
    connected_ports = ro_task["vim_info"].get("connected_ports", [])
    new_connected_ports = []
    last_update = ro_task["vim_info"].get("last_update", 0)
    sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
    error_list = []
    created = ro_task["vim_info"].get("created", False)

    try:
        # CREATE
        params = task["params"]
        vlds_to_connect = params.get("vlds", [])
        associated_vim = params.get("target_vim")
        # external additional ports
        additional_ports = params.get("sdn-ports") or ()
        # associated_vim has the format "<type>:<vim_account_id>"; only the id part is needed
        _, _, vim_account_id = (
            (None, None, None)
            if associated_vim is None
            else associated_vim.partition(":")
        )

        if associated_vim:
            # get associated VIM
            if associated_vim not in self.db_vims:
                self.db_vims[associated_vim] = self.db.get_one(
                    "vim_accounts", {"_id": vim_account_id}
                )

            db_vim = self.db_vims[associated_vim]

        # look for ports to connect
        ports = self._get_interfaces(vlds_to_connect, vim_account_id)
        # print(ports)

        sdn_ports = []
        pending_ports = error_ports = 0
        vlan_used = None
        # never set to True in this method; see the commented TODO inside the loop
        sdn_need_update = False

        for port in ports:
            # the last vlan seen is reused later for the external ports without an own vlan
            vlan_used = port.get("vlan") or vlan_used

            # TODO. Do not connect if already done
            # ports without compute_node or pci cannot be connected yet: count them and skip
            if not port.get("compute_node") or not port.get("pci"):
                if port.get("status") == "ERROR":
                    error_ports += 1
                else:
                    pending_ports += 1
                continue

            pmap = None
            # NOTE(review): db_vim is only bound when params "target_vim" is truthy. If ports
            # are obtained without it, this raises NameError, reported as FAILED by the generic
            # except below - confirm that this combination cannot happen
            compute_node_mappings = next(
                (
                    c
                    for c in db_vim["config"].get("sdn-port-mapping", ())
                    if c and c["compute_node"] == port["compute_node"]
                ),
                None,
            )

            if compute_node_mappings:
                # process port_mapping pci of type 0000:af:1[01].[1357]
                pmap = next(
                    (
                        p
                        for p in compute_node_mappings["ports"]
                        if self._match_pci(port["pci"], p.get("pci"))
                    ),
                    None,
                )

            if not pmap:
                # without mapping the port is an error, unless the VIM config states that the
                # mapping is not needed; in that case default values are taken from the port
                if not db_vim["config"].get("mapping_not_needed"):
                    error_list.append(
                        "Port mapping not found for compute_node={} pci={}".format(
                            port["compute_node"], port["pci"]
                        )
                    )
                    continue

                pmap = {}

            service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
            new_port = {
                "service_endpoint_id": pmap.get("service_endpoint_id")
                or service_endpoint_id,
                "service_endpoint_encapsulation_type": "dot1q"
                if port["type"] == "SR-IOV"
                else None,
                "service_endpoint_encapsulation_info": {
                    "vlan": port.get("vlan"),
                    "mac": port.get("mac-address"),
                    "device_id": pmap.get("device_id") or port["compute_node"],
                    "device_interface_id": pmap.get("device_interface_id")
                    or port["pci"],
                    "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                    "switch_port": pmap.get("switch_port"),
                    "service_mapping_info": pmap.get("service_mapping_info"),
                },
            }

            # TODO
            # if port["modified_at"] > last_update:
            #     sdn_need_update = True
            new_connected_ports.append(port["id"])  # TODO
            sdn_ports.append(new_port)

        if error_ports:
            error_list.append(
                "{} interfaces have not been created as VDU is on ERROR status".format(
                    error_ports
                )
            )

        # connect external ports
        for index, additional_port in enumerate(additional_ports):
            additional_port_id = additional_port.get(
                "service_endpoint_id"
            ) or "external-{}".format(index)
            sdn_ports.append(
                {
                    "service_endpoint_id": additional_port_id,
                    "service_endpoint_encapsulation_type": additional_port.get(
                        "service_endpoint_encapsulation_type", "dot1q"
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": additional_port.get("vlan") or vlan_used,
                        "mac": additional_port.get("mac_address"),
                        "device_id": additional_port.get("device_id"),
                        "device_interface_id": additional_port.get(
                            "device_interface_id"
                        ),
                        "switch_dpid": additional_port.get("switch_dpid")
                        or additional_port.get("switch_id"),
                        "switch_port": additional_port.get("switch_port"),
                        "service_mapping_info": additional_port.get(
                            "service_mapping_info"
                        ),
                    },
                }
            )
            new_connected_ports.append(additional_port_id)
        sdn_info = ""

        # if there are more ports to connect or they have been modified, call create/update
        if error_list:
            sdn_status = "ERROR"
            sdn_info = "; ".join(error_list)
        elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
            last_update = time.time()

            if not sdn_net_id:
                # nothing is created at the connector with less than two ports
                if len(sdn_ports) < 2:
                    sdn_status = "ACTIVE"

                    if not pending_ports:
                        self.logger.debug(
                            "task={} {} new-sdn-net done, less than 2 ports".format(
                                task_id, ro_task["target_id"]
                            )
                        )
                else:
                    net_type = params.get("type") or "ELAN"
                    (
                        sdn_net_id,
                        created_items,
                    ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                    created = True
                    self.logger.debug(
                        "task={} {} new-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )
            else:
                created_items = target_vim.edit_connectivity_service(
                    sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} update-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )

            connected_ports = new_connected_ports
        elif sdn_net_id:
            # no changes at ports: just read the current status of the existing service
            wim_status_dict = target_vim.get_connectivity_service_status(
                sdn_net_id, conn_info=created_items
            )
            sdn_status = wim_status_dict["sdn_status"]

            if wim_status_dict.get("sdn_info"):
                sdn_info = str(wim_status_dict.get("sdn_info")) or ""

            # error_msg, if present, takes precedence over sdn_info
            if wim_status_dict.get("error_msg"):
                sdn_info = wim_status_dict.get("error_msg") or ""

        if pending_ports:
            if sdn_status != "ERROR":
                sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                    len(ports) - pending_ports, len(ports)
                )

            # while there are ports without location the item cannot be reported as ACTIVE
            if sdn_status == "ACTIVE":
                sdn_status = "BUILD"

        ro_vim_item_update = {
            "vim_id": sdn_net_id,
            "vim_status": sdn_status,
            "created": created,
            "created_items": created_items,
            "connected_ports": connected_ports,
            "vim_details": sdn_info,
            "vim_message": None,
            "last_update": last_update,
        }

        return sdn_status, ro_vim_item_update
    except Exception as e:
        # the traceback is only logged for exceptions not raised by the connectors
        self.logger.error(
            "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
            exc_info=not isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            ),
        )
        ro_vim_item_update = {
            "vim_status": "VIM_ERROR",
            "created": created,
            "vim_message": str(e),
        }

        return "FAILED", ro_vim_item_update
1282
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service referenced by the ro_task, if it has a vim_id
    :param ro_task: ro_task database content; "vim_info", "target_id" and "_id" are read
    :param task_index: index of the task to process inside ro_task["tasks"]
    :return: tuple (status, dictionary with the vim_info fields to update). Status is "DONE"
        when deleted, when there was nothing to delete or when the connector answers with a
        not found error; "FAILED" on any other exception
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    vim_info = ro_task["vim_info"]
    sdn_vim_id = vim_info.get("vim_id")
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, vim_info.get("created_items")
            )
    except Exception as e:
        already_deleted = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_deleted:
            # the traceback is only logged for exceptions not raised by the connectors
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        # a not found answer from the connector is considered a successful deletion
        ro_vim_item_update_ok["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_message", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
1333
1334
class VimInteractionMigration(VimInteractionBase):
    """Handler of the "migrate" item: migrates a VM through the connector of the target VIM"""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM indicated at the task params and collect its refreshed information
        :param ro_task: ro_task database content; "target_id" selects the connector at my_vims
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, dictionary with the vim_info fields to update, dictionary with
            the task fields to update). Status is "DONE", or "FAILED" when the connector raises
            VimConnException, NsWorkerException is raised or the task has no params
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params")

            if not params:
                # without params the VM to migrate is unknown. This case used to end with an
                # uncaught UnboundLocalError when building the result; report it as FAILED
                raise NsWorkerException(
                    "Migration task without params, there is no VM to migrate"
                )

            vim_vm_id = params.get("vim_vm_id")
            migrate_host = params.get("migrate_host")
            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                # tolerate a target without previous vim_info instead of failing on None
                vdu_old_vim_info = (
                    params["vdu_vim_info"].get(ro_task["target_id"]) or {}
                )

                # Refresh VM to get new vim_info
                vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    # one entry per old interface, in the same order; None is kept for the
                    # interfaces that are not found at the refreshed information
                    for old_iface in vdu_old_vim_info.get("interfaces") or ():
                        iface = next(
                            (
                                iface
                                for iface in refreshed_vim_info["interfaces"]
                                if old_iface["vim_interface_id"]
                                == iface["vim_interface_id"]
                            ),
                            None,
                        )
                        vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            # details are only stored when the VM has been refreshed and is not on error
            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1414
1415
class VimInteractionResize(VimInteractionBase):
    """Handler of the "verticalscale" item: resizes a VM to a flavor through the target VIM"""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM indicated at the task params to the flavor described at "flavor_dict".
        The flavor is looked for at the VIM and, if that fails, it is created
        :param ro_task: ro_task database content; "target_id" selects the connector at my_vims
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, dictionary with the vim_info fields to update, dictionary with
            the task fields to update). Status is "DONE", or "FAILED" when the connector raises
            VimConnException, NsWorkerException is raised, the task has no params or the
            flavor can neither be found nor created
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")

            if not params:
                # without params the VM to resize is unknown. This case used to end with an
                # uncaught UnboundLocalError when building the result; report it as FAILED
                raise NsWorkerException(
                    "Resize task without params, there is no VM to resize"
                )

            vim_vm_id = params.get("vim_vm_id")
            flavor_dict = params.get("flavor_dict")
            self.logger.info("flavor_dict %s", flavor_dict)
            flavor_error = None

            try:
                target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
            except Exception as lookup_error:
                self.logger.info("Cannot find any flavor matching %s.", str(lookup_error))
                try:
                    target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                except Exception as create_error:
                    self.logger.error(
                        "Error creating flavor at VIM %s.", str(create_error)
                    )
                    flavor_error = create_error

            if target_flavor_uuid is None:
                # no flavor means that no resize is done: it cannot be reported as DONE
                raise NsWorkerException(
                    "Cannot find or create the flavor to resize vm={}: {}".format(
                        vim_vm_id, flavor_error
                    )
                )

            resized_status = target_vim.resize_instance(vim_vm_id, target_flavor_uuid)

            if resized_status:
                # Refresh VM to get new vim_info
                vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            # details are only stored when the VM has been refreshed and is not on error
            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1483
1484
class ConfigValidate:
    """Read-only access to the values of the "period" section of the configuration"""

    def __init__(self, config: Dict):
        """
        :param config: configuration dictionary; it must contain the "period" section
        """
        self.conf = config

    def _period(self, key):
        # every value served by this class lives under conf["period"]
        return self.conf["period"][key]

    @property
    def active(self):
        """Value of "refresh_active"; 60 is returned for values lower than 60 other than -1"""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self._period("refresh_active")

        if refresh_active >= 60 or refresh_active == -1:
            return refresh_active

        return 60

    @property
    def build(self):
        """Value of "refresh_build" """
        return self._period("refresh_build")

    @property
    def image(self):
        """Value of "refresh_image" """
        return self._period("refresh_image")

    @property
    def error(self):
        """Value of "refresh_error" """
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value of "queue_size" """
        return self._period("queue_size")
1515
1516
1517 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # lock used by del_task; it was read there but never created
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # item type: class that implements its interaction with the VIM. All of them are built
    # with the same arguments, sharing the my_vims and db_vims dictionaries of this worker
    interaction_classes = {
        "net": VimInteractionNet,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1573
def insert_task(self, task):
    """
    Put a task at the queue of this worker without blocking
    :param task: item to queue
    :return: None. Raises NsWorkerException if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1580
def terminate(self):
    """
    Queue the "exit" item through insert_task
    (presumably read by the worker loop as the request to stop; that loop is not in this excerpt)
    :return: None
    """
    exit_item = "exit"
    self.insert_task(exit_item)
1583
def del_task(self, task):
    """
    Mark a task as SUPERSEDED if it is still SCHEDULED
    :param task: task dictionary; its "status" is read and, if SCHEDULED, modified
    :return: True if the task was SCHEDULED and it has been marked as SUPERSEDED; False otherwise
    """
    # the 'with' statement owns the lock and releases it when the block is left, also on
    # return. The explicit release() that was done here made that final release fail with
    # RuntimeError (release of an unlocked lock)
    with self.task_lock:
        if task["status"] != "SCHEDULED":  # e.g. task["status"] == "processing"
            return False

        task["status"] = "SUPERSEDED"
        return True
1592
def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
    """
    Process vim config, writing the content of "ca_cert_content" to a ca_cert file
    :param target_id: vim/sdn/wim + id
    :param db_vim: Vim dictionary obtained from database
    :return: None. Modifies db_vim["config"]: "ca_cert_content" is replaced by "ca_cert" with
        the path of the file, stored at folder target_id:worker_index.
        Raises NsWorkerException on any error
    """
    vim_config = db_vim.get("config")

    if not vim_config:
        return

    file_name = ""
    work_dir = "/app/osm_ro/certs"

    try:
        if vim_config.get("ca_cert_content"):
            cert_dir = f"{work_dir}/{target_id}:{self.worker_index}"
            # file_name follows every step, as it is the one reported if something fails
            file_name = cert_dir

            if not path.isdir(cert_dir):
                makedirs(cert_dir)

            file_name = cert_dir + "/ca_cert"

            with open(file_name, "w") as cert_file:
                cert_file.write(vim_config["ca_cert_content"])
                del vim_config["ca_cert_content"]
                vim_config["ca_cert"] = file_name
    except Exception as e:
        raise NsWorkerException(
            "Error writing to file '{}': {}".format(file_name, e)
        )
1623
def _load_plugin(self, name, type="vim"):
    """
    Get a connector class by its plugin name, loading it from the entry points if needed
    :param name: name of the plugin, used as key of the plugins dictionary
    :param type: can be vim or sdn; it selects the entry points group "osm_ro<type>.plugins"
    :return: the object stored at the plugins dictionary for this name.
        Raises NsWorkerException if it cannot be loaded or it is not installed
    """
    plugins = self.plugins

    # the dummy connectors are always registered
    if "rovim_dummy" not in plugins:
        plugins["rovim_dummy"] = VimDummyConnector

    if "rosdn_dummy" not in plugins:
        plugins["rosdn_dummy"] = SdnDummyConnector

    if name not in plugins:
        try:
            group = "osm_ro{}.plugins".format(type)

            for ep in entry_points(group=group, name=name):
                plugins[name] = ep.load()
        except Exception as e:
            raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

        if name and name not in plugins:
            raise NsWorkerException(
                "Plugin 'osm_{n}' has not been installed".format(n=name)
            )

    return plugins[name]
1647
def _unload_vim(self, target_id):
    """
    Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None. Any exception is logged and not propagated
    """
    try:
        self.db_vims.pop(target_id, None)
        self.my_vims.pop(target_id, None)

        try:
            self.vim_targets.remove(target_id)
        except ValueError:
            # it was not at the list of targets
            pass

        self.logger.info("Unloaded {}".format(target_id))
    except Exception as e:
        self.logger.error("Cannot unload {}: {}".format(target_id, e))
1664
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
    Only the first operation on PROCESSING state that can be locked is processed. The result is
    written to database at the finally clause, and the target is unloaded again if it was not
    loaded before this call
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # remember whether it was already loaded, to leave it as it was at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value read before, so the update is only
            # applied if the stored lock has not changed in the meantime
            # NOTE(review): the key set here is "admin.current_operation" while the one unset
            # below is "current_operation"; both look like "_admin.current_operation" was
            # intended - confirm against the database schema
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # these values are overwritten at the finally clause when error_text is not empty
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # update_dict/unset_dict are only filled after locking an operation, and from that
        # point error_text is always assigned (by _load_vim or by the except clause)
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1766
def _reload_vim(self, target_id):
    """
    Reload a target if it is loaded; otherwise just forget its cached database information
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None
    """
    if target_id not in self.vim_targets:
        # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
        # just remove it to force load again next time it is needed
        self.db_vims.pop(target_id, None)
        return

    self._load_vim(target_id)
1774
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it load a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None
    # step describes the action in progress; it is part of the error text if it fails
    step = "Getting {}={} from db".format(target, _id)

    try:
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # the connector receives a copy, so the content cached at db_vims is not altered
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            # make both "url" and "wim_url" available when only one of them is provided
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        self.db_vims[target_id] = vim or {}
        # the failing connector goes to my_vims (the connectors dictionary). It was stored at
        # db_vims, overwriting the database content set just above and leaving my_vims
        # without an entry for this target
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered also on error, with the failing connector loaded
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1874
def _get_db_task(self):
    """
    Look at database for a ro_task of the loaded targets that needs to be processed, lock it
    for this worker (locked_by, locked_at) and read it
    :return: the locked ro_task; None if there is nothing to process or on database error
    """
    now = time.time()

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    None,
                    None,
                    "TASK_WF",
                    "task_locked_time="
                    + str(self.task_locked_time)
                    + " "
                    + "time_last_task_processed="
                    + str(self.time_last_task_processed)
                    + " "
                    + "now="
                    + str(now),
                )
            """
            # try to lock one ro_task: not locked during the last task_locked_time seconds
            # and with a to_check_at between -1 and time_last_task_processed (both excluded)
            locked = self.db.set_one(
                "ro_tasks",
                q_filter={
                    "target_id": self.vim_targets,
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.lt": self.time_last_task_processed,
                    "to_check_at.gt": -1,
                },
                update_dict={"locked_by": self.my_id, "locked_at": now},
                fail_on_empty=False,
            )

            if locked:
                # read and return
                # the locked ro_task is identified by the locked_at value just written
                ro_task = self.db.get_one(
                    "ro_tasks",
                    q_filter={
                        "target_id": self.vim_targets,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at": now,
                    },
                )
                return ro_task

            # nothing locked: retry once with the time window extended up to now. If it
            # already was now, there is nothing pending
            if self.time_last_task_processed == now:
                self.time_last_task_processed = None
                return None
            else:
                self.time_last_task_processed = now
                # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
1945
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to be done or superseded
    :param ro_task: ro_task database content
    :param task_index: index of the DELETE task inside ro_task["tasks"]
    :param task_depends: not used by this method
    :param db_update: dictionary where the status changes of other tasks are added
    :return: tuple (status, vim_info update). (None, None) if the task is FAILED; the result of
        the delete() of the item class if the deletion is needed; ("SUPERSEDED", None) if it
        is not; ("FAILED", {...}) if an exception is raised
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]
    vim_info = ro_task["vim_info"]
    needed_delete = vim_info["created"] or vim_info.get("created_items", False)

    self.logger.debug("Needed delete: {}".format(needed_delete))
    if my_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for index, task in enumerate(ro_task["tasks"]):
            if not task or index == task_index:
                continue  # empty entry or own task

            same_record = my_task["target_record"] == task["target_record"]

            if task["action"] != "CREATE":
                continue

            if same_record:
                # set to finished
                db_update["tasks.{}.status".format(index)] = "FINISHED"
                task["status"] = "FINISHED"
            elif task["status"] not in ("FINISHED", "SUPERSEDED"):
                # the item is still needed by the CREATE task of another record
                needed_delete = False

        if not needed_delete:
            return "SUPERSEDED", None

        self.logger.debug(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        return self.item2class[my_task["item"]].delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(
                    task_id, e
                ),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
1997
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to create something at VIM
    :param ro_task: ro_task database content
    :param task_index: index of the CREATE task inside ro_task["tasks"]
    :param task_depends: passed as it is to the new() of the item class
    :param db_update: not used by this method
    :return: tuple (status, vim_info update). (None, None) if the task is not SCHEDULED;
        (status of the other task, "COPY_VIM_INFO") if another CREATE task of this ro_task is
        already in progress or done; otherwise the result of the new() of the item class, or
        ("FAILED", {...}) if it raises an exception
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]

    # only SCHEDULED tasks are processed. FAILED ones are not retried. TODO need to be retry??
    if my_task["status"] != "SCHEDULED":
        return None, None

    # check if already created by another task
    already_created_by = next(
        (
            task
            for index, task in enumerate(ro_task["tasks"])
            if index != task_index
            and task
            and task["action"] == "CREATE"
            and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED")
        ),
        None,
    )

    if already_created_by is not None:
        return already_created_by["status"], "COPY_VIM_INFO"

    try:
        item_class = self.item2class[my_task["item"]]
        task_status, ro_vim_item_update = item_class.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
2040
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: ro_task where to look for first, when task_id is a task.task_id
    :param target_id: target of the ro_task to look for; overridden when task_id has format 1
    :return: database ro_task plus index of task.
        Raises NsWorkerException if the task is not found
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of the tasks list can be empty, skip them
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of the tasks list can be empty, skip them
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index
    raise NsWorkerException("Cannot get depending task {}".format(task_id))
2092
def update_vm_refresh(self):
    """
    Re-enable the periodic check of the ro_tasks with status DONE and a negative to_check_at,
    unless self.refresh_config.active is -1
    Any exception is logged and not propagated
    :return: None
    """
    try:
        self.logger.debug("Checking if VM status update config")
        refresh_disabled = self.refresh_config.active == -1
        next_refresh = (
            -1 if refresh_disabled else time.time() + self.refresh_config.active
        )

        if next_refresh == -1:
            return

        # next check at the refresh time, but never later than one day
        one_day_ahead = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_ahead, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )
        self.logger.debug("Updating tasks to change the to_check_at status")

        for refresh_task in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": refresh_task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2138
def _process_pending_tasks(self, ro_task):
    """Process the tasks of one ro_task and store the outcome in table "ro_tasks".

    The tasks are visited once per action, in the order DELETE, CREATE, EXEC.
    A SCHEDULED task is postponed while a task it depends on is still SCHEDULED,
    and it is marked FAILED when a dependency is FAILED. All changes are
    accumulated in a single dictionary that is written with set_one at the end;
    that same write sets "locked_by" to None.

    :param ro_task: record of table "ro_tasks". Keys read here: "_id", "tasks",
        "target_id", "vim_info", "locked_at" and "to_check_at". Its "vim_info"
        is modified in place.
    :return: None. Exceptions are logged and not propagated.
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh
        # 'task' is the loop variable of the enclosing function: this helper acts on
        # whichever task is being processed at the moment it is called
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            if self.refresh_config.active == -1:
                # -1 is stored as it is; the CREATE branch below skips the refresh
                # when refresh_at == -1
                next_refresh = -1
            else:
                next_refresh += self.refresh_config.active
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh()
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        # first CREATE task already in BUILD or DONE, if any
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or CREATE tasks in status DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of another action, DELETE/EXEC tasks that are neither
                # SCHEDULED nor BUILD, and CREATE tasks already FINISHED or SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is stored under two keys:
                            # the plain task id and the task id prefixed with "TASK-"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (new_status, db_vim_info_update,) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # "next_retry" is added to the current time; 60 when absent
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task of this ro_task is already BUILD/DONE:
                                # reuse its status and copy the vim_info to the target
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (new_status, db_vim_info_update,) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    # any failure above marks the task as FAILED with a VIM_ERROR status
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    # NOTE(review): this handler covers DELETE, EXEC and CREATE, although
                    # the logged text only names _delete_task
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            # later SCHEDULED CREATE tasks of this ro_task reuse this status
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # nothing matched: retry without "to_check_at" both in the filter and in
            # the update, so the value stored in database is left untouched
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2478
def _update_target(self, task, ro_vim_item_update):
    """Copy VIM information of a task into the record named by its "target_record".

    task["target_record"] has the form "<table>:<_id>:<path_vim_status>", where
    the path is a dot separated list, e.g. "vdur.10.vim_info.vim:id". Removing
    its last two components gives the path of the item itself, e.g. "vdur.10".

    :param task: task dictionary; only "target_record" is read
    :param ro_vim_item_update: dictionary with the VIM information to store. If
        empty or None the item status is set to "DELETED" and the vim information
        path is unset
    :return: None
    """
    table, _, remainder = task["target_record"].partition(":")
    record_id, _, vim_path = remainder.partition(":")
    # strip the last two dot separated components of the vim information path
    item_path = vim_path[: vim_path.rfind(".")]
    item_path = item_path[: item_path.rfind(".")]

    if not ro_vim_item_update:
        self.db.set_one(
            table,
            q_filter={"_id": record_id},
            update_dict={item_path + ".status": "DELETED"},
            unset={vim_path: None},
        )
        return

    copied_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
        "interfaces_backup",
    )
    changes = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            changes[f"{vim_path}.{key}"] = value

    if vim_path.startswith("vdur."):
        # for backward compatibility, vdur.name, vdur.vim-id and the general
        # vdur.status are written apart from the vim_info entries
        for source_key, suffix in (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        ):
            if ro_vim_item_update.get(source_key):
                changes[item_path + suffix] = ro_vim_item_update[source_key]

    interfaces = ro_vim_item_update.get("interfaces")
    if interfaces:
        for position, iface in enumerate(interfaces):
            if not iface:
                continue

            iface_prefix = f"{item_path}.interfaces.{position}."
            for key, value in iface.items():
                if key in ("vlan", "compute_node", "pci"):
                    changes[iface_prefix + key] = value

            # ip_address and mac_address are stored as ip-address and mac-address
            ip_address = iface.get("ip_address")
            if ip_address:
                changes[iface_prefix + "ip-address"] = ip_address

            mac_address = iface.get("mac_address")
            if mac_address:
                changes[iface_prefix + "mac-address"] = mac_address

            # management interfaces also publish their first address
            if ip_address and iface.get("mgmt_vnf_interface"):
                changes["ip-address"] = ip_address.split(";")[0]

            if ip_address and iface.get("mgmt_vdu_interface"):
                changes[item_path + ".ip-address"] = ip_address.split(";")[0]

    self.db.set_one(table, q_filter={"_id": record_id}, update_dict=changes)

    # when interfaces exist they are also saved as a backup for healing operations
    backup = changes.get(vim_path + ".interfaces") if interfaces else None
    if backup:
        self.db.set_one(
            table,
            q_filter={"_id": record_id},
            update_dict={vim_path + ".interfaces_backup": backup},
        )
2576
def _process_delete_db_tasks(self):
    """
    Remove from database the tasks queued at self.tasks_to_delete, because their
    vnfrs or nsrs (or both) have been deleted
    :return: None. Consumes self.tasks_to_delete from the front; a failure while
        deleting is logged and the entry is discarded anyway
    """
    while self.tasks_to_delete:
        # peek first: the entry is only removed after it has been handled
        current = self.tasks_to_delete[0]
        ns_id = current["nsr_id"]
        target_record = current["target_record"]

        # restrict the deletion to one vnfr only when the target is a vnfrs
        # record and the nsrs is still present
        only_vnfr_id = None
        if target_record.startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": ns_id}, fail_on_empty=False
        ):
            only_vnfr_id = target_record.split(":")[1]

        try:
            self.delete_db_tasks(self.db, ns_id, only_vnfr_id)
        except Exception as exc:
            self.logger.error(
                "Error deleting task={}: {}".format(current["task_id"], exc)
            )

        self.tasks_to_delete.pop(0)
2599
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove the tasks of a nsr (or of one of its vnfrs) from table "ro_tasks".
    Static method because it is called from osm_ng_ro.ns

    A ro_task whose remaining tasks are all affected is deleted; otherwise only
    the affected task entries are set to None. Both writes are filtered by the
    "modified_at" value that was read, so a record changed meanwhile is not
    touched and the whole pass is repeated.
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None. Raises NsWorkerException if conflicts remain after all retries
    """
    retries = 5

    def _is_affected(entry):
        # with a vnfr id only tasks targeting that vnfr match; otherwise every
        # task of the nsr matches
        if vnfrs_deleted:
            return entry["target_record"].startswith("vnfrs:" + vnfrs_deleted)
        return entry["nsr_id"] == nsr_id

    for _ in range(retries):
        candidates = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        timestamp = time.time()
        conflict = False

        for ro_task in candidates:
            cleared = {}
            removable = True

            for position, entry in enumerate(ro_task["tasks"]):
                if not entry:
                    continue
                if _is_affected(entry):
                    cleared["tasks.{}".format(position)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    removable = False

            # delete or update only if nobody has changed ro_task meanwhile;
            # modified_at is used to detect such a change
            if removable:
                written = db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                )
            elif cleared:
                cleared["modified_at"] = timestamp
                written = db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=cleared,
                    fail_on_empty=False,
                )
            else:
                continue

            if not written:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2658
def run(self):
    """Main loop of the thread: attend queue orders, then process pending ro_tasks.

    Each iteration first reads one order from self.task_queue, blocking only
    when there are no vim_targets loaded. A handled order restarts the loop and
    "terminate" ends it. When the queue is empty, or reading the order failed,
    the iteration goes on deleting the tasks marked for deletion and processing
    one ro_task obtained from database, sleeping 5 seconds if there is none.
    """
    # load database
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif command == "reload_vim":
                self._reload_vim(order[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])

            # an order was read: look for the next one before touching tasks
            continue
        except queue.Empty:
            # nothing queued, go on with the pending tasks
            pass
        except Exception as exc:
            self.logger.critical(
                "Error processing task: {}".format(exc), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do right now
                time.sleep(5)
        except Exception as exc:
            self.logger.critical(
                "Unexpected exception at run: " + str(exc), exc_info=True
            )

    self.logger.info("Finishing")