Fix Bug 2041: Backup VDU interfaces for healing features
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from unittest.mock import Mock
37
38 from importlib_metadata import entry_points
39 from osm_common.dbbase import DbException
40 from osm_ng_ro.vim_admin import LockRenew
41 from osm_ro_plugin import sdnconn, vimconn
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin.vim_dummy import VimDummyConnector
44 import yaml
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 "vim_message": None,
215 }
216 self.logger.debug(
217 "task={} {} new-net={} created={}".format(
218 task_id, ro_task["target_id"], vim_net_id, created
219 )
220 )
221
222 return "BUILD", ro_vim_item_update
223 except (vimconn.VimConnException, NsWorkerException) as e:
224 self.logger.error(
225 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
226 )
227 ro_vim_item_update = {
228 "vim_status": "VIM_ERROR",
229 "created": created,
230 "vim_message": str(e),
231 }
232
233 return "FAILED", ro_vim_item_update
234
235 def refresh(self, ro_task):
236 """Call VIM to get network status"""
237 ro_task_id = ro_task["_id"]
238 target_vim = self.my_vims[ro_task["target_id"]]
239 vim_id = ro_task["vim_info"]["vim_id"]
240 net_to_refresh_list = [vim_id]
241
242 try:
243 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
244 vim_info = vim_dict[vim_id]
245
246 if vim_info["status"] == "ACTIVE":
247 task_status = "DONE"
248 elif vim_info["status"] == "BUILD":
249 task_status = "BUILD"
250 else:
251 task_status = "FAILED"
252 except vimconn.VimConnException as e:
253 # Mark all tasks at VIM_ERROR status
254 self.logger.error(
255 "ro_task={} vim={} get-net={}: {}".format(
256 ro_task_id, ro_task["target_id"], vim_id, e
257 )
258 )
259 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
260 task_status = "FAILED"
261
262 ro_vim_item_update = {}
263 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
264 ro_vim_item_update["vim_status"] = vim_info["status"]
265
266 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
267 ro_vim_item_update["vim_name"] = vim_info.get("name")
268
269 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
270 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
271 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
272 elif vim_info["status"] == "DELETED":
273 ro_vim_item_update["vim_id"] = None
274 ro_vim_item_update["vim_message"] = "Deleted externally"
275 else:
276 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
277 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
278
279 if ro_vim_item_update:
280 self.logger.debug(
281 "ro_task={} {} get-net={}: status={} {}".format(
282 ro_task_id,
283 ro_task["target_id"],
284 vim_id,
285 ro_vim_item_update.get("vim_status"),
286 ro_vim_item_update.get("vim_message")
287 if ro_vim_item_update.get("vim_status") != "ACTIVE"
288 else "",
289 )
290 )
291
292 return task_status, ro_vim_item_update
293
294 def delete(self, ro_task, task_index):
295 task = ro_task["tasks"][task_index]
296 task_id = task["task_id"]
297 net_vim_id = ro_task["vim_info"]["vim_id"]
298 ro_vim_item_update_ok = {
299 "vim_status": "DELETED",
300 "created": False,
301 "vim_message": "DELETED",
302 "vim_id": None,
303 }
304
305 try:
306 if net_vim_id or ro_task["vim_info"]["created_items"]:
307 target_vim = self.my_vims[ro_task["target_id"]]
308 target_vim.delete_network(
309 net_vim_id, ro_task["vim_info"]["created_items"]
310 )
311 except vimconn.VimConnNotFoundException:
312 ro_vim_item_update_ok["vim_message"] = "already deleted"
313 except vimconn.VimConnException as e:
314 self.logger.error(
315 "ro_task={} vim={} del-net={}: {}".format(
316 ro_task["_id"], ro_task["target_id"], net_vim_id, e
317 )
318 )
319 ro_vim_item_update = {
320 "vim_status": "VIM_ERROR",
321 "vim_message": "Error while deleting: {}".format(e),
322 }
323
324 return "FAILED", ro_vim_item_update
325
326 self.logger.debug(
327 "task={} {} del-net={} {}".format(
328 task_id,
329 ro_task["target_id"],
330 net_vim_id,
331 ro_vim_item_update_ok.get("vim_message", ""),
332 )
333 )
334
335 return "DONE", ro_vim_item_update_ok
336
337
338 class VimInteractionVdu(VimInteractionBase):
339 max_retries_inject_ssh_key = 20 # 20 times
340 time_retries_inject_ssh_key = 30 # wevery 30 seconds
341
342 def new(self, ro_task, task_index, task_depends):
343 task = ro_task["tasks"][task_index]
344 task_id = task["task_id"]
345 created = False
346 created_items = {}
347 target_vim = self.my_vims[ro_task["target_id"]]
348
349 try:
350 created = True
351 params = task["params"]
352 params_copy = deepcopy(params)
353 net_list = params_copy["net_list"]
354
355 for net in net_list:
356 # change task_id into network_id
357 if "net_id" in net and net["net_id"].startswith("TASK-"):
358 network_id = task_depends[net["net_id"]]
359
360 if not network_id:
361 raise NsWorkerException(
362 "Cannot create VM because depends on a network not created or found "
363 "for {}".format(net["net_id"])
364 )
365
366 net["net_id"] = network_id
367
368 if params_copy["image_id"].startswith("TASK-"):
369 params_copy["image_id"] = task_depends[params_copy["image_id"]]
370
371 if params_copy["flavor_id"].startswith("TASK-"):
372 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
373
374 affinity_group_list = params_copy["affinity_group_list"]
375 for affinity_group in affinity_group_list:
376 # change task_id into affinity_group_id
377 if "affinity_group_id" in affinity_group and affinity_group[
378 "affinity_group_id"
379 ].startswith("TASK-"):
380 affinity_group_id = task_depends[
381 affinity_group["affinity_group_id"]
382 ]
383
384 if not affinity_group_id:
385 raise NsWorkerException(
386 "found for {}".format(affinity_group["affinity_group_id"])
387 )
388
389 affinity_group["affinity_group_id"] = affinity_group_id
390
391 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
392 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
393
394 ro_vim_item_update = {
395 "vim_id": vim_vm_id,
396 "vim_status": "BUILD",
397 "created": created,
398 "created_items": created_items,
399 "vim_details": None,
400 "vim_message": None,
401 "interfaces_vim_ids": interfaces,
402 "interfaces": [],
403 "interfaces_backup": [],
404 }
405 self.logger.debug(
406 "task={} {} new-vm={} created={}".format(
407 task_id, ro_task["target_id"], vim_vm_id, created
408 )
409 )
410
411 return "BUILD", ro_vim_item_update
412 except (vimconn.VimConnException, NsWorkerException) as e:
413 self.logger.error(
414 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
415 )
416 ro_vim_item_update = {
417 "vim_status": "VIM_ERROR",
418 "created": created,
419 "vim_message": str(e),
420 }
421
422 return "FAILED", ro_vim_item_update
423
424 def delete(self, ro_task, task_index):
425 task = ro_task["tasks"][task_index]
426 task_id = task["task_id"]
427 vm_vim_id = ro_task["vim_info"]["vim_id"]
428 ro_vim_item_update_ok = {
429 "vim_status": "DELETED",
430 "created": False,
431 "vim_message": "DELETED",
432 "vim_id": None,
433 }
434
435 try:
436 self.logger.debug(
437 "delete_vminstance: vm_vim_id={} created_items={}".format(
438 vm_vim_id, ro_task["vim_info"]["created_items"]
439 )
440 )
441 if vm_vim_id or ro_task["vim_info"]["created_items"]:
442 target_vim = self.my_vims[ro_task["target_id"]]
443 target_vim.delete_vminstance(
444 vm_vim_id,
445 ro_task["vim_info"]["created_items"],
446 ro_task["vim_info"].get("volumes_to_hold", []),
447 )
448 except vimconn.VimConnNotFoundException:
449 ro_vim_item_update_ok["vim_message"] = "already deleted"
450 except vimconn.VimConnException as e:
451 self.logger.error(
452 "ro_task={} vim={} del-vm={}: {}".format(
453 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
454 )
455 )
456 ro_vim_item_update = {
457 "vim_status": "VIM_ERROR",
458 "vim_message": "Error while deleting: {}".format(e),
459 }
460
461 return "FAILED", ro_vim_item_update
462
463 self.logger.debug(
464 "task={} {} del-vm={} {}".format(
465 task_id,
466 ro_task["target_id"],
467 vm_vim_id,
468 ro_vim_item_update_ok.get("vim_message", ""),
469 )
470 )
471
472 return "DONE", ro_vim_item_update_ok
473
474 def refresh(self, ro_task):
475 """Call VIM to get vm status"""
476 ro_task_id = ro_task["_id"]
477 target_vim = self.my_vims[ro_task["target_id"]]
478 vim_id = ro_task["vim_info"]["vim_id"]
479
480 if not vim_id:
481 return None, None
482
483 vm_to_refresh_list = [vim_id]
484 try:
485 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
486 vim_info = vim_dict[vim_id]
487
488 if vim_info["status"] == "ACTIVE":
489 task_status = "DONE"
490 elif vim_info["status"] == "BUILD":
491 task_status = "BUILD"
492 else:
493 task_status = "FAILED"
494
495 # try to load and parse vim_information
496 try:
497 vim_info_info = yaml.safe_load(vim_info["vim_info"])
498 if vim_info_info.get("name"):
499 vim_info["name"] = vim_info_info["name"]
500 except Exception:
501 pass
502 except vimconn.VimConnException as e:
503 # Mark all tasks at VIM_ERROR status
504 self.logger.error(
505 "ro_task={} vim={} get-vm={}: {}".format(
506 ro_task_id, ro_task["target_id"], vim_id, e
507 )
508 )
509 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
510 task_status = "FAILED"
511
512 ro_vim_item_update = {}
513
514 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
515 vim_interfaces = []
516 if vim_info.get("interfaces"):
517 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
518 iface = next(
519 (
520 iface
521 for iface in vim_info["interfaces"]
522 if vim_iface_id == iface["vim_interface_id"]
523 ),
524 None,
525 )
526 # if iface:
527 # iface.pop("vim_info", None)
528 vim_interfaces.append(iface)
529
530 task_create = next(
531 t
532 for t in ro_task["tasks"]
533 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
534 )
535 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
536 vim_interfaces[task_create["mgmt_vnf_interface"]][
537 "mgmt_vnf_interface"
538 ] = True
539
540 mgmt_vdu_iface = task_create.get(
541 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
542 )
543 if vim_interfaces:
544 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
545
546 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
547 ro_vim_item_update["interfaces"] = vim_interfaces
548
549 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
550 ro_vim_item_update["vim_status"] = vim_info["status"]
551
552 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
553 ro_vim_item_update["vim_name"] = vim_info.get("name")
554
555 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
556 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
557 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
558 elif vim_info["status"] == "DELETED":
559 ro_vim_item_update["vim_id"] = None
560 ro_vim_item_update["vim_message"] = "Deleted externally"
561 else:
562 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
563 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
564
565 if ro_vim_item_update:
566 self.logger.debug(
567 "ro_task={} {} get-vm={}: status={} {}".format(
568 ro_task_id,
569 ro_task["target_id"],
570 vim_id,
571 ro_vim_item_update.get("vim_status"),
572 ro_vim_item_update.get("vim_message")
573 if ro_vim_item_update.get("vim_status") != "ACTIVE"
574 else "",
575 )
576 )
577
578 return task_status, ro_vim_item_update
579
580 def exec(self, ro_task, task_index, task_depends):
581 task = ro_task["tasks"][task_index]
582 task_id = task["task_id"]
583 target_vim = self.my_vims[ro_task["target_id"]]
584 db_task_update = {"retries": 0}
585 retries = task.get("retries", 0)
586
587 try:
588 params = task["params"]
589 params_copy = deepcopy(params)
590 params_copy["ro_key"] = self.db.decrypt(
591 params_copy.pop("private_key"),
592 params_copy.pop("schema_version"),
593 params_copy.pop("salt"),
594 )
595 params_copy["ip_addr"] = params_copy.pop("ip_address")
596 target_vim.inject_user_key(**params_copy)
597 self.logger.debug(
598 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
599 )
600
601 return (
602 "DONE",
603 None,
604 db_task_update,
605 ) # params_copy["key"]
606 except (vimconn.VimConnException, NsWorkerException) as e:
607 retries += 1
608
609 self.logger.debug(traceback.format_exc())
610 if retries < self.max_retries_inject_ssh_key:
611 return (
612 "BUILD",
613 None,
614 {
615 "retries": retries,
616 "next_retry": self.time_retries_inject_ssh_key,
617 },
618 )
619
620 self.logger.error(
621 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
622 )
623 ro_vim_item_update = {"vim_message": str(e)}
624
625 return "FAILED", ro_vim_item_update, db_task_update
626
627
628 class VimInteractionImage(VimInteractionBase):
629 def new(self, ro_task, task_index, task_depends):
630 task = ro_task["tasks"][task_index]
631 task_id = task["task_id"]
632 created = False
633 created_items = {}
634 target_vim = self.my_vims[ro_task["target_id"]]
635
636 try:
637 # FIND
638 if task.get("find_params"):
639 vim_images = target_vim.get_image_list(**task["find_params"])
640
641 if not vim_images:
642 raise NsWorkerExceptionNotFound(
643 "Image not found with this criteria: '{}'".format(
644 task["find_params"]
645 )
646 )
647 elif len(vim_images) > 1:
648 raise NsWorkerException(
649 "More than one image found with this criteria: '{}'".format(
650 task["find_params"]
651 )
652 )
653 else:
654 vim_image_id = vim_images[0]["id"]
655
656 ro_vim_item_update = {
657 "vim_id": vim_image_id,
658 "vim_status": "DONE",
659 "created": created,
660 "created_items": created_items,
661 "vim_details": None,
662 "vim_message": None,
663 }
664 self.logger.debug(
665 "task={} {} new-image={} created={}".format(
666 task_id, ro_task["target_id"], vim_image_id, created
667 )
668 )
669
670 return "DONE", ro_vim_item_update
671 except (NsWorkerException, vimconn.VimConnException) as e:
672 self.logger.error(
673 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
674 )
675 ro_vim_item_update = {
676 "vim_status": "VIM_ERROR",
677 "created": created,
678 "vim_message": str(e),
679 }
680
681 return "FAILED", ro_vim_item_update
682
683
684 class VimInteractionFlavor(VimInteractionBase):
685 def delete(self, ro_task, task_index):
686 task = ro_task["tasks"][task_index]
687 task_id = task["task_id"]
688 flavor_vim_id = ro_task["vim_info"]["vim_id"]
689 ro_vim_item_update_ok = {
690 "vim_status": "DELETED",
691 "created": False,
692 "vim_message": "DELETED",
693 "vim_id": None,
694 }
695
696 try:
697 if flavor_vim_id:
698 target_vim = self.my_vims[ro_task["target_id"]]
699 target_vim.delete_flavor(flavor_vim_id)
700 except vimconn.VimConnNotFoundException:
701 ro_vim_item_update_ok["vim_message"] = "already deleted"
702 except vimconn.VimConnException as e:
703 self.logger.error(
704 "ro_task={} vim={} del-flavor={}: {}".format(
705 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
706 )
707 )
708 ro_vim_item_update = {
709 "vim_status": "VIM_ERROR",
710 "vim_message": "Error while deleting: {}".format(e),
711 }
712
713 return "FAILED", ro_vim_item_update
714
715 self.logger.debug(
716 "task={} {} del-flavor={} {}".format(
717 task_id,
718 ro_task["target_id"],
719 flavor_vim_id,
720 ro_vim_item_update_ok.get("vim_message", ""),
721 )
722 )
723
724 return "DONE", ro_vim_item_update_ok
725
726 def new(self, ro_task, task_index, task_depends):
727 task = ro_task["tasks"][task_index]
728 task_id = task["task_id"]
729 created = False
730 created_items = {}
731 target_vim = self.my_vims[ro_task["target_id"]]
732
733 try:
734 # FIND
735 vim_flavor_id = None
736
737 if task.get("find_params"):
738 try:
739 flavor_data = task["find_params"]["flavor_data"]
740 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
741 except vimconn.VimConnNotFoundException:
742 pass
743
744 if not vim_flavor_id and task.get("params"):
745 # CREATE
746 flavor_data = task["params"]["flavor_data"]
747 vim_flavor_id = target_vim.new_flavor(flavor_data)
748 created = True
749
750 ro_vim_item_update = {
751 "vim_id": vim_flavor_id,
752 "vim_status": "DONE",
753 "created": created,
754 "created_items": created_items,
755 "vim_details": None,
756 "vim_message": None,
757 }
758 self.logger.debug(
759 "task={} {} new-flavor={} created={}".format(
760 task_id, ro_task["target_id"], vim_flavor_id, created
761 )
762 )
763
764 return "DONE", ro_vim_item_update
765 except (vimconn.VimConnException, NsWorkerException) as e:
766 self.logger.error(
767 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
768 )
769 ro_vim_item_update = {
770 "vim_status": "VIM_ERROR",
771 "created": created,
772 "vim_message": str(e),
773 }
774
775 return "FAILED", ro_vim_item_update
776
777
778 class VimInteractionAffinityGroup(VimInteractionBase):
779 def delete(self, ro_task, task_index):
780 task = ro_task["tasks"][task_index]
781 task_id = task["task_id"]
782 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
783 ro_vim_item_update_ok = {
784 "vim_status": "DELETED",
785 "created": False,
786 "vim_message": "DELETED",
787 "vim_id": None,
788 }
789
790 try:
791 if affinity_group_vim_id:
792 target_vim = self.my_vims[ro_task["target_id"]]
793 target_vim.delete_affinity_group(affinity_group_vim_id)
794 except vimconn.VimConnNotFoundException:
795 ro_vim_item_update_ok["vim_message"] = "already deleted"
796 except vimconn.VimConnException as e:
797 self.logger.error(
798 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
799 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
800 )
801 )
802 ro_vim_item_update = {
803 "vim_status": "VIM_ERROR",
804 "vim_message": "Error while deleting: {}".format(e),
805 }
806
807 return "FAILED", ro_vim_item_update
808
809 self.logger.debug(
810 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
811 task_id,
812 ro_task["target_id"],
813 affinity_group_vim_id,
814 ro_vim_item_update_ok.get("vim_message", ""),
815 )
816 )
817
818 return "DONE", ro_vim_item_update_ok
819
820 def new(self, ro_task, task_index, task_depends):
821 task = ro_task["tasks"][task_index]
822 task_id = task["task_id"]
823 created = False
824 created_items = {}
825 target_vim = self.my_vims[ro_task["target_id"]]
826
827 try:
828 affinity_group_vim_id = None
829 affinity_group_data = None
830
831 if task.get("params"):
832 affinity_group_data = task["params"].get("affinity_group_data")
833
834 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
835 try:
836 param_affinity_group_id = task["params"]["affinity_group_data"].get(
837 "vim-affinity-group-id"
838 )
839 affinity_group_vim_id = target_vim.get_affinity_group(
840 param_affinity_group_id
841 ).get("id")
842 except vimconn.VimConnNotFoundException:
843 self.logger.error(
844 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
845 "could not be found at VIM. Creating a new one.".format(
846 task_id, ro_task["target_id"], param_affinity_group_id
847 )
848 )
849
850 if not affinity_group_vim_id and affinity_group_data:
851 affinity_group_vim_id = target_vim.new_affinity_group(
852 affinity_group_data
853 )
854 created = True
855
856 ro_vim_item_update = {
857 "vim_id": affinity_group_vim_id,
858 "vim_status": "DONE",
859 "created": created,
860 "created_items": created_items,
861 "vim_details": None,
862 "vim_message": None,
863 }
864 self.logger.debug(
865 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
866 task_id, ro_task["target_id"], affinity_group_vim_id, created
867 )
868 )
869
870 return "DONE", ro_vim_item_update
871 except (vimconn.VimConnException, NsWorkerException) as e:
872 self.logger.error(
873 "task={} vim={} new-affinity-or-anti-affinity-group:"
874 " {}".format(task_id, ro_task["target_id"], e)
875 )
876 ro_vim_item_update = {
877 "vim_status": "VIM_ERROR",
878 "created": created,
879 "vim_message": str(e),
880 }
881
882 return "FAILED", ro_vim_item_update
883
884
885 class VimInteractionSdnNet(VimInteractionBase):
886 @staticmethod
887 def _match_pci(port_pci, mapping):
888 """
889 Check if port_pci matches with mapping
890 mapping can have brackets to indicate that several chars are accepted. e.g
891 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
892 :param port_pci: text
893 :param mapping: text, can contain brackets to indicate several chars are available
894 :return: True if matches, False otherwise
895 """
896 if not port_pci or not mapping:
897 return False
898 if port_pci == mapping:
899 return True
900
901 mapping_index = 0
902 pci_index = 0
903 while True:
904 bracket_start = mapping.find("[", mapping_index)
905
906 if bracket_start == -1:
907 break
908
909 bracket_end = mapping.find("]", bracket_start)
910 if bracket_end == -1:
911 break
912
913 length = bracket_start - mapping_index
914 if (
915 length
916 and port_pci[pci_index : pci_index + length]
917 != mapping[mapping_index:bracket_start]
918 ):
919 return False
920
921 if (
922 port_pci[pci_index + length]
923 not in mapping[bracket_start + 1 : bracket_end]
924 ):
925 return False
926
927 pci_index += length + 1
928 mapping_index = bracket_end + 1
929
930 if port_pci[pci_index:] != mapping[mapping_index:]:
931 return False
932
933 return True
934
935 def _get_interfaces(self, vlds_to_connect, vim_account_id):
936 """
937 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
938 :param vim_account_id:
939 :return:
940 """
941 interfaces = []
942
943 for vld in vlds_to_connect:
944 table, _, db_id = vld.partition(":")
945 db_id, _, vld = db_id.partition(":")
946 _, _, vld_id = vld.partition(".")
947
948 if table == "vnfrs":
949 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
950 iface_key = "vnf-vld-id"
951 else: # table == "nsrs"
952 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
953 iface_key = "ns-vld-id"
954
955 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
956
957 for db_vnfr in db_vnfrs:
958 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
959 for iface_index, interface in enumerate(vdur["interfaces"]):
960 if interface.get(iface_key) == vld_id and interface.get(
961 "type"
962 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
963 # only SR-IOV o PT
964 interface_ = interface.copy()
965 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
966 db_vnfr["_id"], vdu_index, iface_index
967 )
968
969 if vdur.get("status") == "ERROR":
970 interface_["status"] = "ERROR"
971
972 interfaces.append(interface_)
973
974 return interfaces
975
976 def refresh(self, ro_task):
977 # look for task create
978 task_create_index, _ = next(
979 i_t
980 for i_t in enumerate(ro_task["tasks"])
981 if i_t[1]
982 and i_t[1]["action"] == "CREATE"
983 and i_t[1]["status"] != "FINISHED"
984 )
985
986 return self.new(ro_task, task_create_index, None)
987
988 def new(self, ro_task, task_index, task_depends):
989
990 task = ro_task["tasks"][task_index]
991 task_id = task["task_id"]
992 target_vim = self.my_vims[ro_task["target_id"]]
993
994 sdn_net_id = ro_task["vim_info"]["vim_id"]
995
996 created_items = ro_task["vim_info"].get("created_items")
997 connected_ports = ro_task["vim_info"].get("connected_ports", [])
998 new_connected_ports = []
999 last_update = ro_task["vim_info"].get("last_update", 0)
1000 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1001 error_list = []
1002 created = ro_task["vim_info"].get("created", False)
1003
1004 try:
1005 # CREATE
1006 params = task["params"]
1007 vlds_to_connect = params["vlds"]
1008 associated_vim = params["target_vim"]
1009 # external additional ports
1010 additional_ports = params.get("sdn-ports") or ()
1011 _, _, vim_account_id = associated_vim.partition(":")
1012
1013 if associated_vim:
1014 # get associated VIM
1015 if associated_vim not in self.db_vims:
1016 self.db_vims[associated_vim] = self.db.get_one(
1017 "vim_accounts", {"_id": vim_account_id}
1018 )
1019
1020 db_vim = self.db_vims[associated_vim]
1021
1022 # look for ports to connect
1023 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1024 # print(ports)
1025
1026 sdn_ports = []
1027 pending_ports = error_ports = 0
1028 vlan_used = None
1029 sdn_need_update = False
1030
1031 for port in ports:
1032 vlan_used = port.get("vlan") or vlan_used
1033
1034 # TODO. Do not connect if already done
1035 if not port.get("compute_node") or not port.get("pci"):
1036 if port.get("status") == "ERROR":
1037 error_ports += 1
1038 else:
1039 pending_ports += 1
1040 continue
1041
1042 pmap = None
1043 compute_node_mappings = next(
1044 (
1045 c
1046 for c in db_vim["config"].get("sdn-port-mapping", ())
1047 if c and c["compute_node"] == port["compute_node"]
1048 ),
1049 None,
1050 )
1051
1052 if compute_node_mappings:
1053 # process port_mapping pci of type 0000:af:1[01].[1357]
1054 pmap = next(
1055 (
1056 p
1057 for p in compute_node_mappings["ports"]
1058 if self._match_pci(port["pci"], p.get("pci"))
1059 ),
1060 None,
1061 )
1062
1063 if not pmap:
1064 if not db_vim["config"].get("mapping_not_needed"):
1065 error_list.append(
1066 "Port mapping not found for compute_node={} pci={}".format(
1067 port["compute_node"], port["pci"]
1068 )
1069 )
1070 continue
1071
1072 pmap = {}
1073
1074 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1075 new_port = {
1076 "service_endpoint_id": pmap.get("service_endpoint_id")
1077 or service_endpoint_id,
1078 "service_endpoint_encapsulation_type": "dot1q"
1079 if port["type"] == "SR-IOV"
1080 else None,
1081 "service_endpoint_encapsulation_info": {
1082 "vlan": port.get("vlan"),
1083 "mac": port.get("mac-address"),
1084 "device_id": pmap.get("device_id") or port["compute_node"],
1085 "device_interface_id": pmap.get("device_interface_id")
1086 or port["pci"],
1087 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1088 "switch_port": pmap.get("switch_port"),
1089 "service_mapping_info": pmap.get("service_mapping_info"),
1090 },
1091 }
1092
1093 # TODO
1094 # if port["modified_at"] > last_update:
1095 # sdn_need_update = True
1096 new_connected_ports.append(port["id"]) # TODO
1097 sdn_ports.append(new_port)
1098
1099 if error_ports:
1100 error_list.append(
1101 "{} interfaces have not been created as VDU is on ERROR status".format(
1102 error_ports
1103 )
1104 )
1105
1106 # connect external ports
1107 for index, additional_port in enumerate(additional_ports):
1108 additional_port_id = additional_port.get(
1109 "service_endpoint_id"
1110 ) or "external-{}".format(index)
1111 sdn_ports.append(
1112 {
1113 "service_endpoint_id": additional_port_id,
1114 "service_endpoint_encapsulation_type": additional_port.get(
1115 "service_endpoint_encapsulation_type", "dot1q"
1116 ),
1117 "service_endpoint_encapsulation_info": {
1118 "vlan": additional_port.get("vlan") or vlan_used,
1119 "mac": additional_port.get("mac_address"),
1120 "device_id": additional_port.get("device_id"),
1121 "device_interface_id": additional_port.get(
1122 "device_interface_id"
1123 ),
1124 "switch_dpid": additional_port.get("switch_dpid")
1125 or additional_port.get("switch_id"),
1126 "switch_port": additional_port.get("switch_port"),
1127 "service_mapping_info": additional_port.get(
1128 "service_mapping_info"
1129 ),
1130 },
1131 }
1132 )
1133 new_connected_ports.append(additional_port_id)
1134 sdn_info = ""
1135
1136 # if there are more ports to connect or they have been modified, call create/update
1137 if error_list:
1138 sdn_status = "ERROR"
1139 sdn_info = "; ".join(error_list)
1140 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1141 last_update = time.time()
1142
1143 if not sdn_net_id:
1144 if len(sdn_ports) < 2:
1145 sdn_status = "ACTIVE"
1146
1147 if not pending_ports:
1148 self.logger.debug(
1149 "task={} {} new-sdn-net done, less than 2 ports".format(
1150 task_id, ro_task["target_id"]
1151 )
1152 )
1153 else:
1154 net_type = params.get("type") or "ELAN"
1155 (
1156 sdn_net_id,
1157 created_items,
1158 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1159 created = True
1160 self.logger.debug(
1161 "task={} {} new-sdn-net={} created={}".format(
1162 task_id, ro_task["target_id"], sdn_net_id, created
1163 )
1164 )
1165 else:
1166 created_items = target_vim.edit_connectivity_service(
1167 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1168 )
1169 created = True
1170 self.logger.debug(
1171 "task={} {} update-sdn-net={} created={}".format(
1172 task_id, ro_task["target_id"], sdn_net_id, created
1173 )
1174 )
1175
1176 connected_ports = new_connected_ports
1177 elif sdn_net_id:
1178 wim_status_dict = target_vim.get_connectivity_service_status(
1179 sdn_net_id, conn_info=created_items
1180 )
1181 sdn_status = wim_status_dict["sdn_status"]
1182
1183 if wim_status_dict.get("sdn_info"):
1184 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1185
1186 if wim_status_dict.get("error_msg"):
1187 sdn_info = wim_status_dict.get("error_msg") or ""
1188
1189 if pending_ports:
1190 if sdn_status != "ERROR":
1191 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1192 len(ports) - pending_ports, len(ports)
1193 )
1194
1195 if sdn_status == "ACTIVE":
1196 sdn_status = "BUILD"
1197
1198 ro_vim_item_update = {
1199 "vim_id": sdn_net_id,
1200 "vim_status": sdn_status,
1201 "created": created,
1202 "created_items": created_items,
1203 "connected_ports": connected_ports,
1204 "vim_details": sdn_info,
1205 "vim_message": None,
1206 "last_update": last_update,
1207 }
1208
1209 return sdn_status, ro_vim_item_update
1210 except Exception as e:
1211 self.logger.error(
1212 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1213 exc_info=not isinstance(
1214 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1215 ),
1216 )
1217 ro_vim_item_update = {
1218 "vim_status": "VIM_ERROR",
1219 "created": created,
1220 "vim_message": str(e),
1221 }
1222
1223 return "FAILED", ro_vim_item_update
1224
1225 def delete(self, ro_task, task_index):
1226 task = ro_task["tasks"][task_index]
1227 task_id = task["task_id"]
1228 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1229 ro_vim_item_update_ok = {
1230 "vim_status": "DELETED",
1231 "created": False,
1232 "vim_message": "DELETED",
1233 "vim_id": None,
1234 }
1235
1236 try:
1237 if sdn_vim_id:
1238 target_vim = self.my_vims[ro_task["target_id"]]
1239 target_vim.delete_connectivity_service(
1240 sdn_vim_id, ro_task["vim_info"].get("created_items")
1241 )
1242
1243 except Exception as e:
1244 if (
1245 isinstance(e, sdnconn.SdnConnectorError)
1246 and e.http_code == HTTPStatus.NOT_FOUND.value
1247 ):
1248 ro_vim_item_update_ok["vim_message"] = "already deleted"
1249 else:
1250 self.logger.error(
1251 "ro_task={} vim={} del-sdn-net={}: {}".format(
1252 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1253 ),
1254 exc_info=not isinstance(
1255 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1256 ),
1257 )
1258 ro_vim_item_update = {
1259 "vim_status": "VIM_ERROR",
1260 "vim_message": "Error while deleting: {}".format(e),
1261 }
1262
1263 return "FAILED", ro_vim_item_update
1264
1265 self.logger.debug(
1266 "task={} {} del-sdn-net={} {}".format(
1267 task_id,
1268 ro_task["target_id"],
1269 sdn_vim_id,
1270 ro_vim_item_update_ok.get("vim_message", ""),
1271 )
1272 )
1273
1274 return "DONE", ro_vim_item_update_ok
1275
1276
1277 class VimInteractionMigration(VimInteractionBase):
1278 def exec(self, ro_task, task_index, task_depends):
1279 task = ro_task["tasks"][task_index]
1280 task_id = task["task_id"]
1281 db_task_update = {"retries": 0}
1282 target_vim = self.my_vims[ro_task["target_id"]]
1283 vim_interfaces = []
1284 created = False
1285 created_items = {}
1286 refreshed_vim_info = {}
1287
1288 try:
1289 if task.get("params"):
1290 vim_vm_id = task["params"].get("vim_vm_id")
1291 migrate_host = task["params"].get("migrate_host")
1292 _, migrated_compute_node = target_vim.migrate_instance(
1293 vim_vm_id, migrate_host
1294 )
1295
1296 if migrated_compute_node:
1297 # When VM is migrated, vdu["vim_info"] needs to be updated
1298 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1299 ro_task["target_id"]
1300 )
1301
1302 # Refresh VM to get new vim_info
1303 vm_to_refresh_list = [vim_vm_id]
1304 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1305 refreshed_vim_info = vim_dict[vim_vm_id]
1306
1307 if refreshed_vim_info.get("interfaces"):
1308 for old_iface in vdu_old_vim_info.get("interfaces"):
1309 iface = next(
1310 (
1311 iface
1312 for iface in refreshed_vim_info["interfaces"]
1313 if old_iface["vim_interface_id"]
1314 == iface["vim_interface_id"]
1315 ),
1316 None,
1317 )
1318 vim_interfaces.append(iface)
1319
1320 ro_vim_item_update = {
1321 "vim_id": vim_vm_id,
1322 "vim_status": "ACTIVE",
1323 "created": created,
1324 "created_items": created_items,
1325 "vim_details": None,
1326 "vim_message": None,
1327 }
1328
1329 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1330 "ERROR",
1331 "VIM_ERROR",
1332 ):
1333 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1334
1335 if vim_interfaces:
1336 ro_vim_item_update["interfaces"] = vim_interfaces
1337
1338 self.logger.debug(
1339 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1340 )
1341
1342 return "DONE", ro_vim_item_update, db_task_update
1343
1344 except (vimconn.VimConnException, NsWorkerException) as e:
1345 self.logger.error(
1346 "task={} vim={} VM Migration:"
1347 " {}".format(task_id, ro_task["target_id"], e)
1348 )
1349 ro_vim_item_update = {
1350 "vim_status": "VIM_ERROR",
1351 "created": created,
1352 "vim_message": str(e),
1353 }
1354
1355 return "FAILED", ro_vim_item_update, db_task_update
1356
1357
1358 class NsWorker(threading.Thread):
1359 REFRESH_BUILD = 5 # 5 seconds
1360 REFRESH_ACTIVE = 60 # 1 minute
1361 REFRESH_ERROR = 600
1362 REFRESH_IMAGE = 3600 * 10
1363 REFRESH_DELETE = 3600 * 10
1364 QUEUE_SIZE = 100
1365 terminate = False
1366
1367 def __init__(self, worker_index, config, plugins, db):
1368 """
1369
1370 :param worker_index: thread index
1371 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1372 :param plugins: global shared dict with the loaded plugins
1373 :param db: database class instance to use
1374 """
1375 threading.Thread.__init__(self)
1376 self.config = config
1377 self.plugins = plugins
1378 self.plugin_name = "unknown"
1379 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1380 self.worker_index = worker_index
1381 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1382 # targetvim: vimplugin class
1383 self.my_vims = {}
1384 # targetvim: vim information from database
1385 self.db_vims = {}
1386 # targetvim list
1387 self.vim_targets = []
1388 self.my_id = config["process_id"] + ":" + str(worker_index)
1389 self.db = db
1390 self.item2class = {
1391 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1392 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1393 "image": VimInteractionImage(
1394 self.db, self.my_vims, self.db_vims, self.logger
1395 ),
1396 "flavor": VimInteractionFlavor(
1397 self.db, self.my_vims, self.db_vims, self.logger
1398 ),
1399 "sdn_net": VimInteractionSdnNet(
1400 self.db, self.my_vims, self.db_vims, self.logger
1401 ),
1402 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1403 self.db, self.my_vims, self.db_vims, self.logger
1404 ),
1405 "migrate": VimInteractionMigration(
1406 self.db, self.my_vims, self.db_vims, self.logger
1407 ),
1408 }
1409 self.time_last_task_processed = None
1410 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1411 self.tasks_to_delete = []
1412 # it is idle when there are not vim_targets associated
1413 self.idle = True
1414 self.task_locked_time = config["global"]["task_locked_time"]
1415
1416 def insert_task(self, task):
1417 try:
1418 self.task_queue.put(task, False)
1419 return None
1420 except queue.Full:
1421 raise NsWorkerException("timeout inserting a task")
1422
1423 def terminate(self):
1424 self.insert_task("exit")
1425
1426 def del_task(self, task):
1427 with self.task_lock:
1428 if task["status"] == "SCHEDULED":
1429 task["status"] = "SUPERSEDED"
1430 return True
1431 else: # task["status"] == "processing"
1432 self.task_lock.release()
1433 return False
1434
1435 def _process_vim_config(self, target_id, db_vim):
1436 """
1437 Process vim config, creating vim configuration files as ca_cert
1438 :param target_id: vim/sdn/wim + id
1439 :param db_vim: Vim dictionary obtained from database
1440 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1441 """
1442 if not db_vim.get("config"):
1443 return
1444
1445 file_name = ""
1446
1447 try:
1448 if db_vim["config"].get("ca_cert_content"):
1449 file_name = "{}:{}".format(target_id, self.worker_index)
1450
1451 try:
1452 mkdir(file_name)
1453 except FileExistsError:
1454 pass
1455
1456 file_name = file_name + "/ca_cert"
1457
1458 with open(file_name, "w") as f:
1459 f.write(db_vim["config"]["ca_cert_content"])
1460 del db_vim["config"]["ca_cert_content"]
1461 db_vim["config"]["ca_cert"] = file_name
1462 except Exception as e:
1463 raise NsWorkerException(
1464 "Error writing to file '{}': {}".format(file_name, e)
1465 )
1466
1467 def _load_plugin(self, name, type="vim"):
1468 # type can be vim or sdn
1469 if "rovim_dummy" not in self.plugins:
1470 self.plugins["rovim_dummy"] = VimDummyConnector
1471
1472 if "rosdn_dummy" not in self.plugins:
1473 self.plugins["rosdn_dummy"] = SdnDummyConnector
1474
1475 if name in self.plugins:
1476 return self.plugins[name]
1477
1478 try:
1479 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1480 self.plugins[name] = ep.load()
1481 except Exception as e:
1482 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1483
1484 if name and name not in self.plugins:
1485 raise NsWorkerException(
1486 "Plugin 'osm_{n}' has not been installed".format(n=name)
1487 )
1488
1489 return self.plugins[name]
1490
1491 def _unload_vim(self, target_id):
1492 """
1493 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1494 :param target_id: Contains type:_id; where type can be 'vim', ...
1495 :return: None.
1496 """
1497 try:
1498 self.db_vims.pop(target_id, None)
1499 self.my_vims.pop(target_id, None)
1500
1501 if target_id in self.vim_targets:
1502 self.vim_targets.remove(target_id)
1503
1504 self.logger.info("Unloaded {}".format(target_id))
1505 rmtree("{}:{}".format(target_id, self.worker_index))
1506 except FileNotFoundError:
1507 pass # this is raised by rmtree if folder does not exist
1508 except Exception as e:
1509 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1510
1511 def _check_vim(self, target_id):
1512 """
1513 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1514 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1515 :return: None.
1516 """
1517 target, _, _id = target_id.partition(":")
1518 now = time.time()
1519 update_dict = {}
1520 unset_dict = {}
1521 op_text = ""
1522 step = ""
1523 loaded = target_id in self.vim_targets
1524 target_database = (
1525 "vim_accounts"
1526 if target == "vim"
1527 else "wim_accounts"
1528 if target == "wim"
1529 else "sdns"
1530 )
1531
1532 try:
1533 step = "Getting {} from db".format(target_id)
1534 db_vim = self.db.get_one(target_database, {"_id": _id})
1535
1536 for op_index, operation in enumerate(
1537 db_vim["_admin"].get("operations", ())
1538 ):
1539 if operation["operationState"] != "PROCESSING":
1540 continue
1541
1542 locked_at = operation.get("locked_at")
1543
1544 if locked_at is not None and locked_at >= now - self.task_locked_time:
1545 # some other thread is doing this operation
1546 return
1547
1548 # lock
1549 op_text = "_admin.operations.{}.".format(op_index)
1550
1551 if not self.db.set_one(
1552 target_database,
1553 q_filter={
1554 "_id": _id,
1555 op_text + "operationState": "PROCESSING",
1556 op_text + "locked_at": locked_at,
1557 },
1558 update_dict={
1559 op_text + "locked_at": now,
1560 "admin.current_operation": op_index,
1561 },
1562 fail_on_empty=False,
1563 ):
1564 return
1565
1566 unset_dict[op_text + "locked_at"] = None
1567 unset_dict["current_operation"] = None
1568 step = "Loading " + target_id
1569 error_text = self._load_vim(target_id)
1570
1571 if not error_text:
1572 step = "Checking connectivity"
1573
1574 if target == "vim":
1575 self.my_vims[target_id].check_vim_connectivity()
1576 else:
1577 self.my_vims[target_id].check_credentials()
1578
1579 update_dict["_admin.operationalState"] = "ENABLED"
1580 update_dict["_admin.detailed-status"] = ""
1581 unset_dict[op_text + "detailed-status"] = None
1582 update_dict[op_text + "operationState"] = "COMPLETED"
1583
1584 return
1585
1586 except Exception as e:
1587 error_text = "{}: {}".format(step, e)
1588 self.logger.error("{} for {}: {}".format(step, target_id, e))
1589
1590 finally:
1591 if update_dict or unset_dict:
1592 if error_text:
1593 update_dict[op_text + "operationState"] = "FAILED"
1594 update_dict[op_text + "detailed-status"] = error_text
1595 unset_dict.pop(op_text + "detailed-status", None)
1596 update_dict["_admin.operationalState"] = "ERROR"
1597 update_dict["_admin.detailed-status"] = error_text
1598
1599 if op_text:
1600 update_dict[op_text + "statusEnteredTime"] = now
1601
1602 self.db.set_one(
1603 target_database,
1604 q_filter={"_id": _id},
1605 update_dict=update_dict,
1606 unset=unset_dict,
1607 fail_on_empty=False,
1608 )
1609
1610 if not loaded:
1611 self._unload_vim(target_id)
1612
1613 def _reload_vim(self, target_id):
1614 if target_id in self.vim_targets:
1615 self._load_vim(target_id)
1616 else:
1617 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1618 # just remove it to force load again next time it is needed
1619 self.db_vims.pop(target_id, None)
1620
1621 def _load_vim(self, target_id):
1622 """
1623 Load or reload a vim_account, sdn_controller or wim_account.
1624 Read content from database, load the plugin if not loaded.
1625 In case of error loading the plugin, it load a failing VIM_connector
1626 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1627 :param target_id: Contains type:_id; where type can be 'vim', ...
1628 :return: None if ok, descriptive text if error
1629 """
1630 target, _, _id = target_id.partition(":")
1631 target_database = (
1632 "vim_accounts"
1633 if target == "vim"
1634 else "wim_accounts"
1635 if target == "wim"
1636 else "sdns"
1637 )
1638 plugin_name = ""
1639 vim = None
1640
1641 try:
1642 step = "Getting {}={} from db".format(target, _id)
1643 # TODO process for wim, sdnc, ...
1644 vim = self.db.get_one(target_database, {"_id": _id})
1645
1646 # if deep_get(vim, "config", "sdn-controller"):
1647 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1648 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1649
1650 step = "Decrypting password"
1651 schema_version = vim.get("schema_version")
1652 self.db.encrypt_decrypt_fields(
1653 vim,
1654 "decrypt",
1655 fields=("password", "secret"),
1656 schema_version=schema_version,
1657 salt=_id,
1658 )
1659 self._process_vim_config(target_id, vim)
1660
1661 if target == "vim":
1662 plugin_name = "rovim_" + vim["vim_type"]
1663 step = "Loading plugin '{}'".format(plugin_name)
1664 vim_module_conn = self._load_plugin(plugin_name)
1665 step = "Loading {}'".format(target_id)
1666 self.my_vims[target_id] = vim_module_conn(
1667 uuid=vim["_id"],
1668 name=vim["name"],
1669 tenant_id=vim.get("vim_tenant_id"),
1670 tenant_name=vim.get("vim_tenant_name"),
1671 url=vim["vim_url"],
1672 url_admin=None,
1673 user=vim["vim_user"],
1674 passwd=vim["vim_password"],
1675 config=vim.get("config") or {},
1676 persistent_info={},
1677 )
1678 else: # sdn
1679 plugin_name = "rosdn_" + vim["type"]
1680 step = "Loading plugin '{}'".format(plugin_name)
1681 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1682 step = "Loading {}'".format(target_id)
1683 wim = deepcopy(vim)
1684 wim_config = wim.pop("config", {}) or {}
1685 wim["uuid"] = wim["_id"]
1686 wim["wim_url"] = wim["url"]
1687
1688 if wim.get("dpid"):
1689 wim_config["dpid"] = wim.pop("dpid")
1690
1691 if wim.get("switch_id"):
1692 wim_config["switch_id"] = wim.pop("switch_id")
1693
1694 # wim, wim_account, config
1695 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1696 self.db_vims[target_id] = vim
1697 self.error_status = None
1698
1699 self.logger.info(
1700 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1701 )
1702 except Exception as e:
1703 self.logger.error(
1704 "Cannot load {} plugin={}: {} {}".format(
1705 target_id, plugin_name, step, e
1706 )
1707 )
1708
1709 self.db_vims[target_id] = vim or {}
1710 self.db_vims[target_id] = FailingConnector(str(e))
1711 error_status = "{} Error: {}".format(step, e)
1712
1713 return error_status
1714 finally:
1715 if target_id not in self.vim_targets:
1716 self.vim_targets.append(target_id)
1717
1718 def _get_db_task(self):
1719 """
1720 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1721 :return: None
1722 """
1723 now = time.time()
1724
1725 if not self.time_last_task_processed:
1726 self.time_last_task_processed = now
1727
1728 try:
1729 while True:
1730 """
1731 # Log RO tasks only when loglevel is DEBUG
1732 if self.logger.getEffectiveLevel() == logging.DEBUG:
1733 self._log_ro_task(
1734 None,
1735 None,
1736 None,
1737 "TASK_WF",
1738 "task_locked_time="
1739 + str(self.task_locked_time)
1740 + " "
1741 + "time_last_task_processed="
1742 + str(self.time_last_task_processed)
1743 + " "
1744 + "now="
1745 + str(now),
1746 )
1747 """
1748 locked = self.db.set_one(
1749 "ro_tasks",
1750 q_filter={
1751 "target_id": self.vim_targets,
1752 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1753 "locked_at.lt": now - self.task_locked_time,
1754 "to_check_at.lt": self.time_last_task_processed,
1755 },
1756 update_dict={"locked_by": self.my_id, "locked_at": now},
1757 fail_on_empty=False,
1758 )
1759
1760 if locked:
1761 # read and return
1762 ro_task = self.db.get_one(
1763 "ro_tasks",
1764 q_filter={
1765 "target_id": self.vim_targets,
1766 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1767 "locked_at": now,
1768 },
1769 )
1770 return ro_task
1771
1772 if self.time_last_task_processed == now:
1773 self.time_last_task_processed = None
1774 return None
1775 else:
1776 self.time_last_task_processed = now
1777 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1778
1779 except DbException as e:
1780 self.logger.error("Database exception at _get_db_task: {}".format(e))
1781 except Exception as e:
1782 self.logger.critical(
1783 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1784 )
1785
1786 return None
1787
1788 def _get_db_all_tasks(self):
1789 """
1790 Read all content of table ro_tasks to log it
1791 :return: None
1792 """
1793 try:
1794 # Checking the content of the BD:
1795
1796 # read and return
1797 ro_task = self.db.get_list("ro_tasks")
1798 for rt in ro_task:
1799 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1800 return ro_task
1801
1802 except DbException as e:
1803 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1804 except Exception as e:
1805 self.logger.critical(
1806 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1807 )
1808
1809 return None
1810
1811 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1812 """
1813 Generate a log with the following format:
1814
1815 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1816 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1817 task_array_index;task_id;task_action;task_item;task_args
1818
1819 Example:
1820
1821 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1822 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1823 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1824 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1825 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1826 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1827 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1828 """
1829 try:
1830 line = []
1831 i = 0
1832 if ro_task is not None and isinstance(ro_task, dict):
1833 for t in ro_task["tasks"]:
1834 line.clear()
1835 line.append(mark)
1836 line.append(event)
1837 line.append(ro_task.get("_id", ""))
1838 line.append(str(ro_task.get("locked_at", "")))
1839 line.append(str(ro_task.get("modified_at", "")))
1840 line.append(str(ro_task.get("created_at", "")))
1841 line.append(str(ro_task.get("to_check_at", "")))
1842 line.append(str(ro_task.get("locked_by", "")))
1843 line.append(str(ro_task.get("target_id", "")))
1844 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1845 line.append(str(ro_task.get("vim_info", "")))
1846 line.append(str(ro_task.get("tasks", "")))
1847 if isinstance(t, dict):
1848 line.append(str(t.get("status", "")))
1849 line.append(str(t.get("action_id", "")))
1850 line.append(str(i))
1851 line.append(str(t.get("task_id", "")))
1852 line.append(str(t.get("action", "")))
1853 line.append(str(t.get("item", "")))
1854 line.append(str(t.get("find_params", "")))
1855 line.append(str(t.get("params", "")))
1856 else:
1857 line.extend([""] * 2)
1858 line.append(str(i))
1859 line.extend([""] * 5)
1860
1861 i += 1
1862 self.logger.debug(";".join(line))
1863 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1864 i = 0
1865 while True:
1866 st = "tasks.{}.status".format(i)
1867 if st not in db_ro_task_update:
1868 break
1869 line.clear()
1870 line.append(mark)
1871 line.append(event)
1872 line.append(db_ro_task_update.get("_id", ""))
1873 line.append(str(db_ro_task_update.get("locked_at", "")))
1874 line.append(str(db_ro_task_update.get("modified_at", "")))
1875 line.append("")
1876 line.append(str(db_ro_task_update.get("to_check_at", "")))
1877 line.append(str(db_ro_task_update.get("locked_by", "")))
1878 line.append("")
1879 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1880 line.append("")
1881 line.append(str(db_ro_task_update.get("vim_info", "")))
1882 line.append(str(str(db_ro_task_update).count(".status")))
1883 line.append(db_ro_task_update.get(st, ""))
1884 line.append("")
1885 line.append(str(i))
1886 line.extend([""] * 3)
1887 i += 1
1888 self.logger.debug(";".join(line))
1889
1890 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1891 line.clear()
1892 line.append(mark)
1893 line.append(event)
1894 line.append(db_ro_task_delete.get("_id", ""))
1895 line.append("")
1896 line.append(db_ro_task_delete.get("modified_at", ""))
1897 line.extend([""] * 13)
1898 self.logger.debug(";".join(line))
1899
1900 else:
1901 line.clear()
1902 line.append(mark)
1903 line.append(event)
1904 line.extend([""] * 16)
1905 self.logger.debug(";".join(line))
1906
1907 except Exception as e:
1908 self.logger.error("Error logging ro_task: {}".format(e))
1909
1910 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1911 """
1912 Determine if this task need to be done or superseded
1913 :return: None
1914 """
1915 my_task = ro_task["tasks"][task_index]
1916 task_id = my_task["task_id"]
1917 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1918 "created_items", False
1919 )
1920
1921 self.logger.warning("Needed delete: {}".format(needed_delete))
1922 if my_task["status"] == "FAILED":
1923 return None, None # TODO need to be retry??
1924
1925 try:
1926 for index, task in enumerate(ro_task["tasks"]):
1927 if index == task_index or not task:
1928 continue # own task
1929
1930 if (
1931 my_task["target_record"] == task["target_record"]
1932 and task["action"] == "CREATE"
1933 ):
1934 # set to finished
1935 db_update["tasks.{}.status".format(index)] = task[
1936 "status"
1937 ] = "FINISHED"
1938 elif task["action"] == "CREATE" and task["status"] not in (
1939 "FINISHED",
1940 "SUPERSEDED",
1941 ):
1942 needed_delete = False
1943
1944 if needed_delete:
1945 self.logger.warning(
1946 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
1947 )
1948 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1949 else:
1950 return "SUPERSEDED", None
1951 except Exception as e:
1952 if not isinstance(e, NsWorkerException):
1953 self.logger.critical(
1954 "Unexpected exception at _delete_task task={}: {}".format(
1955 task_id, e
1956 ),
1957 exc_info=True,
1958 )
1959
1960 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
1961
1962 def _create_task(self, ro_task, task_index, task_depends, db_update):
1963 """
1964 Determine if this task need to create something at VIM
1965 :return: None
1966 """
1967 my_task = ro_task["tasks"][task_index]
1968 task_id = my_task["task_id"]
1969 task_status = None
1970
1971 if my_task["status"] == "FAILED":
1972 return None, None # TODO need to be retry??
1973 elif my_task["status"] == "SCHEDULED":
1974 # check if already created by another task
1975 for index, task in enumerate(ro_task["tasks"]):
1976 if index == task_index or not task:
1977 continue # own task
1978
1979 if task["action"] == "CREATE" and task["status"] not in (
1980 "SCHEDULED",
1981 "FINISHED",
1982 "SUPERSEDED",
1983 ):
1984 return task["status"], "COPY_VIM_INFO"
1985
1986 try:
1987 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1988 ro_task, task_index, task_depends
1989 )
1990 # TODO update other CREATE tasks
1991 except Exception as e:
1992 if not isinstance(e, NsWorkerException):
1993 self.logger.error(
1994 "Error executing task={}: {}".format(task_id, e), exc_info=True
1995 )
1996
1997 task_status = "FAILED"
1998 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
1999 # TODO update ro_vim_item_update
2000
2001 return task_status, ro_vim_item_update
2002 else:
2003 return None, None
2004
2005 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2006 """
2007 Look for dependency task
2008 :param task_id: Can be one of
2009 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2010 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2011 3. task.task_id: "<action_id>:number"
2012 :param ro_task:
2013 :param target_id:
2014 :return: database ro_task plus index of task
2015 """
2016 if (
2017 task_id.startswith("vim:")
2018 or task_id.startswith("sdn:")
2019 or task_id.startswith("wim:")
2020 ):
2021 target_id, _, task_id = task_id.partition(" ")
2022
2023 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2024 ro_task_dependency = self.db.get_one(
2025 "ro_tasks",
2026 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2027 fail_on_empty=False,
2028 )
2029
2030 if ro_task_dependency:
2031 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2032 if task["target_record_id"] == task_id:
2033 return ro_task_dependency, task_index
2034
2035 else:
2036 if ro_task:
2037 for task_index, task in enumerate(ro_task["tasks"]):
2038 if task and task["task_id"] == task_id:
2039 return ro_task, task_index
2040
2041 ro_task_dependency = self.db.get_one(
2042 "ro_tasks",
2043 q_filter={
2044 "tasks.ANYINDEX.task_id": task_id,
2045 "tasks.ANYINDEX.target_record.ne": None,
2046 },
2047 fail_on_empty=False,
2048 )
2049
2050 self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
2051 if ro_task_dependency:
2052 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2053 if task["task_id"] == task_id:
2054 return ro_task_dependency, task_index
2055 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2056
2057 def _process_pending_tasks(self, ro_task):
2058 ro_task_id = ro_task["_id"]
2059 now = time.time()
2060 # one day
2061 next_check_at = now + (24 * 60 * 60)
2062 db_ro_task_update = {}
2063
2064 def _update_refresh(new_status):
2065 # compute next_refresh
2066 nonlocal task
2067 nonlocal next_check_at
2068 nonlocal db_ro_task_update
2069 nonlocal ro_task
2070
2071 next_refresh = time.time()
2072
2073 if task["item"] in ("image", "flavor"):
2074 next_refresh += self.REFRESH_IMAGE
2075 elif new_status == "BUILD":
2076 next_refresh += self.REFRESH_BUILD
2077 elif new_status == "DONE":
2078 next_refresh += self.REFRESH_ACTIVE
2079 else:
2080 next_refresh += self.REFRESH_ERROR
2081
2082 next_check_at = min(next_check_at, next_refresh)
2083 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2084 ro_task["vim_info"]["refresh_at"] = next_refresh
2085
2086 try:
2087 """
2088 # Log RO tasks only when loglevel is DEBUG
2089 if self.logger.getEffectiveLevel() == logging.DEBUG:
2090 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2091 """
2092 # 0: get task_status_create
2093 lock_object = None
2094 task_status_create = None
2095 task_create = next(
2096 (
2097 t
2098 for t in ro_task["tasks"]
2099 if t
2100 and t["action"] == "CREATE"
2101 and t["status"] in ("BUILD", "DONE")
2102 ),
2103 None,
2104 )
2105
2106 if task_create:
2107 task_status_create = task_create["status"]
2108
2109 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2110 for task_action in ("DELETE", "CREATE", "EXEC"):
2111 db_vim_update = None
2112 new_status = None
2113
2114 for task_index, task in enumerate(ro_task["tasks"]):
2115 if not task:
2116 continue # task deleted
2117
2118 task_depends = {}
2119 target_update = None
2120
2121 if (
2122 (
2123 task_action in ("DELETE", "EXEC")
2124 and task["status"] not in ("SCHEDULED", "BUILD")
2125 )
2126 or task["action"] != task_action
2127 or (
2128 task_action == "CREATE"
2129 and task["status"] in ("FINISHED", "SUPERSEDED")
2130 )
2131 ):
2132 continue
2133
2134 task_path = "tasks.{}.status".format(task_index)
2135 try:
2136 db_vim_info_update = None
2137
2138 if task["status"] == "SCHEDULED":
2139 # check if tasks that this depends on have been completed
2140 dependency_not_completed = False
2141
2142 for dependency_task_id in task.get("depends_on") or ():
2143 (
2144 dependency_ro_task,
2145 dependency_task_index,
2146 ) = self._get_dependency(
2147 dependency_task_id, target_id=ro_task["target_id"]
2148 )
2149 dependency_task = dependency_ro_task["tasks"][
2150 dependency_task_index
2151 ]
2152 self.logger.warning(
2153 "dependency_ro_task={} dependency_task_index={}".format(
2154 dependency_ro_task, dependency_task_index
2155 )
2156 )
2157
2158 if dependency_task["status"] == "SCHEDULED":
2159 dependency_not_completed = True
2160 next_check_at = min(
2161 next_check_at, dependency_ro_task["to_check_at"]
2162 )
2163 # must allow dependent task to be processed first
2164 # to do this set time after last_task_processed
2165 next_check_at = max(
2166 self.time_last_task_processed, next_check_at
2167 )
2168 break
2169 elif dependency_task["status"] == "FAILED":
2170 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2171 task["action"],
2172 task["item"],
2173 dependency_task["action"],
2174 dependency_task["item"],
2175 dependency_task_id,
2176 dependency_ro_task["vim_info"].get(
2177 "vim_message"
2178 ),
2179 )
2180 self.logger.error(
2181 "task={} {}".format(task["task_id"], error_text)
2182 )
2183 raise NsWorkerException(error_text)
2184
2185 task_depends[dependency_task_id] = dependency_ro_task[
2186 "vim_info"
2187 ]["vim_id"]
2188 task_depends[
2189 "TASK-{}".format(dependency_task_id)
2190 ] = dependency_ro_task["vim_info"]["vim_id"]
2191
2192 if dependency_not_completed:
2193 self.logger.warning(
2194 "DEPENDENCY NOT COMPLETED {}".format(
2195 dependency_ro_task["vim_info"]["vim_id"]
2196 )
2197 )
2198 # TODO set at vim_info.vim_details that it is waiting
2199 continue
2200
2201 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2202 # the task of renew this locking. It will update database locket_at periodically
2203 if not lock_object:
2204 lock_object = LockRenew.add_lock_object(
2205 "ro_tasks", ro_task, self
2206 )
2207
2208 if task["action"] == "DELETE":
2209 (new_status, db_vim_info_update,) = self._delete_task(
2210 ro_task, task_index, task_depends, db_ro_task_update
2211 )
2212 new_status = (
2213 "FINISHED" if new_status == "DONE" else new_status
2214 )
2215 # ^with FINISHED instead of DONE it will not be refreshing
2216
2217 if new_status in ("FINISHED", "SUPERSEDED"):
2218 target_update = "DELETE"
2219 elif task["action"] == "EXEC":
2220 (
2221 new_status,
2222 db_vim_info_update,
2223 db_task_update,
2224 ) = self.item2class[task["item"]].exec(
2225 ro_task, task_index, task_depends
2226 )
2227 new_status = (
2228 "FINISHED" if new_status == "DONE" else new_status
2229 )
2230 # ^with FINISHED instead of DONE it will not be refreshing
2231
2232 if db_task_update:
2233 # load into database the modified db_task_update "retries" and "next_retry"
2234 if db_task_update.get("retries"):
2235 db_ro_task_update[
2236 "tasks.{}.retries".format(task_index)
2237 ] = db_task_update["retries"]
2238
2239 next_check_at = time.time() + db_task_update.get(
2240 "next_retry", 60
2241 )
2242 target_update = None
2243 elif task["action"] == "CREATE":
2244 if task["status"] == "SCHEDULED":
2245 if task_status_create:
2246 new_status = task_status_create
2247 target_update = "COPY_VIM_INFO"
2248 else:
2249 new_status, db_vim_info_update = self.item2class[
2250 task["item"]
2251 ].new(ro_task, task_index, task_depends)
2252 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2253 _update_refresh(new_status)
2254 else:
2255 if (
2256 ro_task["vim_info"]["refresh_at"]
2257 and now > ro_task["vim_info"]["refresh_at"]
2258 ):
2259 new_status, db_vim_info_update = self.item2class[
2260 task["item"]
2261 ].refresh(ro_task)
2262 _update_refresh(new_status)
2263 else:
2264 # The refresh is updated to avoid set the value of "refresh_at" to
2265 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2266 # because it can happen that in this case the task is never processed
2267 _update_refresh(task["status"])
2268
2269 except Exception as e:
2270 new_status = "FAILED"
2271 db_vim_info_update = {
2272 "vim_status": "VIM_ERROR",
2273 "vim_message": str(e),
2274 }
2275
2276 if not isinstance(
2277 e, (NsWorkerException, vimconn.VimConnException)
2278 ):
2279 self.logger.error(
2280 "Unexpected exception at _delete_task task={}: {}".format(
2281 task["task_id"], e
2282 ),
2283 exc_info=True,
2284 )
2285
2286 try:
2287 if db_vim_info_update:
2288 db_vim_update = db_vim_info_update.copy()
2289 db_ro_task_update.update(
2290 {
2291 "vim_info." + k: v
2292 for k, v in db_vim_info_update.items()
2293 }
2294 )
2295 ro_task["vim_info"].update(db_vim_info_update)
2296
2297 if new_status:
2298 if task_action == "CREATE":
2299 task_status_create = new_status
2300 db_ro_task_update[task_path] = new_status
2301
2302 if target_update or db_vim_update:
2303 if target_update == "DELETE":
2304 self._update_target(task, None)
2305 elif target_update == "COPY_VIM_INFO":
2306 self._update_target(task, ro_task["vim_info"])
2307 else:
2308 self._update_target(task, db_vim_update)
2309
2310 except Exception as e:
2311 if (
2312 isinstance(e, DbException)
2313 and e.http_code == HTTPStatus.NOT_FOUND
2314 ):
2315 # if the vnfrs or nsrs has been removed from database, this task must be removed
2316 self.logger.debug(
2317 "marking to delete task={}".format(task["task_id"])
2318 )
2319 self.tasks_to_delete.append(task)
2320 else:
2321 self.logger.error(
2322 "Unexpected exception at _update_target task={}: {}".format(
2323 task["task_id"], e
2324 ),
2325 exc_info=True,
2326 )
2327
2328 locked_at = ro_task["locked_at"]
2329
2330 if lock_object:
2331 locked_at = [
2332 lock_object["locked_at"],
2333 lock_object["locked_at"] + self.task_locked_time,
2334 ]
2335 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2336 # contain exactly locked_at + self.task_locked_time
2337 LockRenew.remove_lock_object(lock_object)
2338
2339 q_filter = {
2340 "_id": ro_task["_id"],
2341 "to_check_at": ro_task["to_check_at"],
2342 "locked_at": locked_at,
2343 }
2344 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2345 # outside this task (by ro_nbi) do not update it
2346 db_ro_task_update["locked_by"] = None
2347 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2348 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2349 db_ro_task_update["modified_at"] = now
2350 db_ro_task_update["to_check_at"] = next_check_at
2351
2352 """
2353 # Log RO tasks only when loglevel is DEBUG
2354 if self.logger.getEffectiveLevel() == logging.DEBUG:
2355 db_ro_task_update_log = db_ro_task_update.copy()
2356 db_ro_task_update_log["_id"] = q_filter["_id"]
2357 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2358 """
2359
2360 if not self.db.set_one(
2361 "ro_tasks",
2362 update_dict=db_ro_task_update,
2363 q_filter=q_filter,
2364 fail_on_empty=False,
2365 ):
2366 del db_ro_task_update["to_check_at"]
2367 del q_filter["to_check_at"]
2368 """
2369 # Log RO tasks only when loglevel is DEBUG
2370 if self.logger.getEffectiveLevel() == logging.DEBUG:
2371 self._log_ro_task(
2372 None,
2373 db_ro_task_update_log,
2374 None,
2375 "TASK_WF",
2376 "SET_TASK " + str(q_filter),
2377 )
2378 """
2379 self.db.set_one(
2380 "ro_tasks",
2381 q_filter=q_filter,
2382 update_dict=db_ro_task_update,
2383 fail_on_empty=True,
2384 )
2385 except DbException as e:
2386 self.logger.error(
2387 "ro_task={} Error updating database {}".format(ro_task_id, e)
2388 )
2389 except Exception as e:
2390 self.logger.error(
2391 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2392 )
2393
2394 def _update_target(self, task, ro_vim_item_update):
2395 table, _, temp = task["target_record"].partition(":")
2396 _id, _, path_vim_status = temp.partition(":")
2397 path_item = path_vim_status[: path_vim_status.rfind(".")]
2398 path_item = path_item[: path_item.rfind(".")]
2399 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2400 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2401
2402 if ro_vim_item_update:
2403 update_dict = {
2404 path_vim_status + "." + k: v
2405 for k, v in ro_vim_item_update.items()
2406 if k
2407 in (
2408 "vim_id",
2409 "vim_details",
2410 "vim_message",
2411 "vim_name",
2412 "vim_status",
2413 "interfaces",
2414 "interfaces_backup",
2415 )
2416 }
2417
2418 if path_vim_status.startswith("vdur."):
2419 # for backward compatibility, add vdur.name apart from vdur.vim_name
2420 if ro_vim_item_update.get("vim_name"):
2421 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2422
2423 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2424 if ro_vim_item_update.get("vim_id"):
2425 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2426
2427 # update general status
2428 if ro_vim_item_update.get("vim_status"):
2429 update_dict[path_item + ".status"] = ro_vim_item_update[
2430 "vim_status"
2431 ]
2432
2433 if ro_vim_item_update.get("interfaces"):
2434 path_interfaces = path_item + ".interfaces"
2435
2436 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2437 if iface:
2438 update_dict.update(
2439 {
2440 path_interfaces + ".{}.".format(i) + k: v
2441 for k, v in iface.items()
2442 if k in ("vlan", "compute_node", "pci")
2443 }
2444 )
2445
2446 # put ip_address and mac_address with ip-address and mac-address
2447 if iface.get("ip_address"):
2448 update_dict[
2449 path_interfaces + ".{}.".format(i) + "ip-address"
2450 ] = iface["ip_address"]
2451
2452 if iface.get("mac_address"):
2453 update_dict[
2454 path_interfaces + ".{}.".format(i) + "mac-address"
2455 ] = iface["mac_address"]
2456
2457 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2458 update_dict["ip-address"] = iface.get("ip_address").split(
2459 ";"
2460 )[0]
2461
2462 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2463 update_dict[path_item + ".ip-address"] = iface.get(
2464 "ip_address"
2465 ).split(";")[0]
2466
2467 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2468
2469 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2470 if ro_vim_item_update.get("interfaces"):
2471 search_key = path_vim_status + ".interfaces"
2472 if update_dict.get(search_key):
2473 interfaces_backup_update = {
2474 path_vim_status + ".interfaces_backup": update_dict[search_key]
2475 }
2476
2477 self.db.set_one(
2478 table,
2479 q_filter={"_id": _id},
2480 update_dict=interfaces_backup_update,
2481 )
2482
2483 else:
2484 update_dict = {path_item + ".status": "DELETED"}
2485 self.db.set_one(
2486 table,
2487 q_filter={"_id": _id},
2488 update_dict=update_dict,
2489 unset={path_vim_status: None},
2490 )
2491
2492 def _process_delete_db_tasks(self):
2493 """
2494 Delete task from database because vnfrs or nsrs or both have been deleted
2495 :return: None. Uses and modify self.tasks_to_delete
2496 """
2497 while self.tasks_to_delete:
2498 task = self.tasks_to_delete[0]
2499 vnfrs_deleted = None
2500 nsr_id = task["nsr_id"]
2501
2502 if task["target_record"].startswith("vnfrs:"):
2503 # check if nsrs is present
2504 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2505 vnfrs_deleted = task["target_record"].split(":")[1]
2506
2507 try:
2508 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2509 except Exception as e:
2510 self.logger.error(
2511 "Error deleting task={}: {}".format(task["task_id"], e)
2512 )
2513 self.tasks_to_delete.pop(0)
2514
2515 @staticmethod
2516 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2517 """
2518 Static method because it is called from osm_ng_ro.ns
2519 :param db: instance of database to use
2520 :param nsr_id: affected nsrs id
2521 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2522 :return: None, exception is fails
2523 """
2524 retries = 5
2525 for retry in range(retries):
2526 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2527 now = time.time()
2528 conflict = False
2529
2530 for ro_task in ro_tasks:
2531 db_update = {}
2532 to_delete_ro_task = True
2533
2534 for index, task in enumerate(ro_task["tasks"]):
2535 if not task:
2536 pass
2537 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2538 vnfrs_deleted
2539 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2540 ):
2541 db_update["tasks.{}".format(index)] = None
2542 else:
2543 # used by other nsr, ro_task cannot be deleted
2544 to_delete_ro_task = False
2545
2546 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2547 if to_delete_ro_task:
2548 if not db.del_one(
2549 "ro_tasks",
2550 q_filter={
2551 "_id": ro_task["_id"],
2552 "modified_at": ro_task["modified_at"],
2553 },
2554 fail_on_empty=False,
2555 ):
2556 conflict = True
2557 elif db_update:
2558 db_update["modified_at"] = now
2559 if not db.set_one(
2560 "ro_tasks",
2561 q_filter={
2562 "_id": ro_task["_id"],
2563 "modified_at": ro_task["modified_at"],
2564 },
2565 update_dict=db_update,
2566 fail_on_empty=False,
2567 ):
2568 conflict = True
2569 if not conflict:
2570 return
2571 else:
2572 raise NsWorkerException("Exceeded {} retries".format(retries))
2573
2574 def run(self):
2575 # load database
2576 self.logger.info("Starting")
2577 while True:
2578 # step 1: get commands from queue
2579 try:
2580 if self.vim_targets:
2581 task = self.task_queue.get(block=False)
2582 else:
2583 if not self.idle:
2584 self.logger.debug("enters in idle state")
2585 self.idle = True
2586 task = self.task_queue.get(block=True)
2587 self.idle = False
2588
2589 if task[0] == "terminate":
2590 break
2591 elif task[0] == "load_vim":
2592 self.logger.info("order to load vim {}".format(task[1]))
2593 self._load_vim(task[1])
2594 elif task[0] == "unload_vim":
2595 self.logger.info("order to unload vim {}".format(task[1]))
2596 self._unload_vim(task[1])
2597 elif task[0] == "reload_vim":
2598 self._reload_vim(task[1])
2599 elif task[0] == "check_vim":
2600 self.logger.info("order to check vim {}".format(task[1]))
2601 self._check_vim(task[1])
2602 continue
2603 except Exception as e:
2604 if isinstance(e, queue.Empty):
2605 pass
2606 else:
2607 self.logger.critical(
2608 "Error processing task: {}".format(e), exc_info=True
2609 )
2610
2611 # step 2: process pending_tasks, delete not needed tasks
2612 try:
2613 if self.tasks_to_delete:
2614 self._process_delete_db_tasks()
2615 busy = False
2616 """
2617 # Log RO tasks only when loglevel is DEBUG
2618 if self.logger.getEffectiveLevel() == logging.DEBUG:
2619 _ = self._get_db_all_tasks()
2620 """
2621 ro_task = self._get_db_task()
2622 if ro_task:
2623 self.logger.warning("Task to process: {}".format(ro_task))
2624 time.sleep(1)
2625 self._process_pending_tasks(ro_task)
2626 busy = True
2627 if not busy:
2628 time.sleep(5)
2629 except Exception as e:
2630 self.logger.critical(
2631 "Unexpected exception at run: " + str(e), exc_info=True
2632 )
2633
2634 self.logger.info("Finishing")