Fix Bug 2012 use existing volumes as instantiation parameters
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in the table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target that says where to store the results.
"""
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from unittest.mock import Mock
37
38 from importlib_metadata import entry_points
39 from osm_common.dbbase import DbException
40 from osm_ng_ro.vim_admin import LockRenew
41 from osm_ro_plugin import sdnconn, vimconn
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin.vim_dummy import VimDummyConnector
44 import yaml
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
def deep_get(target_dict, *args, **kwargs):
    """
    Follow a path of keys through nested dictionaries and return the value found at the end.

    Example: with target_dict={a: {b: 5}} the keys (a, b) return 5, whereas the keys (a, b, c)
    and (f, h) cannot be followed and return the default.
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may contain default=value, returned when the path cannot be followed
    :return: the value at the end of the path; otherwise the default (None when not provided)
    """
    node = target_dict

    for step in args:
        if isinstance(node, dict) and step in node:
            node = node[step]
        else:
            # either an intermediate value is not a dictionary or the key is missing
            return kwargs.get("default")

    return node
64
65
class NsWorkerException(Exception):
    """Base error raised by the VIM interaction classes of this module."""
68
69
class FailingConnector:
    """Connector replacement whose public VIM and SDN methods raise the given error when called."""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exceptions raised by every public method
        """
        self.error_msg = error_msg

        # VIM methods are installed first; SDN methods with the same name replace them
        vim_methods = [
            name for name in dir(vimconn.VimConnector) if not name.startswith("_")
        ]
        for name in vim_methods:
            vim_error = vimconn.VimConnException(error_msg)
            setattr(self, name, Mock(side_effect=vim_error))

        sdn_methods = [
            name for name in dir(sdnconn.SdnConnectorBase) if not name.startswith("_")
        ]
        for name in sdn_methods:
            sdn_error = sdnconn.SdnConnectorError(error_msg)
            setattr(self, name, Mock(side_effect=sdn_error))
85
86
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised when a requested VIM element cannot be found or the search criteria is invalid."""
89
90
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    Its methods do not contact the VIM; they only report a status."""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database object
        :param my_vims: connectors, indexed by target_id
        :param db_vims: VIM database content, indexed by target_id
        :param logger: logger used to report the operations
        """
        self.logger = logger
        self.db = db
        self.db_vims = db_vims
        self.my_vims = my_vims

    def new(self, ro_task, task_index, task_depends):
        """Does nothing. Reports the item as being built, with no changes to store"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skips calling the VIM to get the status. Assumes ok unless already at VIM_ERROR"""
        already_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if already_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skips calling the VIM to delete the item. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Does nothing. Reports the action as done, with nothing to store"""
        return "DONE", None, None
117
118
class VimInteractionNet(VimInteractionBase):
    """Finds, creates, refreshes and deletes networks at the target VIM."""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for an existing network at the VIM, or create a new one.

        When the task contains "find_params" the network is searched at the VIM; a management
        network that is neither found nor configured at the VIM is created. When the task does
        not contain "find_params" the network is created using the task "params".
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("BUILD", update) if ok, ("FAILED", update) on a VIM or worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt = False
        mgmt_set_at_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND. Obtain the filter to use at the VIM
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the configuration of the VIM goes first
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]

                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_set_at_vim = True
                        vim_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_set_at_vim = True
                        vim_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    # A mgmt-network of the descriptor that is not mapped to a VIM network,
                    # neither in the descriptor nor in the VIM configuration, is created.
                    if is_mgmt and not mgmt_set_at_vim:
                        net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """
        Call the VIM to get the network status.

        :param ro_task: ro_task content; "_id", "target_id" and "vim_info" are read
        :return: (task status, update), where update only contains the entries of vim_info
            that differ from the stored ones
        """
        ro_task_id = ro_task["_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        stored = ro_task["vim_info"]
        vim_id = stored["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            # any status different from ACTIVE or BUILD is considered a failure
            task_status = {"ACTIVE": "DONE", "BUILD": "BUILD"}.get(
                vim_info["status"], "FAILED"
            )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, target_id, vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        vim_status = vim_info["status"]
        ro_vim_item_update = {}

        if stored["vim_status"] != vim_status:
            ro_vim_item_update["vim_status"] = vim_status

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_status in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_status == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            message = (
                ro_vim_item_update.get("vim_message") if new_status != "ACTIVE" else ""
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id, target_id, vim_id, new_status, message
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network, and its created items, from the VIM.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: ("DONE", update) if deleted or not found, ("FAILED", update) on a VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        target_id = ro_task["target_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            created_items = ro_task["vim_info"]["created_items"]

            if net_vim_id or created_items:
                self.my_vims[target_id].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], target_id, net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                target_id,
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
336
337
class VimInteractionVdu(VimInteractionBase):
    """Creates, refreshes and deletes VMs at the target VIM, and injects ssh keys into them."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at the VIM.

        Identifiers of networks, image, flavor and affinity groups starting by "TASK-" are
        replaced by the VIM identifiers stored at task_depends.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary with the VIM identifier obtained by each "TASK-..."
        :return: ("BUILD", update) if ok, ("FAILED", update) on a VIM or worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # NOTE(review): set before calling the VIM, so a failure is reported as created
            created = True
            params_copy = deepcopy(task["params"])

            for net in params_copy["net_list"]:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            for affinity_group in params_copy["affinity_group_list"]:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # the message used to be truncated to "found for ..."
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" of each net is read after the VM creation
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM, and its created items, from the VIM.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: ("DONE", update) if deleted or not found, ("FAILED", update) on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call the VIM to get the VM status.

        :param ro_task: ro_task content; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no VM identifier; otherwise (task status, update),
            where update only contains the entries of vim_info that differ from the stored ones
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; obtaining the name is best effort
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # None is stored for an interface that the VIM does not report
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # NOTE(review): raises StopIteration if there is no pending CREATE task
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the VDU management interface defaults to the VNF one, and then to the first one
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject a ssh key into the VM, asking for a retry while the maximum is not reached.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used
        :return: (task status, vim item update or None, task update). The status is "DONE" if
            ok, "BUILD" when a retry is requested and "FAILED" when the retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params_copy = deepcopy(task["params"])
            # the stored private key is decrypted and passed to the VIM as "ro_key"
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return "DONE", None, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
632
633
class VimInteractionImage(VimInteractionBase):
    """Finds images at the target VIM. Images are never created by this class."""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for an image at the VIM using the "find_params" of the task.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", update) if ok, being "vim_id" None when the task has no "find_params";
            ("FAILED", update) when none or several images match, or on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        # defined in advance: a task without find_params used to raise UnboundLocalError
        vim_image_id = None

        try:
            # FIND
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
688
689
class VimInteractionFlavor(VimInteractionBase):
    """Finds, creates and deletes flavors at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor from the VIM.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: ("DONE", update) if deleted or not found, ("FAILED", update) on a VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        target_id = ro_task["target_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[target_id].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], target_id, flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                target_id,
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor at the VIM matching "find_params", creating it from "params" otherwise.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", update) if ok, ("FAILED", update) on a VIM or worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        created = False
        created_items = {}

        try:
            vim_flavor_id = None

            # FIND
            if task.get("find_params"):
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        task["find_params"]["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException:
                    # not found is not an error: the flavor can still be created below
                    pass

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, target_id, vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
782
783
class VimInteractionAffinityGroup(VimInteractionBase):
    """Finds, creates and deletes affinity or anti-affinity groups at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity or anti-affinity group from the VIM.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: ("DONE", update) if deleted or not found, ("FAILED", update) on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Use the affinity group provided by "vim-affinity-group-id", or create a new one.

        A new group is created when no identifier is provided or when the provided one is not
        found at the VIM.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", update) if ok, ("FAILED", update) on a VIM or worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # a space was missing between the identifier and the rest of the message
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
889
890
class VimInteractionUpdateVdu(VimInteractionBase):
    """Executes an action over an existing VM of the target VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Call the VIM to execute the action of the task "params" over the VM.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", vim item update, task update) if ok, ("FAILED", vim item update,
            task update) on a VIM or worker error. Nothing is returned if the task has no params
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}

        try:
            params = task.get("params")

            if not params:
                # NOTE(review): callers expecting three values would fail on this None
                return None

            vim_vm_id = params.get("vim_vm_id")
            action = params.get("action")
            # the name of the action is used both as key and as value
            target_vim.action_vminstance(vim_vm_id, {action: action})
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, target_id)
            )

            return (
                "DONE",
                {
                    "vim_id": vim_vm_id,
                    "vim_status": "DONE",
                    "created": created,
                    "created_items": created_items,
                    "vim_details": None,
                    "vim_message": None,
                },
                db_task_update,
            )
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:" " {}".format(task_id, target_id, e)
            )

            return (
                "FAILED",
                {
                    "vim_status": "VIM_ERROR",
                    "created": created,
                    "vim_message": str(e),
                },
                db_task_update,
            )
931
932
933 class VimInteractionSdnNet(VimInteractionBase):
934 @staticmethod
935 def _match_pci(port_pci, mapping):
936 """
937 Check if port_pci matches with mapping
938 mapping can have brackets to indicate that several chars are accepted. e.g
939 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
940 :param port_pci: text
941 :param mapping: text, can contain brackets to indicate several chars are available
942 :return: True if matches, False otherwise
943 """
944 if not port_pci or not mapping:
945 return False
946 if port_pci == mapping:
947 return True
948
949 mapping_index = 0
950 pci_index = 0
951 while True:
952 bracket_start = mapping.find("[", mapping_index)
953
954 if bracket_start == -1:
955 break
956
957 bracket_end = mapping.find("]", bracket_start)
958 if bracket_end == -1:
959 break
960
961 length = bracket_start - mapping_index
962 if (
963 length
964 and port_pci[pci_index : pci_index + length]
965 != mapping[mapping_index:bracket_start]
966 ):
967 return False
968
969 if (
970 port_pci[pci_index + length]
971 not in mapping[bracket_start + 1 : bracket_end]
972 ):
973 return False
974
975 pci_index += length + 1
976 mapping_index = bracket_end + 1
977
978 if port_pci[pci_index:] != mapping[mapping_index:]:
979 return False
980
981 return True
982
983 def _get_interfaces(self, vlds_to_connect, vim_account_id):
984 """
985 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
986 :param vim_account_id:
987 :return:
988 """
989 interfaces = []
990
991 for vld in vlds_to_connect:
992 table, _, db_id = vld.partition(":")
993 db_id, _, vld = db_id.partition(":")
994 _, _, vld_id = vld.partition(".")
995
996 if table == "vnfrs":
997 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
998 iface_key = "vnf-vld-id"
999 else: # table == "nsrs"
1000 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1001 iface_key = "ns-vld-id"
1002
1003 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1004
1005 for db_vnfr in db_vnfrs:
1006 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1007 for iface_index, interface in enumerate(vdur["interfaces"]):
1008 if interface.get(iface_key) == vld_id and interface.get(
1009 "type"
1010 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1011 # only SR-IOV o PT
1012 interface_ = interface.copy()
1013 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1014 db_vnfr["_id"], vdu_index, iface_index
1015 )
1016
1017 if vdur.get("status") == "ERROR":
1018 interface_["status"] = "ERROR"
1019
1020 interfaces.append(interface_)
1021
1022 return interfaces
1023
1024 def refresh(self, ro_task):
1025 # look for task create
1026 task_create_index, _ = next(
1027 i_t
1028 for i_t in enumerate(ro_task["tasks"])
1029 if i_t[1]
1030 and i_t[1]["action"] == "CREATE"
1031 and i_t[1]["status"] != "FINISHED"
1032 )
1033
1034 return self.new(ro_task, task_create_index, None)
1035
def new(self, ro_task, task_index, task_depends):
    """
    Create, update or poll the SDN connectivity service that backs a network.

    The ports to connect are the VDU interfaces returned by _get_interfaces() plus the optional
    external ports given in the task params ("sdn-ports"). Then:
      - if any error was collected, the status becomes "ERROR";
      - if the set of port ids differs from the stored "connected_ports", the service is created
        (no service yet and at least two ports) or edited (service already exists);
      - otherwise, if the service exists, its status is read from the connector.

    :param ro_task: ro_task dictionary; "vim_info", "target_id" and "tasks" are read
    :param task_index: index inside ro_task["tasks"] of the task to process
    :param task_depends: not used by this method
    :return: tuple (status, ro_vim_item_update). Any exception is caught and reported as
        ("FAILED", {"vim_status": "VIM_ERROR", ...})
    """

    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    # identifier of the connectivity service, falsy while it has not been created
    sdn_net_id = ro_task["vim_info"]["vim_id"]

    created_items = ro_task["vim_info"].get("created_items")
    connected_ports = ro_task["vim_info"].get("connected_ports", [])
    new_connected_ports = []
    last_update = ro_task["vim_info"].get("last_update", 0)
    sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
    error_list = []
    created = ro_task["vim_info"].get("created", False)

    try:
        # CREATE
        params = task["params"]
        vlds_to_connect = params["vlds"]
        associated_vim = params["target_vim"]
        # external additional ports
        additional_ports = params.get("sdn-ports") or ()
        # NOTE(review): partition() runs before the "if associated_vim" check below, so a None
        # target_vim fails here (and is reported as FAILED by the except clause)
        _, _, vim_account_id = associated_vim.partition(":")

        if associated_vim:
            # get associated VIM
            if associated_vim not in self.db_vims:
                self.db_vims[associated_vim] = self.db.get_one(
                    "vim_accounts", {"_id": vim_account_id}
                )

            # NOTE(review): db_vim stays unbound when associated_vim is empty, yet the loop
            # below reads it for every located port
            db_vim = self.db_vims[associated_vim]

        # look for ports to connect
        ports = self._get_interfaces(vlds_to_connect, vim_account_id)
        # print(ports)

        sdn_ports = []
        pending_ports = error_ports = 0
        vlan_used = None
        sdn_need_update = False

        for port in ports:
            # remember the last vlan seen; it is the default for the external ports
            vlan_used = port.get("vlan") or vlan_used

            # TODO. Do not connect if already done
            if not port.get("compute_node") or not port.get("pci"):
                # port without location yet: count it as failed or as still pending
                if port.get("status") == "ERROR":
                    error_ports += 1
                else:
                    pending_ports += 1
                continue

            pmap = None
            # first "sdn-port-mapping" entry of the VIM config for this compute node
            compute_node_mappings = next(
                (
                    c
                    for c in db_vim["config"].get("sdn-port-mapping", ())
                    if c and c["compute_node"] == port["compute_node"]
                ),
                None,
            )

            if compute_node_mappings:
                # process port_mapping pci of type 0000:af:1[01].[1357]
                pmap = next(
                    (
                        p
                        for p in compute_node_mappings["ports"]
                        if self._match_pci(port["pci"], p.get("pci"))
                    ),
                    None,
                )

            if not pmap:
                # without mapping the port is an error unless the VIM config sets "mapping_not_needed"
                if not db_vim["config"].get("mapping_not_needed"):
                    error_list.append(
                        "Port mapping not found for compute_node={} pci={}".format(
                            port["compute_node"], port["pci"]
                        )
                    )
                    continue

                pmap = {}

            # default endpoint id, used when the mapping does not provide one
            service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
            new_port = {
                "service_endpoint_id": pmap.get("service_endpoint_id")
                or service_endpoint_id,
                "service_endpoint_encapsulation_type": "dot1q"
                if port["type"] == "SR-IOV"
                else None,
                "service_endpoint_encapsulation_info": {
                    "vlan": port.get("vlan"),
                    "mac": port.get("mac-address"),
                    "device_id": pmap.get("device_id") or port["compute_node"],
                    "device_interface_id": pmap.get("device_interface_id")
                    or port["pci"],
                    "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                    "switch_port": pmap.get("switch_port"),
                    "service_mapping_info": pmap.get("service_mapping_info"),
                },
            }

            # TODO
            # if port["modified_at"] > last_update:
            #     sdn_need_update = True
            new_connected_ports.append(port["id"])  # TODO
            sdn_ports.append(new_port)

        if error_ports:
            error_list.append(
                "{} interfaces have not been created as VDU is on ERROR status".format(
                    error_ports
                )
            )

        # connect external ports
        for index, additional_port in enumerate(additional_ports):
            additional_port_id = additional_port.get(
                "service_endpoint_id"
            ) or "external-{}".format(index)
            sdn_ports.append(
                {
                    "service_endpoint_id": additional_port_id,
                    "service_endpoint_encapsulation_type": additional_port.get(
                        "service_endpoint_encapsulation_type", "dot1q"
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": additional_port.get("vlan") or vlan_used,
                        "mac": additional_port.get("mac_address"),
                        "device_id": additional_port.get("device_id"),
                        "device_interface_id": additional_port.get(
                            "device_interface_id"
                        ),
                        "switch_dpid": additional_port.get("switch_dpid")
                        or additional_port.get("switch_id"),
                        "switch_port": additional_port.get("switch_port"),
                        "service_mapping_info": additional_port.get(
                            "service_mapping_info"
                        ),
                    },
                }
            )
            new_connected_ports.append(additional_port_id)
        sdn_info = ""

        # if there are more ports to connect or they have been modified, call create/update
        if error_list:
            sdn_status = "ERROR"
            sdn_info = "; ".join(error_list)
        elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
            last_update = time.time()

            if not sdn_net_id:
                if len(sdn_ports) < 2:
                    # nothing is created at the connector with fewer than two ports
                    sdn_status = "ACTIVE"

                    if not pending_ports:
                        self.logger.debug(
                            "task={} {} new-sdn-net done, less than 2 ports".format(
                                task_id, ro_task["target_id"]
                            )
                        )
                else:
                    net_type = params.get("type") or "ELAN"
                    (
                        sdn_net_id,
                        created_items,
                    ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                    created = True
                    self.logger.debug(
                        "task={} {} new-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )
            else:
                created_items = target_vim.edit_connectivity_service(
                    sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} update-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )

            connected_ports = new_connected_ports
        elif sdn_net_id:
            # same ports as before: just refresh the status reported by the connector
            wim_status_dict = target_vim.get_connectivity_service_status(
                sdn_net_id, conn_info=created_items
            )
            sdn_status = wim_status_dict["sdn_status"]

            if wim_status_dict.get("sdn_info"):
                sdn_info = str(wim_status_dict.get("sdn_info")) or ""

            # an error message, when present, replaces the informative text
            if wim_status_dict.get("error_msg"):
                sdn_info = wim_status_dict.get("error_msg") or ""

        if pending_ports:
            if sdn_status != "ERROR":
                sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                    len(ports) - pending_ports, len(ports)
                )

            # not all ports are located yet, so the net cannot be reported as ACTIVE
            if sdn_status == "ACTIVE":
                sdn_status = "BUILD"

        ro_vim_item_update = {
            "vim_id": sdn_net_id,
            "vim_status": sdn_status,
            "created": created,
            "created_items": created_items,
            "connected_ports": connected_ports,
            "vim_details": sdn_info,
            "vim_message": None,
            "last_update": last_update,
        }

        return sdn_status, ro_vim_item_update
    except Exception as e:
        # the traceback is logged only for exceptions other than the connector ones
        self.logger.error(
            "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
            exc_info=not isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            ),
        )
        ro_vim_item_update = {
            "vim_status": "VIM_ERROR",
            "created": created,
            "vim_message": str(e),
        }

        return "FAILED", ro_vim_item_update
1272
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service referenced by ro_task["vim_info"]["vim_id"], if any.

    A connector error carrying HTTP code NOT_FOUND is treated as success ("already deleted").

    :param ro_task: ro_task dictionary; "vim_info", "target_id", "_id" and "tasks" are read
    :param task_index: index inside ro_task["tasks"] of the task to process
    :return: ("DONE", update dict with status "DELETED") on success,
        ("FAILED", update dict with status "VIM_ERROR") on any other error
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    vim_info = ro_task["vim_info"]
    sdn_vim_id = vim_info.get("vim_id")
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, vim_info.get("created_items")
            )

    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            # traceback only for exceptions other than the connector ones
            known_error = isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not known_error,
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        ro_vim_item_update_ok["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_message", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
1323
1324
class VimInteractionMigration(VimInteractionBase):
    """Task executor that migrates a VM at the VIM and reports its refreshed information."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM described by the task params and collect its new vim_info.

        :param ro_task: ro_task dictionary; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE", or
            "FAILED" when the task has no params or a VimConnException/NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params")

            if not params:
                # previously vim_vm_id stayed unbound and an UnboundLocalError escaped this handler
                raise NsWorkerException(
                    "Missing params at migrate task={}".format(task_id)
                )

            vim_vm_id = params.get("vim_vm_id")
            migrate_host = params.get("migrate_host")
            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                vdu_old_vim_info = (params.get("vdu_vim_info") or {}).get(
                    ro_task["target_id"]
                ) or {}

                # Refresh VM to get new vim_info
                vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    for old_iface in vdu_old_vim_info.get("interfaces") or ():
                        iface = next(
                            (
                                iface
                                for iface in refreshed_vim_info["interfaces"]
                                if old_iface["vim_interface_id"]
                                == iface["vim_interface_id"]
                            ),
                            None,
                        )

                        # interfaces that are no longer reported are skipped instead of stored as None
                        if iface is not None:
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1404
1405
class VimInteractionResize(VimInteractionBase):
    """Task executor that resizes a VM at the VIM to the flavor given in the task params."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM described by the task params, looking up or creating the target flavor.

        :param ro_task: ro_task dictionary; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE", or
            "FAILED" when the task has no params or a VimConnException/NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")

            if not params:
                # previously vim_vm_id stayed unbound and an UnboundLocalError escaped this handler
                raise NsWorkerException(
                    "Missing params at resize task={}".format(task_id)
                )

            vim_vm_id = params.get("vim_vm_id")
            flavor_dict = params.get("flavor_dict")
            self.logger.info("flavor_dict %s", flavor_dict)

            # use a matching flavor when the VIM has one, otherwise try to create it
            try:
                target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
            except Exception as find_error:
                self.logger.info("Cannot find any flavor matching %s.", str(find_error))
                try:
                    target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                except Exception as create_error:
                    # NOTE(review): the task is still reported as DONE when no flavor could be
                    # obtained and therefore nothing was resized; confirm this is intended
                    self.logger.error(
                        "Error creating flavor at VIM %s.", str(create_error)
                    )

            if target_flavor_uuid is not None:
                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                # NOTE(review): VimInteractionMigration reports "ACTIVE" here; confirm "DONE" is intended
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1473
1474
class NsWorker(threading.Thread):
    # Worker thread; the methods below keep per-target connectors and read ro_tasks from the database.
    REFRESH_BUILD = 5  # 5 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    REFRESH_ERROR = 600  # presumably seconds, like the two values above - TODO confirm
    REFRESH_IMAGE = 3600 * 10  # presumably seconds (10 hours) - TODO confirm
    REFRESH_DELETE = 3600 * 10  # presumably seconds (10 hours) - TODO confirm
    # maximum number of entries of the in-memory task queue created at __init__
    QUEUE_SIZE = 100
    # NOTE(review): the terminate() method defined later in this class rebinds this name,
    # so this False value is not what NsWorker.terminate finally refers to
    terminate = False
1483
def __init__(self, worker_index, config, plugins, db):
    """
    Initialize the worker thread state; no VIM is loaded and no database access is done here.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # lock taken by del_task(); that method used it although it was never created
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # task item type: class that knows how to create/delete/execute that item
    interaction_classes = {
        "net": VimInteractionNet,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    # every interaction object shares the same db handle, connector dicts and logger
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1538
def insert_task(self, task):
    """
    Put a task in the worker queue without blocking.

    :param task: element to enqueue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1545
def terminate(self):
    """Enqueue the "exit" marker through insert_task()."""
    exit_marker = "exit"
    self.insert_task(exit_marker)
1548
def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet.

    :param task: task dictionary; its "status" is changed to "SUPERSEDED" when it is "SCHEDULED"
    :return: True if the task was superseded, False if it was in any other status
    """
    # the with statement releases the lock on every exit path; releasing it by hand as well
    # made the implicit release fail with RuntimeError
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        return False
1557
1558 def _process_vim_config(self, target_id, db_vim):
1559 """
1560 Process vim config, creating vim configuration files as ca_cert
1561 :param target_id: vim/sdn/wim + id
1562 :param db_vim: Vim dictionary obtained from database
1563 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1564 """
1565 if not db_vim.get("config"):
1566 return
1567
1568 file_name = ""
1569
1570 try:
1571 if db_vim["config"].get("ca_cert_content"):
1572 file_name = "{}:{}".format(target_id, self.worker_index)
1573
1574 try:
1575 mkdir(file_name)
1576 except FileExistsError:
1577 pass
1578
1579 file_name = file_name + "/ca_cert"
1580
1581 with open(file_name, "w") as f:
1582 f.write(db_vim["config"]["ca_cert_content"])
1583 del db_vim["config"]["ca_cert_content"]
1584 db_vim["config"]["ca_cert"] = file_name
1585 except Exception as e:
1586 raise NsWorkerException(
1587 "Error writing to file '{}': {}".format(file_name, e)
1588 )
1589
1590 def _load_plugin(self, name, type="vim"):
1591 # type can be vim or sdn
1592 if "rovim_dummy" not in self.plugins:
1593 self.plugins["rovim_dummy"] = VimDummyConnector
1594
1595 if "rosdn_dummy" not in self.plugins:
1596 self.plugins["rosdn_dummy"] = SdnDummyConnector
1597
1598 if name in self.plugins:
1599 return self.plugins[name]
1600
1601 try:
1602 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1603 self.plugins[name] = ep.load()
1604 except Exception as e:
1605 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1606
1607 if name and name not in self.plugins:
1608 raise NsWorkerException(
1609 "Plugin 'osm_{n}' has not been installed".format(n=name)
1610 )
1611
1612 return self.plugins[name]
1613
1614 def _unload_vim(self, target_id):
1615 """
1616 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1617 :param target_id: Contains type:_id; where type can be 'vim', ...
1618 :return: None.
1619 """
1620 try:
1621 self.db_vims.pop(target_id, None)
1622 self.my_vims.pop(target_id, None)
1623
1624 if target_id in self.vim_targets:
1625 self.vim_targets.remove(target_id)
1626
1627 self.logger.info("Unloaded {}".format(target_id))
1628 rmtree("{}:{}".format(target_id, self.worker_index))
1629 except FileNotFoundError:
1630 pass # this is raised by rmtree if folder does not exist
1631 except Exception as e:
1632 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1633
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR

    Only the first operation found in "PROCESSING" state is handled. It is skipped when another
    worker holds a recent lock on it (newer than self.task_locked_time) or when the lock cannot be
    taken. The result is written to the database in the finally clause, and the target is unloaded
    again if it was not loaded before this call.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    # prefix "_admin.operations.<index>." of the operation being processed; empty if none
    op_text = ""
    # description of the current step, used in error messages
    step = ""
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value just read, so the update only succeeds
            # if nobody changed it in the meantime
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    # NOTE(review): "admin." here and "current_operation" below lack the "_admin."
                    # prefix used by the other keys - possibly a typo, confirm
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            # _load_vim returns a descriptive text on error, nothing on success
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; the finally clause overwrites them when error_text is set
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # both dicts are empty unless the lock was taken, so nothing is written otherwise
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        # leave the worker as it was: unload the target if this call loaded it
        if not loaded:
            self._unload_vim(target_id)
1735
1736 def _reload_vim(self, target_id):
1737 if target_id in self.vim_targets:
1738 self._load_vim(target_id)
1739 else:
1740 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1741 # just remove it to force load again next time it is needed
1742 self.db_vims.pop(target_id, None)
1743
1744 def _load_vim(self, target_id):
1745 """
1746 Load or reload a vim_account, sdn_controller or wim_account.
1747 Read content from database, load the plugin if not loaded.
1748 In case of error loading the plugin, it load a failing VIM_connector
1749 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1750 :param target_id: Contains type:_id; where type can be 'vim', ...
1751 :return: None if ok, descriptive text if error
1752 """
1753 target, _, _id = target_id.partition(":")
1754 target_database = (
1755 "vim_accounts"
1756 if target == "vim"
1757 else "wim_accounts"
1758 if target == "wim"
1759 else "sdns"
1760 )
1761 plugin_name = ""
1762 vim = None
1763
1764 try:
1765 step = "Getting {}={} from db".format(target, _id)
1766 # TODO process for wim, sdnc, ...
1767 vim = self.db.get_one(target_database, {"_id": _id})
1768
1769 # if deep_get(vim, "config", "sdn-controller"):
1770 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1771 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1772
1773 step = "Decrypting password"
1774 schema_version = vim.get("schema_version")
1775 self.db.encrypt_decrypt_fields(
1776 vim,
1777 "decrypt",
1778 fields=("password", "secret"),
1779 schema_version=schema_version,
1780 salt=_id,
1781 )
1782 self._process_vim_config(target_id, vim)
1783
1784 if target == "vim":
1785 plugin_name = "rovim_" + vim["vim_type"]
1786 step = "Loading plugin '{}'".format(plugin_name)
1787 vim_module_conn = self._load_plugin(plugin_name)
1788 step = "Loading {}'".format(target_id)
1789 self.my_vims[target_id] = vim_module_conn(
1790 uuid=vim["_id"],
1791 name=vim["name"],
1792 tenant_id=vim.get("vim_tenant_id"),
1793 tenant_name=vim.get("vim_tenant_name"),
1794 url=vim["vim_url"],
1795 url_admin=None,
1796 user=vim["vim_user"],
1797 passwd=vim["vim_password"],
1798 config=vim.get("config") or {},
1799 persistent_info={},
1800 )
1801 else: # sdn
1802 plugin_name = "rosdn_" + vim["type"]
1803 step = "Loading plugin '{}'".format(plugin_name)
1804 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1805 step = "Loading {}'".format(target_id)
1806 wim = deepcopy(vim)
1807 wim_config = wim.pop("config", {}) or {}
1808 wim["uuid"] = wim["_id"]
1809 wim["wim_url"] = wim["url"]
1810
1811 if wim.get("dpid"):
1812 wim_config["dpid"] = wim.pop("dpid")
1813
1814 if wim.get("switch_id"):
1815 wim_config["switch_id"] = wim.pop("switch_id")
1816
1817 # wim, wim_account, config
1818 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1819 self.db_vims[target_id] = vim
1820 self.error_status = None
1821
1822 self.logger.info(
1823 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1824 )
1825 except Exception as e:
1826 self.logger.error(
1827 "Cannot load {} plugin={}: {} {}".format(
1828 target_id, plugin_name, step, e
1829 )
1830 )
1831
1832 self.db_vims[target_id] = vim or {}
1833 self.db_vims[target_id] = FailingConnector(str(e))
1834 error_status = "{} Error: {}".format(step, e)
1835
1836 return error_status
1837 finally:
1838 if target_id not in self.vim_targets:
1839 self.vim_targets.append(target_id)
1840
1841 def _get_db_task(self):
1842 """
1843 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1844 :return: None
1845 """
1846 now = time.time()
1847
1848 if not self.time_last_task_processed:
1849 self.time_last_task_processed = now
1850
1851 try:
1852 while True:
1853 """
1854 # Log RO tasks only when loglevel is DEBUG
1855 if self.logger.getEffectiveLevel() == logging.DEBUG:
1856 self._log_ro_task(
1857 None,
1858 None,
1859 None,
1860 "TASK_WF",
1861 "task_locked_time="
1862 + str(self.task_locked_time)
1863 + " "
1864 + "time_last_task_processed="
1865 + str(self.time_last_task_processed)
1866 + " "
1867 + "now="
1868 + str(now),
1869 )
1870 """
1871 locked = self.db.set_one(
1872 "ro_tasks",
1873 q_filter={
1874 "target_id": self.vim_targets,
1875 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1876 "locked_at.lt": now - self.task_locked_time,
1877 "to_check_at.lt": self.time_last_task_processed,
1878 },
1879 update_dict={"locked_by": self.my_id, "locked_at": now},
1880 fail_on_empty=False,
1881 )
1882
1883 if locked:
1884 # read and return
1885 ro_task = self.db.get_one(
1886 "ro_tasks",
1887 q_filter={
1888 "target_id": self.vim_targets,
1889 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1890 "locked_at": now,
1891 },
1892 )
1893 return ro_task
1894
1895 if self.time_last_task_processed == now:
1896 self.time_last_task_processed = None
1897 return None
1898 else:
1899 self.time_last_task_processed = now
1900 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1901
1902 except DbException as e:
1903 self.logger.error("Database exception at _get_db_task: {}".format(e))
1904 except Exception as e:
1905 self.logger.critical(
1906 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1907 )
1908
1909 return None
1910
1911 def _get_db_all_tasks(self):
1912 """
1913 Read all content of table ro_tasks to log it
1914 :return: None
1915 """
1916 try:
1917 # Checking the content of the BD:
1918
1919 # read and return
1920 ro_task = self.db.get_list("ro_tasks")
1921 for rt in ro_task:
1922 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1923 return ro_task
1924
1925 except DbException as e:
1926 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1927 except Exception as e:
1928 self.logger.critical(
1929 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1930 )
1931
1932 return None
1933
1934 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1935 """
1936 Generate a log with the following format:
1937
1938 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1939 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1940 task_array_index;task_id;task_action;task_item;task_args
1941
1942 Example:
1943
1944 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1945 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1946 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1947 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1948 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1949 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1950 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1951 """
1952 try:
1953 line = []
1954 i = 0
1955 if ro_task is not None and isinstance(ro_task, dict):
1956 for t in ro_task["tasks"]:
1957 line.clear()
1958 line.append(mark)
1959 line.append(event)
1960 line.append(ro_task.get("_id", ""))
1961 line.append(str(ro_task.get("locked_at", "")))
1962 line.append(str(ro_task.get("modified_at", "")))
1963 line.append(str(ro_task.get("created_at", "")))
1964 line.append(str(ro_task.get("to_check_at", "")))
1965 line.append(str(ro_task.get("locked_by", "")))
1966 line.append(str(ro_task.get("target_id", "")))
1967 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1968 line.append(str(ro_task.get("vim_info", "")))
1969 line.append(str(ro_task.get("tasks", "")))
1970 if isinstance(t, dict):
1971 line.append(str(t.get("status", "")))
1972 line.append(str(t.get("action_id", "")))
1973 line.append(str(i))
1974 line.append(str(t.get("task_id", "")))
1975 line.append(str(t.get("action", "")))
1976 line.append(str(t.get("item", "")))
1977 line.append(str(t.get("find_params", "")))
1978 line.append(str(t.get("params", "")))
1979 else:
1980 line.extend([""] * 2)
1981 line.append(str(i))
1982 line.extend([""] * 5)
1983
1984 i += 1
1985 self.logger.debug(";".join(line))
1986 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1987 i = 0
1988 while True:
1989 st = "tasks.{}.status".format(i)
1990 if st not in db_ro_task_update:
1991 break
1992 line.clear()
1993 line.append(mark)
1994 line.append(event)
1995 line.append(db_ro_task_update.get("_id", ""))
1996 line.append(str(db_ro_task_update.get("locked_at", "")))
1997 line.append(str(db_ro_task_update.get("modified_at", "")))
1998 line.append("")
1999 line.append(str(db_ro_task_update.get("to_check_at", "")))
2000 line.append(str(db_ro_task_update.get("locked_by", "")))
2001 line.append("")
2002 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2003 line.append("")
2004 line.append(str(db_ro_task_update.get("vim_info", "")))
2005 line.append(str(str(db_ro_task_update).count(".status")))
2006 line.append(db_ro_task_update.get(st, ""))
2007 line.append("")
2008 line.append(str(i))
2009 line.extend([""] * 3)
2010 i += 1
2011 self.logger.debug(";".join(line))
2012
2013 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2014 line.clear()
2015 line.append(mark)
2016 line.append(event)
2017 line.append(db_ro_task_delete.get("_id", ""))
2018 line.append("")
2019 line.append(db_ro_task_delete.get("modified_at", ""))
2020 line.extend([""] * 13)
2021 self.logger.debug(";".join(line))
2022
2023 else:
2024 line.clear()
2025 line.append(mark)
2026 line.append(event)
2027 line.extend([""] * 16)
2028 self.logger.debug(";".join(line))
2029
2030 except Exception as e:
2031 self.logger.error("Error logging ro_task: {}".format(e))
2032
2033 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2034 """
2035 Determine if this task need to be done or superseded
2036 :return: None
2037 """
2038 my_task = ro_task["tasks"][task_index]
2039 task_id = my_task["task_id"]
2040 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2041 "created_items", False
2042 )
2043
2044 self.logger.warning("Needed delete: {}".format(needed_delete))
2045 if my_task["status"] == "FAILED":
2046 return None, None # TODO need to be retry??
2047
2048 try:
2049 for index, task in enumerate(ro_task["tasks"]):
2050 if index == task_index or not task:
2051 continue # own task
2052
2053 if (
2054 my_task["target_record"] == task["target_record"]
2055 and task["action"] == "CREATE"
2056 ):
2057 # set to finished
2058 db_update["tasks.{}.status".format(index)] = task[
2059 "status"
2060 ] = "FINISHED"
2061 elif task["action"] == "CREATE" and task["status"] not in (
2062 "FINISHED",
2063 "SUPERSEDED",
2064 ):
2065 needed_delete = False
2066
2067 if needed_delete:
2068 self.logger.warning(
2069 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2070 )
2071 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2072 else:
2073 return "SUPERSEDED", None
2074 except Exception as e:
2075 if not isinstance(e, NsWorkerException):
2076 self.logger.critical(
2077 "Unexpected exception at _delete_task task={}: {}".format(
2078 task_id, e
2079 ),
2080 exc_info=True,
2081 )
2082
2083 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2084
2085 def _create_task(self, ro_task, task_index, task_depends, db_update):
2086 """
2087 Determine if this task need to create something at VIM
2088 :return: None
2089 """
2090 my_task = ro_task["tasks"][task_index]
2091 task_id = my_task["task_id"]
2092 task_status = None
2093
2094 if my_task["status"] == "FAILED":
2095 return None, None # TODO need to be retry??
2096 elif my_task["status"] == "SCHEDULED":
2097 # check if already created by another task
2098 for index, task in enumerate(ro_task["tasks"]):
2099 if index == task_index or not task:
2100 continue # own task
2101
2102 if task["action"] == "CREATE" and task["status"] not in (
2103 "SCHEDULED",
2104 "FINISHED",
2105 "SUPERSEDED",
2106 ):
2107 return task["status"], "COPY_VIM_INFO"
2108
2109 try:
2110 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2111 ro_task, task_index, task_depends
2112 )
2113 # TODO update other CREATE tasks
2114 except Exception as e:
2115 if not isinstance(e, NsWorkerException):
2116 self.logger.error(
2117 "Error executing task={}: {}".format(task_id, e), exc_info=True
2118 )
2119
2120 task_status = "FAILED"
2121 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2122 # TODO update ro_vim_item_update
2123
2124 return task_status, ro_vim_item_update
2125 else:
2126 return None, None
2127
2128 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2129 """
2130 Look for dependency task
2131 :param task_id: Can be one of
2132 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2133 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2134 3. task.task_id: "<action_id>:number"
2135 :param ro_task:
2136 :param target_id:
2137 :return: database ro_task plus index of task
2138 """
2139 if (
2140 task_id.startswith("vim:")
2141 or task_id.startswith("sdn:")
2142 or task_id.startswith("wim:")
2143 ):
2144 target_id, _, task_id = task_id.partition(" ")
2145
2146 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2147 ro_task_dependency = self.db.get_one(
2148 "ro_tasks",
2149 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2150 fail_on_empty=False,
2151 )
2152
2153 if ro_task_dependency:
2154 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2155 if task["target_record_id"] == task_id:
2156 return ro_task_dependency, task_index
2157
2158 else:
2159 if ro_task:
2160 for task_index, task in enumerate(ro_task["tasks"]):
2161 if task and task["task_id"] == task_id:
2162 return ro_task, task_index
2163
2164 ro_task_dependency = self.db.get_one(
2165 "ro_tasks",
2166 q_filter={
2167 "tasks.ANYINDEX.task_id": task_id,
2168 "tasks.ANYINDEX.target_record.ne": None,
2169 },
2170 fail_on_empty=False,
2171 )
2172
2173 self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
2174 if ro_task_dependency:
2175 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2176 if task["task_id"] == task_id:
2177 return ro_task_dependency, task_index
2178 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2179
2180 def _process_pending_tasks(self, ro_task):
2181 ro_task_id = ro_task["_id"]
2182 now = time.time()
2183 # one day
2184 next_check_at = now + (24 * 60 * 60)
2185 db_ro_task_update = {}
2186
2187 def _update_refresh(new_status):
2188 # compute next_refresh
2189 nonlocal task
2190 nonlocal next_check_at
2191 nonlocal db_ro_task_update
2192 nonlocal ro_task
2193
2194 next_refresh = time.time()
2195
2196 if task["item"] in ("image", "flavor"):
2197 next_refresh += self.REFRESH_IMAGE
2198 elif new_status == "BUILD":
2199 next_refresh += self.REFRESH_BUILD
2200 elif new_status == "DONE":
2201 next_refresh += self.REFRESH_ACTIVE
2202 else:
2203 next_refresh += self.REFRESH_ERROR
2204
2205 next_check_at = min(next_check_at, next_refresh)
2206 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2207 ro_task["vim_info"]["refresh_at"] = next_refresh
2208
2209 try:
2210 """
2211 # Log RO tasks only when loglevel is DEBUG
2212 if self.logger.getEffectiveLevel() == logging.DEBUG:
2213 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2214 """
2215 # 0: get task_status_create
2216 lock_object = None
2217 task_status_create = None
2218 task_create = next(
2219 (
2220 t
2221 for t in ro_task["tasks"]
2222 if t
2223 and t["action"] == "CREATE"
2224 and t["status"] in ("BUILD", "DONE")
2225 ),
2226 None,
2227 )
2228
2229 if task_create:
2230 task_status_create = task_create["status"]
2231
2232 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2233 for task_action in ("DELETE", "CREATE", "EXEC"):
2234 db_vim_update = None
2235 new_status = None
2236
2237 for task_index, task in enumerate(ro_task["tasks"]):
2238 if not task:
2239 continue # task deleted
2240
2241 task_depends = {}
2242 target_update = None
2243
2244 if (
2245 (
2246 task_action in ("DELETE", "EXEC")
2247 and task["status"] not in ("SCHEDULED", "BUILD")
2248 )
2249 or task["action"] != task_action
2250 or (
2251 task_action == "CREATE"
2252 and task["status"] in ("FINISHED", "SUPERSEDED")
2253 )
2254 ):
2255 continue
2256
2257 task_path = "tasks.{}.status".format(task_index)
2258 try:
2259 db_vim_info_update = None
2260
2261 if task["status"] == "SCHEDULED":
2262 # check if tasks that this depends on have been completed
2263 dependency_not_completed = False
2264
2265 for dependency_task_id in task.get("depends_on") or ():
2266 (
2267 dependency_ro_task,
2268 dependency_task_index,
2269 ) = self._get_dependency(
2270 dependency_task_id, target_id=ro_task["target_id"]
2271 )
2272 dependency_task = dependency_ro_task["tasks"][
2273 dependency_task_index
2274 ]
2275 self.logger.warning(
2276 "dependency_ro_task={} dependency_task_index={}".format(
2277 dependency_ro_task, dependency_task_index
2278 )
2279 )
2280
2281 if dependency_task["status"] == "SCHEDULED":
2282 dependency_not_completed = True
2283 next_check_at = min(
2284 next_check_at, dependency_ro_task["to_check_at"]
2285 )
2286 # must allow dependent task to be processed first
2287 # to do this set time after last_task_processed
2288 next_check_at = max(
2289 self.time_last_task_processed, next_check_at
2290 )
2291 break
2292 elif dependency_task["status"] == "FAILED":
2293 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2294 task["action"],
2295 task["item"],
2296 dependency_task["action"],
2297 dependency_task["item"],
2298 dependency_task_id,
2299 dependency_ro_task["vim_info"].get(
2300 "vim_message"
2301 ),
2302 )
2303 self.logger.error(
2304 "task={} {}".format(task["task_id"], error_text)
2305 )
2306 raise NsWorkerException(error_text)
2307
2308 task_depends[dependency_task_id] = dependency_ro_task[
2309 "vim_info"
2310 ]["vim_id"]
2311 task_depends[
2312 "TASK-{}".format(dependency_task_id)
2313 ] = dependency_ro_task["vim_info"]["vim_id"]
2314
2315 if dependency_not_completed:
2316 self.logger.warning(
2317 "DEPENDENCY NOT COMPLETED {}".format(
2318 dependency_ro_task["vim_info"]["vim_id"]
2319 )
2320 )
2321 # TODO set at vim_info.vim_details that it is waiting
2322 continue
2323
2324 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2325 # the task of renew this locking. It will update database locket_at periodically
2326 if not lock_object:
2327 lock_object = LockRenew.add_lock_object(
2328 "ro_tasks", ro_task, self
2329 )
2330
2331 if task["action"] == "DELETE":
2332 (new_status, db_vim_info_update,) = self._delete_task(
2333 ro_task, task_index, task_depends, db_ro_task_update
2334 )
2335 new_status = (
2336 "FINISHED" if new_status == "DONE" else new_status
2337 )
2338 # ^with FINISHED instead of DONE it will not be refreshing
2339
2340 if new_status in ("FINISHED", "SUPERSEDED"):
2341 target_update = "DELETE"
2342 elif task["action"] == "EXEC":
2343 (
2344 new_status,
2345 db_vim_info_update,
2346 db_task_update,
2347 ) = self.item2class[task["item"]].exec(
2348 ro_task, task_index, task_depends
2349 )
2350 new_status = (
2351 "FINISHED" if new_status == "DONE" else new_status
2352 )
2353 # ^with FINISHED instead of DONE it will not be refreshing
2354
2355 if db_task_update:
2356 # load into database the modified db_task_update "retries" and "next_retry"
2357 if db_task_update.get("retries"):
2358 db_ro_task_update[
2359 "tasks.{}.retries".format(task_index)
2360 ] = db_task_update["retries"]
2361
2362 next_check_at = time.time() + db_task_update.get(
2363 "next_retry", 60
2364 )
2365 target_update = None
2366 elif task["action"] == "CREATE":
2367 if task["status"] == "SCHEDULED":
2368 if task_status_create:
2369 new_status = task_status_create
2370 target_update = "COPY_VIM_INFO"
2371 else:
2372 new_status, db_vim_info_update = self.item2class[
2373 task["item"]
2374 ].new(ro_task, task_index, task_depends)
2375 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2376 _update_refresh(new_status)
2377 else:
2378 if (
2379 ro_task["vim_info"]["refresh_at"]
2380 and now > ro_task["vim_info"]["refresh_at"]
2381 ):
2382 new_status, db_vim_info_update = self.item2class[
2383 task["item"]
2384 ].refresh(ro_task)
2385 _update_refresh(new_status)
2386 else:
2387 # The refresh is updated to avoid set the value of "refresh_at" to
2388 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2389 # because it can happen that in this case the task is never processed
2390 _update_refresh(task["status"])
2391
2392 except Exception as e:
2393 new_status = "FAILED"
2394 db_vim_info_update = {
2395 "vim_status": "VIM_ERROR",
2396 "vim_message": str(e),
2397 }
2398
2399 if not isinstance(
2400 e, (NsWorkerException, vimconn.VimConnException)
2401 ):
2402 self.logger.error(
2403 "Unexpected exception at _delete_task task={}: {}".format(
2404 task["task_id"], e
2405 ),
2406 exc_info=True,
2407 )
2408
2409 try:
2410 if db_vim_info_update:
2411 db_vim_update = db_vim_info_update.copy()
2412 db_ro_task_update.update(
2413 {
2414 "vim_info." + k: v
2415 for k, v in db_vim_info_update.items()
2416 }
2417 )
2418 ro_task["vim_info"].update(db_vim_info_update)
2419
2420 if new_status:
2421 if task_action == "CREATE":
2422 task_status_create = new_status
2423 db_ro_task_update[task_path] = new_status
2424
2425 if target_update or db_vim_update:
2426 if target_update == "DELETE":
2427 self._update_target(task, None)
2428 elif target_update == "COPY_VIM_INFO":
2429 self._update_target(task, ro_task["vim_info"])
2430 else:
2431 self._update_target(task, db_vim_update)
2432
2433 except Exception as e:
2434 if (
2435 isinstance(e, DbException)
2436 and e.http_code == HTTPStatus.NOT_FOUND
2437 ):
2438 # if the vnfrs or nsrs has been removed from database, this task must be removed
2439 self.logger.debug(
2440 "marking to delete task={}".format(task["task_id"])
2441 )
2442 self.tasks_to_delete.append(task)
2443 else:
2444 self.logger.error(
2445 "Unexpected exception at _update_target task={}: {}".format(
2446 task["task_id"], e
2447 ),
2448 exc_info=True,
2449 )
2450
2451 locked_at = ro_task["locked_at"]
2452
2453 if lock_object:
2454 locked_at = [
2455 lock_object["locked_at"],
2456 lock_object["locked_at"] + self.task_locked_time,
2457 ]
2458 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2459 # contain exactly locked_at + self.task_locked_time
2460 LockRenew.remove_lock_object(lock_object)
2461
2462 q_filter = {
2463 "_id": ro_task["_id"],
2464 "to_check_at": ro_task["to_check_at"],
2465 "locked_at": locked_at,
2466 }
2467 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2468 # outside this task (by ro_nbi) do not update it
2469 db_ro_task_update["locked_by"] = None
2470 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2471 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2472 db_ro_task_update["modified_at"] = now
2473 db_ro_task_update["to_check_at"] = next_check_at
2474
2475 """
2476 # Log RO tasks only when loglevel is DEBUG
2477 if self.logger.getEffectiveLevel() == logging.DEBUG:
2478 db_ro_task_update_log = db_ro_task_update.copy()
2479 db_ro_task_update_log["_id"] = q_filter["_id"]
2480 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2481 """
2482
2483 if not self.db.set_one(
2484 "ro_tasks",
2485 update_dict=db_ro_task_update,
2486 q_filter=q_filter,
2487 fail_on_empty=False,
2488 ):
2489 del db_ro_task_update["to_check_at"]
2490 del q_filter["to_check_at"]
2491 """
2492 # Log RO tasks only when loglevel is DEBUG
2493 if self.logger.getEffectiveLevel() == logging.DEBUG:
2494 self._log_ro_task(
2495 None,
2496 db_ro_task_update_log,
2497 None,
2498 "TASK_WF",
2499 "SET_TASK " + str(q_filter),
2500 )
2501 """
2502 self.db.set_one(
2503 "ro_tasks",
2504 q_filter=q_filter,
2505 update_dict=db_ro_task_update,
2506 fail_on_empty=True,
2507 )
2508 except DbException as e:
2509 self.logger.error(
2510 "ro_task={} Error updating database {}".format(ro_task_id, e)
2511 )
2512 except Exception as e:
2513 self.logger.error(
2514 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2515 )
2516
2517 def _update_target(self, task, ro_vim_item_update):
2518 table, _, temp = task["target_record"].partition(":")
2519 _id, _, path_vim_status = temp.partition(":")
2520 path_item = path_vim_status[: path_vim_status.rfind(".")]
2521 path_item = path_item[: path_item.rfind(".")]
2522 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2523 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2524
2525 if ro_vim_item_update:
2526 update_dict = {
2527 path_vim_status + "." + k: v
2528 for k, v in ro_vim_item_update.items()
2529 if k
2530 in (
2531 "vim_id",
2532 "vim_details",
2533 "vim_message",
2534 "vim_name",
2535 "vim_status",
2536 "interfaces",
2537 "interfaces_backup",
2538 )
2539 }
2540
2541 if path_vim_status.startswith("vdur."):
2542 # for backward compatibility, add vdur.name apart from vdur.vim_name
2543 if ro_vim_item_update.get("vim_name"):
2544 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2545
2546 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2547 if ro_vim_item_update.get("vim_id"):
2548 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2549
2550 # update general status
2551 if ro_vim_item_update.get("vim_status"):
2552 update_dict[path_item + ".status"] = ro_vim_item_update[
2553 "vim_status"
2554 ]
2555
2556 if ro_vim_item_update.get("interfaces"):
2557 path_interfaces = path_item + ".interfaces"
2558
2559 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2560 if iface:
2561 update_dict.update(
2562 {
2563 path_interfaces + ".{}.".format(i) + k: v
2564 for k, v in iface.items()
2565 if k in ("vlan", "compute_node", "pci")
2566 }
2567 )
2568
2569 # put ip_address and mac_address with ip-address and mac-address
2570 if iface.get("ip_address"):
2571 update_dict[
2572 path_interfaces + ".{}.".format(i) + "ip-address"
2573 ] = iface["ip_address"]
2574
2575 if iface.get("mac_address"):
2576 update_dict[
2577 path_interfaces + ".{}.".format(i) + "mac-address"
2578 ] = iface["mac_address"]
2579
2580 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2581 update_dict["ip-address"] = iface.get("ip_address").split(
2582 ";"
2583 )[0]
2584
2585 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2586 update_dict[path_item + ".ip-address"] = iface.get(
2587 "ip_address"
2588 ).split(";")[0]
2589
2590 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2591
2592 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2593 if ro_vim_item_update.get("interfaces"):
2594 search_key = path_vim_status + ".interfaces"
2595 if update_dict.get(search_key):
2596 interfaces_backup_update = {
2597 path_vim_status + ".interfaces_backup": update_dict[search_key]
2598 }
2599
2600 self.db.set_one(
2601 table,
2602 q_filter={"_id": _id},
2603 update_dict=interfaces_backup_update,
2604 )
2605
2606 else:
2607 update_dict = {path_item + ".status": "DELETED"}
2608 self.db.set_one(
2609 table,
2610 q_filter={"_id": _id},
2611 update_dict=update_dict,
2612 unset={path_vim_status: None},
2613 )
2614
2615 def _process_delete_db_tasks(self):
2616 """
2617 Delete task from database because vnfrs or nsrs or both have been deleted
2618 :return: None. Uses and modify self.tasks_to_delete
2619 """
2620 while self.tasks_to_delete:
2621 task = self.tasks_to_delete[0]
2622 vnfrs_deleted = None
2623 nsr_id = task["nsr_id"]
2624
2625 if task["target_record"].startswith("vnfrs:"):
2626 # check if nsrs is present
2627 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2628 vnfrs_deleted = task["target_record"].split(":")[1]
2629
2630 try:
2631 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2632 except Exception as e:
2633 self.logger.error(
2634 "Error deleting task={}: {}".format(task["task_id"], e)
2635 )
2636 self.tasks_to_delete.pop(0)
2637
2638 @staticmethod
2639 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2640 """
2641 Static method because it is called from osm_ng_ro.ns
2642 :param db: instance of database to use
2643 :param nsr_id: affected nsrs id
2644 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2645 :return: None, exception is fails
2646 """
2647 retries = 5
2648 for retry in range(retries):
2649 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2650 now = time.time()
2651 conflict = False
2652
2653 for ro_task in ro_tasks:
2654 db_update = {}
2655 to_delete_ro_task = True
2656
2657 for index, task in enumerate(ro_task["tasks"]):
2658 if not task:
2659 pass
2660 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2661 vnfrs_deleted
2662 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2663 ):
2664 db_update["tasks.{}".format(index)] = None
2665 else:
2666 # used by other nsr, ro_task cannot be deleted
2667 to_delete_ro_task = False
2668
2669 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2670 if to_delete_ro_task:
2671 if not db.del_one(
2672 "ro_tasks",
2673 q_filter={
2674 "_id": ro_task["_id"],
2675 "modified_at": ro_task["modified_at"],
2676 },
2677 fail_on_empty=False,
2678 ):
2679 conflict = True
2680 elif db_update:
2681 db_update["modified_at"] = now
2682 if not db.set_one(
2683 "ro_tasks",
2684 q_filter={
2685 "_id": ro_task["_id"],
2686 "modified_at": ro_task["modified_at"],
2687 },
2688 update_dict=db_update,
2689 fail_on_empty=False,
2690 ):
2691 conflict = True
2692 if not conflict:
2693 return
2694 else:
2695 raise NsWorkerException("Exceeded {} retries".format(retries))
2696
def run(self):
    """
    Main loop of the worker.

    Every iteration first attends one command from self.task_queue, if any ("terminate",
    "load_vim", "unload_vim", "reload_vim", "check_vim"). When no command is available it
    processes the pending database deletions and one ro_task. The loop ends when the
    "terminate" command is received.
    :return: None
    """
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                # without vim targets there is nothing to poll: block until a command arrives
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            if task[0] == "terminate":
                break
            elif task[0] == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif task[0] == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif task[0] == "reload_vim":
                self._reload_vim(task[1])
            elif task[0] == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            continue
        except queue.Empty:
            # no command waiting: go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()
            busy = False
            ro_task = self._get_db_task()
            if ro_task:
                # whole-record dump: only useful when debugging
                self.logger.debug("Task to process: {}".format(ro_task))
                # NOTE(review): fixed pause before every task, purpose not evident here - confirm
                time.sleep(1)
                self._process_pending_tasks(ro_task)
                busy = True
            if not busy:
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")