Fix black issues
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
52 def deep_get(target_dict, *args, **kwargs):
53 """
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
60 """
61 for key in args:
62 if not isinstance(target_dict, dict) or key not in target_dict:
63 return kwargs.get("default")
64 target_dict = target_dict[key]
65 return target_dict
66
67
68 class NsWorkerException(Exception):
69 pass
70
71
72 class FailingConnector:
73 def __init__(self, error_msg):
74 self.error_msg = error_msg
75
76 for method in dir(vimconn.VimConnector):
77 if method[0] != "_":
78 setattr(
79 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80 )
81
82 for method in dir(sdnconn.SdnConnectorBase):
83 if method[0] != "_":
84 setattr(
85 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86 )
87
88
89 class NsWorkerExceptionNotFound(NsWorkerException):
90 pass
91
92
93 class VimInteractionBase:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
96
97 def __init__(self, db, my_vims, db_vims, logger):
98 self.db = db
99 self.logger = logger
100 self.my_vims = my_vims
101 self.db_vims = db_vims
102
103 def new(self, ro_task, task_index, task_depends):
104 return "BUILD", {}
105
106 def refresh(self, ro_task):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 return "FAILED", {}
110
111 return "DONE", {}
112
113 def delete(self, ro_task, task_index):
114 """skip calling VIM to delete image. Assumes ok"""
115 return "DONE", {}
116
117 def exec(self, ro_task, task_index, task_depends):
118 return "DONE", None, None
119
120
121 class VimInteractionNet(VimInteractionBase):
122 def new(self, ro_task, task_index, task_depends):
123 vim_net_id = None
124 task = ro_task["tasks"][task_index]
125 task_id = task["task_id"]
126 created = False
127 created_items = {}
128 target_vim = self.my_vims[ro_task["target_id"]]
129 mgmtnet = False
130 mgmtnet_defined_in_vim = False
131
132 try:
133 # FIND
134 if task.get("find_params"):
135 # if management, get configuration of VIM
136 if task["find_params"].get("filter_dict"):
137 vim_filter = task["find_params"]["filter_dict"]
138 # management network
139 elif task["find_params"].get("mgmt"):
140 mgmtnet = True
141 if deep_get(
142 self.db_vims[ro_task["target_id"]],
143 "config",
144 "management_network_id",
145 ):
146 mgmtnet_defined_in_vim = True
147 vim_filter = {
148 "id": self.db_vims[ro_task["target_id"]]["config"][
149 "management_network_id"
150 ]
151 }
152 elif deep_get(
153 self.db_vims[ro_task["target_id"]],
154 "config",
155 "management_network_name",
156 ):
157 mgmtnet_defined_in_vim = True
158 vim_filter = {
159 "name": self.db_vims[ro_task["target_id"]]["config"][
160 "management_network_name"
161 ]
162 }
163 else:
164 vim_filter = {"name": task["find_params"]["name"]}
165 else:
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task["find_params"])
168 )
169
170 vim_nets = target_vim.get_network_list(vim_filter)
171 if not vim_nets and not task.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet and not mgmtnet_defined_in_vim:
177 net_name = (
178 vim_filter.get("name")
179 if vim_filter.get("name")
180 else vim_filter.get("id")[:16]
181 )
182 vim_net_id, created_items = target_vim.new_network(
183 net_name, None
184 )
185 self.logger.debug(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id)
187 )
188 created = True
189 else:
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task.get("find_params")
193 )
194 )
195 elif len(vim_nets) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
198 task["find_params"]
199 )
200 )
201
202 if vim_nets:
203 vim_net_id = vim_nets[0]["id"]
204 else:
205 # CREATE
206 params = task["params"]
207 vim_net_id, created_items = target_vim.new_network(**params)
208 created = True
209
210 ro_vim_item_update = {
211 "vim_id": vim_net_id,
212 "vim_status": "BUILD",
213 "created": created,
214 "created_items": created_items,
215 "vim_details": None,
216 }
217 self.logger.debug(
218 "task={} {} new-net={} created={}".format(
219 task_id, ro_task["target_id"], vim_net_id, created
220 )
221 )
222
223 return "BUILD", ro_vim_item_update
224 except (vimconn.VimConnException, NsWorkerException) as e:
225 self.logger.error(
226 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
227 )
228 ro_vim_item_update = {
229 "vim_status": "VIM_ERROR",
230 "created": created,
231 "vim_details": str(e),
232 }
233
234 return "FAILED", ro_vim_item_update
235
236 def refresh(self, ro_task):
237 """Call VIM to get network status"""
238 ro_task_id = ro_task["_id"]
239 target_vim = self.my_vims[ro_task["target_id"]]
240 vim_id = ro_task["vim_info"]["vim_id"]
241 net_to_refresh_list = [vim_id]
242
243 try:
244 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
245 vim_info = vim_dict[vim_id]
246
247 if vim_info["status"] == "ACTIVE":
248 task_status = "DONE"
249 elif vim_info["status"] == "BUILD":
250 task_status = "BUILD"
251 else:
252 task_status = "FAILED"
253 except vimconn.VimConnException as e:
254 # Mark all tasks at VIM_ERROR status
255 self.logger.error(
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id, ro_task["target_id"], vim_id, e
258 )
259 )
260 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
261 task_status = "FAILED"
262
263 ro_vim_item_update = {}
264 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
265 ro_vim_item_update["vim_status"] = vim_info["status"]
266
267 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
268 ro_vim_item_update["vim_name"] = vim_info.get("name")
269
270 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
272 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
273 elif vim_info["status"] == "DELETED":
274 ro_vim_item_update["vim_id"] = None
275 ro_vim_item_update["vim_details"] = "Deleted externally"
276 else:
277 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
278 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
279
280 if ro_vim_item_update:
281 self.logger.debug(
282 "ro_task={} {} get-net={}: status={} {}".format(
283 ro_task_id,
284 ro_task["target_id"],
285 vim_id,
286 ro_vim_item_update.get("vim_status"),
287 ro_vim_item_update.get("vim_details")
288 if ro_vim_item_update.get("vim_status") != "ACTIVE"
289 else "",
290 )
291 )
292
293 return task_status, ro_vim_item_update
294
295 def delete(self, ro_task, task_index):
296 task = ro_task["tasks"][task_index]
297 task_id = task["task_id"]
298 net_vim_id = ro_task["vim_info"]["vim_id"]
299 ro_vim_item_update_ok = {
300 "vim_status": "DELETED",
301 "created": False,
302 "vim_details": "DELETED",
303 "vim_id": None,
304 }
305
306 try:
307 if net_vim_id or ro_task["vim_info"]["created_items"]:
308 target_vim = self.my_vims[ro_task["target_id"]]
309 target_vim.delete_network(
310 net_vim_id, ro_task["vim_info"]["created_items"]
311 )
312 except vimconn.VimConnNotFoundException:
313 ro_vim_item_update_ok["vim_details"] = "already deleted"
314 except vimconn.VimConnException as e:
315 self.logger.error(
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task["_id"], ro_task["target_id"], net_vim_id, e
318 )
319 )
320 ro_vim_item_update = {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e),
323 }
324
325 return "FAILED", ro_vim_item_update
326
327 self.logger.debug(
328 "task={} {} del-net={} {}".format(
329 task_id,
330 ro_task["target_id"],
331 net_vim_id,
332 ro_vim_item_update_ok.get("vim_details", ""),
333 )
334 )
335
336 return "DONE", ro_vim_item_update_ok
337
338
339 class VimInteractionVdu(VimInteractionBase):
340 max_retries_inject_ssh_key = 20 # 20 times
341 time_retries_inject_ssh_key = 30 # wevery 30 seconds
342
343 def new(self, ro_task, task_index, task_depends):
344 task = ro_task["tasks"][task_index]
345 task_id = task["task_id"]
346 created = False
347 created_items = {}
348 target_vim = self.my_vims[ro_task["target_id"]]
349
350 try:
351 created = True
352 params = task["params"]
353 params_copy = deepcopy(params)
354 net_list = params_copy["net_list"]
355
356 for net in net_list:
357 # change task_id into network_id
358 if "net_id" in net and net["net_id"].startswith("TASK-"):
359 network_id = task_depends[net["net_id"]]
360
361 if not network_id:
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net["net_id"])
365 )
366
367 net["net_id"] = network_id
368
369 if params_copy["image_id"].startswith("TASK-"):
370 params_copy["image_id"] = task_depends[params_copy["image_id"]]
371
372 if params_copy["flavor_id"].startswith("TASK-"):
373 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
374
375 affinity_group_list = params_copy["affinity_group_list"]
376 for affinity_group in affinity_group_list:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group and affinity_group[
379 "affinity_group_id"
380 ].startswith("TASK-"):
381 affinity_group_id = task_depends[
382 affinity_group["affinity_group_id"]
383 ]
384
385 if not affinity_group_id:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group["affinity_group_id"])
388 )
389
390 affinity_group["affinity_group_id"] = affinity_group_id
391
392 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
393 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
394
395 ro_vim_item_update = {
396 "vim_id": vim_vm_id,
397 "vim_status": "BUILD",
398 "created": created,
399 "created_items": created_items,
400 "vim_details": None,
401 "interfaces_vim_ids": interfaces,
402 "interfaces": [],
403 }
404 self.logger.debug(
405 "task={} {} new-vm={} created={}".format(
406 task_id, ro_task["target_id"], vim_vm_id, created
407 )
408 )
409
410 return "BUILD", ro_vim_item_update
411 except (vimconn.VimConnException, NsWorkerException) as e:
412 self.logger.error(
413 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
414 )
415 ro_vim_item_update = {
416 "vim_status": "VIM_ERROR",
417 "created": created,
418 "vim_details": str(e),
419 }
420
421 return "FAILED", ro_vim_item_update
422
423 def delete(self, ro_task, task_index):
424 task = ro_task["tasks"][task_index]
425 task_id = task["task_id"]
426 vm_vim_id = ro_task["vim_info"]["vim_id"]
427 ro_vim_item_update_ok = {
428 "vim_status": "DELETED",
429 "created": False,
430 "vim_details": "DELETED",
431 "vim_id": None,
432 }
433
434 try:
435 if vm_vim_id or ro_task["vim_info"]["created_items"]:
436 target_vim = self.my_vims[ro_task["target_id"]]
437 target_vim.delete_vminstance(
438 vm_vim_id, ro_task["vim_info"]["created_items"]
439 )
440 except vimconn.VimConnNotFoundException:
441 ro_vim_item_update_ok["vim_details"] = "already deleted"
442 except vimconn.VimConnException as e:
443 self.logger.error(
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
446 )
447 )
448 ro_vim_item_update = {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e),
451 }
452
453 return "FAILED", ro_vim_item_update
454
455 self.logger.debug(
456 "task={} {} del-vm={} {}".format(
457 task_id,
458 ro_task["target_id"],
459 vm_vim_id,
460 ro_vim_item_update_ok.get("vim_details", ""),
461 )
462 )
463
464 return "DONE", ro_vim_item_update_ok
465
466 def refresh(self, ro_task):
467 """Call VIM to get vm status"""
468 ro_task_id = ro_task["_id"]
469 target_vim = self.my_vims[ro_task["target_id"]]
470 vim_id = ro_task["vim_info"]["vim_id"]
471
472 if not vim_id:
473 return None, None
474
475 vm_to_refresh_list = [vim_id]
476 try:
477 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
478 vim_info = vim_dict[vim_id]
479
480 if vim_info["status"] == "ACTIVE":
481 task_status = "DONE"
482 elif vim_info["status"] == "BUILD":
483 task_status = "BUILD"
484 else:
485 task_status = "FAILED"
486
487 # try to load and parse vim_information
488 try:
489 vim_info_info = yaml.safe_load(vim_info["vim_info"])
490 if vim_info_info.get("name"):
491 vim_info["name"] = vim_info_info["name"]
492 except Exception as vim_info_error:
493 self.logger.exception(
494 f"{vim_info_error} occured while getting the vim_info from yaml"
495 )
496 except vimconn.VimConnException as e:
497 # Mark all tasks at VIM_ERROR status
498 self.logger.error(
499 "ro_task={} vim={} get-vm={}: {}".format(
500 ro_task_id, ro_task["target_id"], vim_id, e
501 )
502 )
503 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
504 task_status = "FAILED"
505
506 ro_vim_item_update = {}
507
508 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
509 vim_interfaces = []
510 if vim_info.get("interfaces"):
511 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
512 iface = next(
513 (
514 iface
515 for iface in vim_info["interfaces"]
516 if vim_iface_id == iface["vim_interface_id"]
517 ),
518 None,
519 )
520 # if iface:
521 # iface.pop("vim_info", None)
522 vim_interfaces.append(iface)
523
524 task_create = next(
525 t
526 for t in ro_task["tasks"]
527 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
528 )
529 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
530 vim_interfaces[task_create["mgmt_vnf_interface"]][
531 "mgmt_vnf_interface"
532 ] = True
533
534 mgmt_vdu_iface = task_create.get(
535 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
536 )
537 if vim_interfaces:
538 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
539
540 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
541 ro_vim_item_update["interfaces"] = vim_interfaces
542
543 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
544 ro_vim_item_update["vim_status"] = vim_info["status"]
545
546 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
547 ro_vim_item_update["vim_name"] = vim_info.get("name")
548
549 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
550 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
551 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
552 elif vim_info["status"] == "DELETED":
553 ro_vim_item_update["vim_id"] = None
554 ro_vim_item_update["vim_details"] = "Deleted externally"
555 else:
556 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
557 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
558
559 if ro_vim_item_update:
560 self.logger.debug(
561 "ro_task={} {} get-vm={}: status={} {}".format(
562 ro_task_id,
563 ro_task["target_id"],
564 vim_id,
565 ro_vim_item_update.get("vim_status"),
566 ro_vim_item_update.get("vim_details")
567 if ro_vim_item_update.get("vim_status") != "ACTIVE"
568 else "",
569 )
570 )
571
572 return task_status, ro_vim_item_update
573
574 def exec(self, ro_task, task_index, task_depends):
575 task = ro_task["tasks"][task_index]
576 task_id = task["task_id"]
577 target_vim = self.my_vims[ro_task["target_id"]]
578 db_task_update = {"retries": 0}
579 retries = task.get("retries", 0)
580
581 try:
582 params = task["params"]
583 params_copy = deepcopy(params)
584 params_copy["ro_key"] = self.db.decrypt(
585 params_copy.pop("private_key"),
586 params_copy.pop("schema_version"),
587 params_copy.pop("salt"),
588 )
589 params_copy["ip_addr"] = params_copy.pop("ip_address")
590 target_vim.inject_user_key(**params_copy)
591 self.logger.debug(
592 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
593 )
594
595 return (
596 "DONE",
597 None,
598 db_task_update,
599 ) # params_copy["key"]
600 except (vimconn.VimConnException, NsWorkerException) as e:
601 retries += 1
602
603 self.logger.debug(traceback.format_exc())
604 if retries < self.max_retries_inject_ssh_key:
605 return (
606 "BUILD",
607 None,
608 {
609 "retries": retries,
610 "next_retry": self.time_retries_inject_ssh_key,
611 },
612 )
613
614 self.logger.error(
615 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
616 )
617 ro_vim_item_update = {"vim_details": str(e)}
618
619 return "FAILED", ro_vim_item_update, db_task_update
620
621
622 class VimInteractionImage(VimInteractionBase):
623 def new(self, ro_task, task_index, task_depends):
624 task = ro_task["tasks"][task_index]
625 task_id = task["task_id"]
626 created = False
627 created_items = {}
628 target_vim = self.my_vims[ro_task["target_id"]]
629
630 try:
631 # FIND
632 if task.get("find_params"):
633 vim_images = target_vim.get_image_list(**task["find_params"])
634
635 if not vim_images:
636 raise NsWorkerExceptionNotFound(
637 "Image not found with this criteria: '{}'".format(
638 task["find_params"]
639 )
640 )
641 elif len(vim_images) > 1:
642 raise NsWorkerException(
643 "More than one image found with this criteria: '{}'".format(
644 task["find_params"]
645 )
646 )
647 else:
648 vim_image_id = vim_images[0]["id"]
649
650 ro_vim_item_update = {
651 "vim_id": vim_image_id,
652 "vim_status": "DONE",
653 "created": created,
654 "created_items": created_items,
655 "vim_details": None,
656 }
657 self.logger.debug(
658 "task={} {} new-image={} created={}".format(
659 task_id, ro_task["target_id"], vim_image_id, created
660 )
661 )
662
663 return "DONE", ro_vim_item_update
664 except (NsWorkerException, vimconn.VimConnException) as e:
665 self.logger.error(
666 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
667 )
668 ro_vim_item_update = {
669 "vim_status": "VIM_ERROR",
670 "created": created,
671 "vim_details": str(e),
672 }
673
674 return "FAILED", ro_vim_item_update
675
676
677 class VimInteractionFlavor(VimInteractionBase):
678 def delete(self, ro_task, task_index):
679 task = ro_task["tasks"][task_index]
680 task_id = task["task_id"]
681 flavor_vim_id = ro_task["vim_info"]["vim_id"]
682 ro_vim_item_update_ok = {
683 "vim_status": "DELETED",
684 "created": False,
685 "vim_details": "DELETED",
686 "vim_id": None,
687 }
688
689 try:
690 if flavor_vim_id:
691 target_vim = self.my_vims[ro_task["target_id"]]
692 target_vim.delete_flavor(flavor_vim_id)
693 except vimconn.VimConnNotFoundException:
694 ro_vim_item_update_ok["vim_details"] = "already deleted"
695 except vimconn.VimConnException as e:
696 self.logger.error(
697 "ro_task={} vim={} del-flavor={}: {}".format(
698 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
699 )
700 )
701 ro_vim_item_update = {
702 "vim_status": "VIM_ERROR",
703 "vim_details": "Error while deleting: {}".format(e),
704 }
705
706 return "FAILED", ro_vim_item_update
707
708 self.logger.debug(
709 "task={} {} del-flavor={} {}".format(
710 task_id,
711 ro_task["target_id"],
712 flavor_vim_id,
713 ro_vim_item_update_ok.get("vim_details", ""),
714 )
715 )
716
717 return "DONE", ro_vim_item_update_ok
718
719 def new(self, ro_task, task_index, task_depends):
720 task = ro_task["tasks"][task_index]
721 task_id = task["task_id"]
722 created = False
723 created_items = {}
724 target_vim = self.my_vims[ro_task["target_id"]]
725
726 try:
727 # FIND
728 vim_flavor_id = None
729
730 if task.get("find_params"):
731 try:
732 flavor_data = task["find_params"]["flavor_data"]
733 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
734 except vimconn.VimConnNotFoundException:
735 self.logger.exception("VimConnNotFoundException occured.")
736
737 if not vim_flavor_id and task.get("params"):
738 # CREATE
739 flavor_data = task["params"]["flavor_data"]
740 vim_flavor_id = target_vim.new_flavor(flavor_data)
741 created = True
742
743 ro_vim_item_update = {
744 "vim_id": vim_flavor_id,
745 "vim_status": "DONE",
746 "created": created,
747 "created_items": created_items,
748 "vim_details": None,
749 }
750 self.logger.debug(
751 "task={} {} new-flavor={} created={}".format(
752 task_id, ro_task["target_id"], vim_flavor_id, created
753 )
754 )
755
756 return "DONE", ro_vim_item_update
757 except (vimconn.VimConnException, NsWorkerException) as e:
758 self.logger.error(
759 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
760 )
761 ro_vim_item_update = {
762 "vim_status": "VIM_ERROR",
763 "created": created,
764 "vim_details": str(e),
765 }
766
767 return "FAILED", ro_vim_item_update
768
769
770 class VimInteractionAffinityGroup(VimInteractionBase):
771 def delete(self, ro_task, task_index):
772 task = ro_task["tasks"][task_index]
773 task_id = task["task_id"]
774 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
775 ro_vim_item_update_ok = {
776 "vim_status": "DELETED",
777 "created": False,
778 "vim_details": "DELETED",
779 "vim_id": None,
780 }
781
782 try:
783 if affinity_group_vim_id:
784 target_vim = self.my_vims[ro_task["target_id"]]
785 target_vim.delete_affinity_group(affinity_group_vim_id)
786 except vimconn.VimConnNotFoundException:
787 ro_vim_item_update_ok["vim_details"] = "already deleted"
788 except vimconn.VimConnException as e:
789 self.logger.error(
790 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
791 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
792 )
793 )
794 ro_vim_item_update = {
795 "vim_status": "VIM_ERROR",
796 "vim_details": "Error while deleting: {}".format(e),
797 }
798
799 return "FAILED", ro_vim_item_update
800
801 self.logger.debug(
802 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
803 task_id,
804 ro_task["target_id"],
805 affinity_group_vim_id,
806 ro_vim_item_update_ok.get("vim_details", ""),
807 )
808 )
809
810 return "DONE", ro_vim_item_update_ok
811
812 def new(self, ro_task, task_index, task_depends):
813 task = ro_task["tasks"][task_index]
814 task_id = task["task_id"]
815 created = False
816 created_items = {}
817 target_vim = self.my_vims[ro_task["target_id"]]
818
819 try:
820 affinity_group_vim_id = None
821 affinity_group_data = None
822
823 if task.get("params"):
824 affinity_group_data = task["params"].get("affinity_group_data")
825
826 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
827 try:
828 param_affinity_group_id = task["params"]["affinity_group_data"].get(
829 "vim-affinity-group-id"
830 )
831 affinity_group_vim_id = target_vim.get_affinity_group(
832 param_affinity_group_id
833 ).get("id")
834 except vimconn.VimConnNotFoundException:
835 self.logger.error(
836 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
837 "could not be found at VIM. Creating a new one.".format(
838 task_id, ro_task["target_id"], param_affinity_group_id
839 )
840 )
841
842 if not affinity_group_vim_id and affinity_group_data:
843 affinity_group_vim_id = target_vim.new_affinity_group(
844 affinity_group_data
845 )
846 created = True
847
848 ro_vim_item_update = {
849 "vim_id": affinity_group_vim_id,
850 "vim_status": "DONE",
851 "created": created,
852 "created_items": created_items,
853 "vim_details": None,
854 }
855 self.logger.debug(
856 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
857 task_id, ro_task["target_id"], affinity_group_vim_id, created
858 )
859 )
860
861 return "DONE", ro_vim_item_update
862 except (vimconn.VimConnException, NsWorkerException) as e:
863 self.logger.error(
864 "task={} vim={} new-affinity-or-anti-affinity-group:"
865 " {}".format(task_id, ro_task["target_id"], e)
866 )
867 ro_vim_item_update = {
868 "vim_status": "VIM_ERROR",
869 "created": created,
870 "vim_details": str(e),
871 }
872
873 return "FAILED", ro_vim_item_update
874
875
876 class VimInteractionSdnNet(VimInteractionBase):
877 @staticmethod
878 def _match_pci(port_pci, mapping):
879 """
880 Check if port_pci matches with mapping
881 mapping can have brackets to indicate that several chars are accepted. e.g
882 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
883 :param port_pci: text
884 :param mapping: text, can contain brackets to indicate several chars are available
885 :return: True if matches, False otherwise
886 """
887 if not port_pci or not mapping:
888 return False
889 if port_pci == mapping:
890 return True
891
892 mapping_index = 0
893 pci_index = 0
894 while True:
895 bracket_start = mapping.find("[", mapping_index)
896
897 if bracket_start == -1:
898 break
899
900 bracket_end = mapping.find("]", bracket_start)
901 if bracket_end == -1:
902 break
903
904 length = bracket_start - mapping_index
905 if (
906 length
907 and port_pci[pci_index : pci_index + length]
908 != mapping[mapping_index:bracket_start]
909 ):
910 return False
911
912 if (
913 port_pci[pci_index + length]
914 not in mapping[bracket_start + 1 : bracket_end]
915 ):
916 return False
917
918 pci_index += length + 1
919 mapping_index = bracket_end + 1
920
921 if port_pci[pci_index:] != mapping[mapping_index:]:
922 return False
923
924 return True
925
926 def _get_interfaces(self, vlds_to_connect, vim_account_id):
927 """
928 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
929 :param vim_account_id:
930 :return:
931 """
932 interfaces = []
933
934 for vld in vlds_to_connect:
935 table, _, db_id = vld.partition(":")
936 db_id, _, vld = db_id.partition(":")
937 _, _, vld_id = vld.partition(".")
938
939 if table == "vnfrs":
940 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
941 iface_key = "vnf-vld-id"
942 else: # table == "nsrs"
943 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
944 iface_key = "ns-vld-id"
945
946 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
947
948 for db_vnfr in db_vnfrs:
949 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
950 for iface_index, interface in enumerate(vdur["interfaces"]):
951 if interface.get(iface_key) == vld_id and interface.get(
952 "type"
953 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
954 # only SR-IOV o PT
955 interface_ = interface.copy()
956 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
957 db_vnfr["_id"], vdu_index, iface_index
958 )
959
960 if vdur.get("status") == "ERROR":
961 interface_["status"] = "ERROR"
962
963 interfaces.append(interface_)
964
965 return interfaces
966
967 def refresh(self, ro_task):
968 # look for task create
969 task_create_index, _ = next(
970 i_t
971 for i_t in enumerate(ro_task["tasks"])
972 if i_t[1]
973 and i_t[1]["action"] == "CREATE"
974 and i_t[1]["status"] != "FINISHED"
975 )
976
977 return self.new(ro_task, task_create_index, None)
978
979 def new(self, ro_task, task_index, task_depends):
980 task = ro_task["tasks"][task_index]
981 task_id = task["task_id"]
982 target_vim = self.my_vims[ro_task["target_id"]]
983
984 sdn_net_id = ro_task["vim_info"]["vim_id"]
985
986 created_items = ro_task["vim_info"].get("created_items")
987 connected_ports = ro_task["vim_info"].get("connected_ports", [])
988 new_connected_ports = []
989 last_update = ro_task["vim_info"].get("last_update", 0)
990 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
991 error_list = []
992 created = ro_task["vim_info"].get("created", False)
993
994 try:
995 # CREATE
996 params = task["params"]
997 vlds_to_connect = params.get("vlds", [])
998 associated_vim = params.get("target_vim")
999 # external additional ports
1000 additional_ports = params.get("sdn-ports") or ()
1001 _, _, vim_account_id = (
1002 (None, None, None)
1003 if associated_vim is None
1004 else associated_vim.partition(":")
1005 )
1006
1007 if associated_vim:
1008 # get associated VIM
1009 if associated_vim not in self.db_vims:
1010 self.db_vims[associated_vim] = self.db.get_one(
1011 "vim_accounts", {"_id": vim_account_id}
1012 )
1013
1014 db_vim = self.db_vims[associated_vim]
1015
1016 # look for ports to connect
1017 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1018 # print(ports)
1019
1020 sdn_ports = []
1021 pending_ports = error_ports = 0
1022 vlan_used = None
1023 sdn_need_update = False
1024
1025 for port in ports:
1026 vlan_used = port.get("vlan") or vlan_used
1027
1028 # TODO. Do not connect if already done
1029 if not port.get("compute_node") or not port.get("pci"):
1030 if port.get("status") == "ERROR":
1031 error_ports += 1
1032 else:
1033 pending_ports += 1
1034 continue
1035
1036 pmap = None
1037 compute_node_mappings = next(
1038 (
1039 c
1040 for c in db_vim["config"].get("sdn-port-mapping", ())
1041 if c and c["compute_node"] == port["compute_node"]
1042 ),
1043 None,
1044 )
1045
1046 if compute_node_mappings:
1047 # process port_mapping pci of type 0000:af:1[01].[1357]
1048 pmap = next(
1049 (
1050 p
1051 for p in compute_node_mappings["ports"]
1052 if self._match_pci(port["pci"], p.get("pci"))
1053 ),
1054 None,
1055 )
1056
1057 if not pmap:
1058 if not db_vim["config"].get("mapping_not_needed"):
1059 error_list.append(
1060 "Port mapping not found for compute_node={} pci={}".format(
1061 port["compute_node"], port["pci"]
1062 )
1063 )
1064 continue
1065
1066 pmap = {}
1067
1068 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1069 new_port = {
1070 "service_endpoint_id": pmap.get("service_endpoint_id")
1071 or service_endpoint_id,
1072 "service_endpoint_encapsulation_type": "dot1q"
1073 if port["type"] == "SR-IOV"
1074 else None,
1075 "service_endpoint_encapsulation_info": {
1076 "vlan": port.get("vlan"),
1077 "mac": port.get("mac-address"),
1078 "device_id": pmap.get("device_id") or port["compute_node"],
1079 "device_interface_id": pmap.get("device_interface_id")
1080 or port["pci"],
1081 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1082 "switch_port": pmap.get("switch_port"),
1083 "service_mapping_info": pmap.get("service_mapping_info"),
1084 },
1085 }
1086
1087 # TODO
1088 # if port["modified_at"] > last_update:
1089 # sdn_need_update = True
1090 new_connected_ports.append(port["id"]) # TODO
1091 sdn_ports.append(new_port)
1092
1093 if error_ports:
1094 error_list.append(
1095 "{} interfaces have not been created as VDU is on ERROR status".format(
1096 error_ports
1097 )
1098 )
1099
1100 # connect external ports
1101 for index, additional_port in enumerate(additional_ports):
1102 additional_port_id = additional_port.get(
1103 "service_endpoint_id"
1104 ) or "external-{}".format(index)
1105 sdn_ports.append(
1106 {
1107 "service_endpoint_id": additional_port_id,
1108 "service_endpoint_encapsulation_type": additional_port.get(
1109 "service_endpoint_encapsulation_type", "dot1q"
1110 ),
1111 "service_endpoint_encapsulation_info": {
1112 "vlan": additional_port.get("vlan") or vlan_used,
1113 "mac": additional_port.get("mac_address"),
1114 "device_id": additional_port.get("device_id"),
1115 "device_interface_id": additional_port.get(
1116 "device_interface_id"
1117 ),
1118 "switch_dpid": additional_port.get("switch_dpid")
1119 or additional_port.get("switch_id"),
1120 "switch_port": additional_port.get("switch_port"),
1121 "service_mapping_info": additional_port.get(
1122 "service_mapping_info"
1123 ),
1124 },
1125 }
1126 )
1127 new_connected_ports.append(additional_port_id)
1128 sdn_info = ""
1129
1130 # if there are more ports to connect or they have been modified, call create/update
1131 if error_list:
1132 sdn_status = "ERROR"
1133 sdn_info = "; ".join(error_list)
1134 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1135 last_update = time.time()
1136
1137 if not sdn_net_id:
1138 if len(sdn_ports) < 2:
1139 sdn_status = "ACTIVE"
1140
1141 if not pending_ports:
1142 self.logger.debug(
1143 "task={} {} new-sdn-net done, less than 2 ports".format(
1144 task_id, ro_task["target_id"]
1145 )
1146 )
1147 else:
1148 net_type = params.get("type") or "ELAN"
1149 (
1150 sdn_net_id,
1151 created_items,
1152 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1153 created = True
1154 self.logger.debug(
1155 "task={} {} new-sdn-net={} created={}".format(
1156 task_id, ro_task["target_id"], sdn_net_id, created
1157 )
1158 )
1159 else:
1160 created_items = target_vim.edit_connectivity_service(
1161 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1162 )
1163 created = True
1164 self.logger.debug(
1165 "task={} {} update-sdn-net={} created={}".format(
1166 task_id, ro_task["target_id"], sdn_net_id, created
1167 )
1168 )
1169
1170 connected_ports = new_connected_ports
1171 elif sdn_net_id:
1172 wim_status_dict = target_vim.get_connectivity_service_status(
1173 sdn_net_id, conn_info=created_items
1174 )
1175 sdn_status = wim_status_dict["sdn_status"]
1176
1177 if wim_status_dict.get("sdn_info"):
1178 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1179
1180 if wim_status_dict.get("error_msg"):
1181 sdn_info = wim_status_dict.get("error_msg") or ""
1182
1183 if pending_ports:
1184 if sdn_status != "ERROR":
1185 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1186 len(ports) - pending_ports, len(ports)
1187 )
1188
1189 if sdn_status == "ACTIVE":
1190 sdn_status = "BUILD"
1191
1192 ro_vim_item_update = {
1193 "vim_id": sdn_net_id,
1194 "vim_status": sdn_status,
1195 "created": created,
1196 "created_items": created_items,
1197 "connected_ports": connected_ports,
1198 "vim_details": sdn_info,
1199 "last_update": last_update,
1200 }
1201
1202 return sdn_status, ro_vim_item_update
1203 except Exception as e:
1204 self.logger.error(
1205 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1206 exc_info=not isinstance(
1207 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1208 ),
1209 )
1210 ro_vim_item_update = {
1211 "vim_status": "VIM_ERROR",
1212 "created": created,
1213 "vim_details": str(e),
1214 }
1215
1216 return "FAILED", ro_vim_item_update
1217
1218 def delete(self, ro_task, task_index):
1219 task = ro_task["tasks"][task_index]
1220 task_id = task["task_id"]
1221 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1222 ro_vim_item_update_ok = {
1223 "vim_status": "DELETED",
1224 "created": False,
1225 "vim_details": "DELETED",
1226 "vim_id": None,
1227 }
1228
1229 try:
1230 if sdn_vim_id:
1231 target_vim = self.my_vims[ro_task["target_id"]]
1232 target_vim.delete_connectivity_service(
1233 sdn_vim_id, ro_task["vim_info"].get("created_items")
1234 )
1235
1236 except Exception as e:
1237 if (
1238 isinstance(e, sdnconn.SdnConnectorError)
1239 and e.http_code == HTTPStatus.NOT_FOUND.value
1240 ):
1241 ro_vim_item_update_ok["vim_details"] = "already deleted"
1242 else:
1243 self.logger.error(
1244 "ro_task={} vim={} del-sdn-net={}: {}".format(
1245 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1246 ),
1247 exc_info=not isinstance(
1248 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1249 ),
1250 )
1251 ro_vim_item_update = {
1252 "vim_status": "VIM_ERROR",
1253 "vim_details": "Error while deleting: {}".format(e),
1254 }
1255
1256 return "FAILED", ro_vim_item_update
1257
1258 self.logger.debug(
1259 "task={} {} del-sdn-net={} {}".format(
1260 task_id,
1261 ro_task["target_id"],
1262 sdn_vim_id,
1263 ro_vim_item_update_ok.get("vim_details", ""),
1264 )
1265 )
1266
1267 return "DONE", ro_vim_item_update_ok
1268
1269
1270 class ConfigValidate:
1271 def __init__(self, config: Dict):
1272 self.conf = config
1273
1274 @property
1275 def active(self):
1276 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1277 if (
1278 self.conf["period"]["refresh_active"] >= 60
1279 or self.conf["period"]["refresh_active"] == -1
1280 ):
1281 return self.conf["period"]["refresh_active"]
1282
1283 return 60
1284
1285 @property
1286 def build(self):
1287 return self.conf["period"]["refresh_build"]
1288
1289 @property
1290 def image(self):
1291 return self.conf["period"]["refresh_image"]
1292
1293 @property
1294 def error(self):
1295 return self.conf["period"]["refresh_error"]
1296
1297 @property
1298 def queue_size(self):
1299 return self.conf["period"]["queue_size"]
1300
1301
1302 class NsWorker(threading.Thread):
1303 def __init__(self, worker_index, config, plugins, db):
1304 """
1305
1306 :param worker_index: thread index
1307 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1308 :param plugins: global shared dict with the loaded plugins
1309 :param db: database class instance to use
1310 """
1311 threading.Thread.__init__(self)
1312 self.config = config
1313 self.plugins = plugins
1314 self.plugin_name = "unknown"
1315 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1316 self.worker_index = worker_index
1317 # refresh periods for created items
1318 self.refresh_config = ConfigValidate(config)
1319 self.task_queue = queue.Queue(self.refresh_config.queue_size)
1320 # targetvim: vimplugin class
1321 self.my_vims = {}
1322 # targetvim: vim information from database
1323 self.db_vims = {}
1324 # targetvim list
1325 self.vim_targets = []
1326 self.my_id = config["process_id"] + ":" + str(worker_index)
1327 self.db = db
1328 self.item2class = {
1329 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1330 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1331 "image": VimInteractionImage(
1332 self.db, self.my_vims, self.db_vims, self.logger
1333 ),
1334 "flavor": VimInteractionFlavor(
1335 self.db, self.my_vims, self.db_vims, self.logger
1336 ),
1337 "sdn_net": VimInteractionSdnNet(
1338 self.db, self.my_vims, self.db_vims, self.logger
1339 ),
1340 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1341 self.db, self.my_vims, self.db_vims, self.logger
1342 ),
1343 }
1344 self.time_last_task_processed = None
1345 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1346 self.tasks_to_delete = []
1347 # it is idle when there are not vim_targets associated
1348 self.idle = True
1349 self.task_locked_time = config["global"]["task_locked_time"]
1350
1351 def insert_task(self, task):
1352 try:
1353 self.task_queue.put(task, False)
1354 return None
1355 except queue.Full:
1356 raise NsWorkerException("timeout inserting a task")
1357
1358 def terminate(self):
1359 self.insert_task("exit")
1360
1361 def del_task(self, task):
1362 with self.task_lock:
1363 if task["status"] == "SCHEDULED":
1364 task["status"] = "SUPERSEDED"
1365 return True
1366 else: # task["status"] == "processing"
1367 self.task_lock.release()
1368 return False
1369
1370 def _process_vim_config(self, target_id, db_vim):
1371 """
1372 Process vim config, creating vim configuration files as ca_cert
1373 :param target_id: vim/sdn/wim + id
1374 :param db_vim: Vim dictionary obtained from database
1375 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1376 """
1377 if not db_vim.get("config"):
1378 return
1379
1380 file_name = ""
1381
1382 try:
1383 if db_vim["config"].get("ca_cert_content"):
1384 file_name = "{}:{}".format(target_id, self.worker_index)
1385
1386 try:
1387 mkdir(file_name)
1388 except FileExistsError:
1389 self.logger.exception(
1390 "FileExistsError occured while processing vim_config."
1391 )
1392
1393 file_name = file_name + "/ca_cert"
1394
1395 with open(file_name, "w") as f:
1396 f.write(db_vim["config"]["ca_cert_content"])
1397 del db_vim["config"]["ca_cert_content"]
1398 db_vim["config"]["ca_cert"] = file_name
1399 except Exception as e:
1400 raise NsWorkerException(
1401 "Error writing to file '{}': {}".format(file_name, e)
1402 )
1403
1404 def _load_plugin(self, name, type="vim"):
1405 # type can be vim or sdn
1406 if "rovim_dummy" not in self.plugins:
1407 self.plugins["rovim_dummy"] = VimDummyConnector
1408
1409 if "rosdn_dummy" not in self.plugins:
1410 self.plugins["rosdn_dummy"] = SdnDummyConnector
1411
1412 if name in self.plugins:
1413 return self.plugins[name]
1414
1415 try:
1416 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1417 self.plugins[name] = ep.load()
1418 except Exception as e:
1419 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1420
1421 if name and name not in self.plugins:
1422 raise NsWorkerException(
1423 "Plugin 'osm_{n}' has not been installed".format(n=name)
1424 )
1425
1426 return self.plugins[name]
1427
1428 def _unload_vim(self, target_id):
1429 """
1430 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1431 :param target_id: Contains type:_id; where type can be 'vim', ...
1432 :return: None.
1433 """
1434 try:
1435 self.db_vims.pop(target_id, None)
1436 self.my_vims.pop(target_id, None)
1437
1438 if target_id in self.vim_targets:
1439 self.vim_targets.remove(target_id)
1440
1441 self.logger.info("Unloaded {}".format(target_id))
1442 rmtree("{}:{}".format(target_id, self.worker_index))
1443 except FileNotFoundError:
1444 # This is raised by rmtree if folder does not exist.
1445 self.logger.exception("FileNotFoundError occured while unloading VIM.")
1446 except Exception as e:
1447 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1448
1449 def _check_vim(self, target_id):
1450 """
1451 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1452 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1453 :return: None.
1454 """
1455 target, _, _id = target_id.partition(":")
1456 now = time.time()
1457 update_dict = {}
1458 unset_dict = {}
1459 op_text = ""
1460 step = ""
1461 loaded = target_id in self.vim_targets
1462 target_database = (
1463 "vim_accounts"
1464 if target == "vim"
1465 else "wim_accounts"
1466 if target == "wim"
1467 else "sdns"
1468 )
1469
1470 try:
1471 step = "Getting {} from db".format(target_id)
1472 db_vim = self.db.get_one(target_database, {"_id": _id})
1473
1474 for op_index, operation in enumerate(
1475 db_vim["_admin"].get("operations", ())
1476 ):
1477 if operation["operationState"] != "PROCESSING":
1478 continue
1479
1480 locked_at = operation.get("locked_at")
1481
1482 if locked_at is not None and locked_at >= now - self.task_locked_time:
1483 # some other thread is doing this operation
1484 return
1485
1486 # lock
1487 op_text = "_admin.operations.{}.".format(op_index)
1488
1489 if not self.db.set_one(
1490 target_database,
1491 q_filter={
1492 "_id": _id,
1493 op_text + "operationState": "PROCESSING",
1494 op_text + "locked_at": locked_at,
1495 },
1496 update_dict={
1497 op_text + "locked_at": now,
1498 "admin.current_operation": op_index,
1499 },
1500 fail_on_empty=False,
1501 ):
1502 return
1503
1504 unset_dict[op_text + "locked_at"] = None
1505 unset_dict["current_operation"] = None
1506 step = "Loading " + target_id
1507 error_text = self._load_vim(target_id)
1508
1509 if not error_text:
1510 step = "Checking connectivity"
1511
1512 if target == "vim":
1513 self.my_vims[target_id].check_vim_connectivity()
1514 else:
1515 self.my_vims[target_id].check_credentials()
1516
1517 update_dict["_admin.operationalState"] = "ENABLED"
1518 update_dict["_admin.detailed-status"] = ""
1519 unset_dict[op_text + "detailed-status"] = None
1520 update_dict[op_text + "operationState"] = "COMPLETED"
1521
1522 return
1523
1524 except Exception as e:
1525 error_text = "{}: {}".format(step, e)
1526 self.logger.error("{} for {}: {}".format(step, target_id, e))
1527
1528 finally:
1529 if update_dict or unset_dict:
1530 if error_text:
1531 update_dict[op_text + "operationState"] = "FAILED"
1532 update_dict[op_text + "detailed-status"] = error_text
1533 unset_dict.pop(op_text + "detailed-status", None)
1534 update_dict["_admin.operationalState"] = "ERROR"
1535 update_dict["_admin.detailed-status"] = error_text
1536
1537 if op_text:
1538 update_dict[op_text + "statusEnteredTime"] = now
1539
1540 self.db.set_one(
1541 target_database,
1542 q_filter={"_id": _id},
1543 update_dict=update_dict,
1544 unset=unset_dict,
1545 fail_on_empty=False,
1546 )
1547
1548 if not loaded:
1549 self._unload_vim(target_id)
1550
1551 def _reload_vim(self, target_id):
1552 if target_id in self.vim_targets:
1553 self._load_vim(target_id)
1554 else:
1555 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1556 # just remove it to force load again next time it is needed
1557 self.db_vims.pop(target_id, None)
1558
1559 def _load_vim(self, target_id):
1560 """
1561 Load or reload a vim_account, sdn_controller or wim_account.
1562 Read content from database, load the plugin if not loaded.
1563 In case of error loading the plugin, it load a failing VIM_connector
1564 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1565 :param target_id: Contains type:_id; where type can be 'vim', ...
1566 :return: None if ok, descriptive text if error
1567 """
1568 target, _, _id = target_id.partition(":")
1569 target_database = (
1570 "vim_accounts"
1571 if target == "vim"
1572 else "wim_accounts"
1573 if target == "wim"
1574 else "sdns"
1575 )
1576 plugin_name = ""
1577 vim = None
1578
1579 try:
1580 step = "Getting {}={} from db".format(target, _id)
1581 # TODO process for wim, sdnc, ...
1582 vim = self.db.get_one(target_database, {"_id": _id})
1583
1584 # if deep_get(vim, "config", "sdn-controller"):
1585 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1586 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1587
1588 step = "Decrypting password"
1589 schema_version = vim.get("schema_version")
1590 self.db.encrypt_decrypt_fields(
1591 vim,
1592 "decrypt",
1593 fields=("password", "secret"),
1594 schema_version=schema_version,
1595 salt=_id,
1596 )
1597 self._process_vim_config(target_id, vim)
1598
1599 if target == "vim":
1600 plugin_name = "rovim_" + vim["vim_type"]
1601 step = "Loading plugin '{}'".format(plugin_name)
1602 vim_module_conn = self._load_plugin(plugin_name)
1603 step = "Loading {}'".format(target_id)
1604 self.my_vims[target_id] = vim_module_conn(
1605 uuid=vim["_id"],
1606 name=vim["name"],
1607 tenant_id=vim.get("vim_tenant_id"),
1608 tenant_name=vim.get("vim_tenant_name"),
1609 url=vim["vim_url"],
1610 url_admin=None,
1611 user=vim["vim_user"],
1612 passwd=vim["vim_password"],
1613 config=vim.get("config") or {},
1614 persistent_info={},
1615 )
1616 else: # sdn
1617 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1618 step = "Loading plugin '{}'".format(plugin_name)
1619 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1620 step = "Loading {}'".format(target_id)
1621 wim = deepcopy(vim)
1622 wim_config = wim.pop("config", {}) or {}
1623 wim["uuid"] = wim["_id"]
1624 if "url" in wim and "wim_url" not in wim:
1625 wim["wim_url"] = wim["url"]
1626 elif "url" not in wim and "wim_url" in wim:
1627 wim["url"] = wim["wim_url"]
1628
1629 if wim.get("dpid"):
1630 wim_config["dpid"] = wim.pop("dpid")
1631
1632 if wim.get("switch_id"):
1633 wim_config["switch_id"] = wim.pop("switch_id")
1634
1635 # wim, wim_account, config
1636 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1637 self.db_vims[target_id] = vim
1638 self.error_status = None
1639
1640 self.logger.info(
1641 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1642 )
1643 except Exception as e:
1644 self.logger.error(
1645 "Cannot load {} plugin={}: {} {}".format(
1646 target_id, plugin_name, step, e
1647 )
1648 )
1649
1650 self.db_vims[target_id] = vim or {}
1651 self.db_vims[target_id] = FailingConnector(str(e))
1652 error_status = "{} Error: {}".format(step, e)
1653
1654 return error_status
1655 finally:
1656 if target_id not in self.vim_targets:
1657 self.vim_targets.append(target_id)
1658
1659 def _get_db_task(self):
1660 """
1661 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1662 :return: None
1663 """
1664 now = time.time()
1665
1666 if not self.time_last_task_processed:
1667 self.time_last_task_processed = now
1668
1669 try:
1670 while True:
1671 """
1672 # Log RO tasks only when loglevel is DEBUG
1673 if self.logger.getEffectiveLevel() == logging.DEBUG:
1674 self._log_ro_task(
1675 None,
1676 None,
1677 None,
1678 "TASK_WF",
1679 "task_locked_time="
1680 + str(self.task_locked_time)
1681 + " "
1682 + "time_last_task_processed="
1683 + str(self.time_last_task_processed)
1684 + " "
1685 + "now="
1686 + str(now),
1687 )
1688 """
1689 locked = self.db.set_one(
1690 "ro_tasks",
1691 q_filter={
1692 "target_id": self.vim_targets,
1693 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1694 "locked_at.lt": now - self.task_locked_time,
1695 "to_check_at.lt": self.time_last_task_processed,
1696 "to_check_at.gt": -1,
1697 },
1698 update_dict={"locked_by": self.my_id, "locked_at": now},
1699 fail_on_empty=False,
1700 )
1701
1702 if locked:
1703 # read and return
1704 ro_task = self.db.get_one(
1705 "ro_tasks",
1706 q_filter={
1707 "target_id": self.vim_targets,
1708 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1709 "locked_at": now,
1710 },
1711 )
1712 return ro_task
1713
1714 if self.time_last_task_processed == now:
1715 self.time_last_task_processed = None
1716 return None
1717 else:
1718 self.time_last_task_processed = now
1719 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1720
1721 except DbException as e:
1722 self.logger.error("Database exception at _get_db_task: {}".format(e))
1723 except Exception as e:
1724 self.logger.critical(
1725 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1726 )
1727
1728 return None
1729
1730 def _get_db_all_tasks(self):
1731 """
1732 Read all content of table ro_tasks to log it
1733 :return: None
1734 """
1735 try:
1736 # Checking the content of the BD:
1737
1738 # read and return
1739 ro_task = self.db.get_list("ro_tasks")
1740 for rt in ro_task:
1741 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1742 return ro_task
1743
1744 except DbException as e:
1745 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1746 except Exception as e:
1747 self.logger.critical(
1748 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1749 )
1750
1751 return None
1752
1753 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1754 """
1755 Generate a log with the following format:
1756
1757 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1758 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1759 task_array_index;task_id;task_action;task_item;task_args
1760
1761 Example:
1762
1763 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1764 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1765 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1766 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1767 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1768 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1769 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1770 """
1771 try:
1772 line = []
1773 i = 0
1774 if ro_task is not None and isinstance(ro_task, dict):
1775 for t in ro_task["tasks"]:
1776 line.clear()
1777 line.append(mark)
1778 line.append(event)
1779 line.append(ro_task.get("_id", ""))
1780 line.append(str(ro_task.get("locked_at", "")))
1781 line.append(str(ro_task.get("modified_at", "")))
1782 line.append(str(ro_task.get("created_at", "")))
1783 line.append(str(ro_task.get("to_check_at", "")))
1784 line.append(str(ro_task.get("locked_by", "")))
1785 line.append(str(ro_task.get("target_id", "")))
1786 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1787 line.append(str(ro_task.get("vim_info", "")))
1788 line.append(str(ro_task.get("tasks", "")))
1789 if isinstance(t, dict):
1790 line.append(str(t.get("status", "")))
1791 line.append(str(t.get("action_id", "")))
1792 line.append(str(i))
1793 line.append(str(t.get("task_id", "")))
1794 line.append(str(t.get("action", "")))
1795 line.append(str(t.get("item", "")))
1796 line.append(str(t.get("find_params", "")))
1797 line.append(str(t.get("params", "")))
1798 else:
1799 line.extend([""] * 2)
1800 line.append(str(i))
1801 line.extend([""] * 5)
1802
1803 i += 1
1804 self.logger.debug(";".join(line))
1805 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1806 i = 0
1807 while True:
1808 st = "tasks.{}.status".format(i)
1809 if st not in db_ro_task_update:
1810 break
1811 line.clear()
1812 line.append(mark)
1813 line.append(event)
1814 line.append(db_ro_task_update.get("_id", ""))
1815 line.append(str(db_ro_task_update.get("locked_at", "")))
1816 line.append(str(db_ro_task_update.get("modified_at", "")))
1817 line.append("")
1818 line.append(str(db_ro_task_update.get("to_check_at", "")))
1819 line.append(str(db_ro_task_update.get("locked_by", "")))
1820 line.append("")
1821 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1822 line.append("")
1823 line.append(str(db_ro_task_update.get("vim_info", "")))
1824 line.append(str(str(db_ro_task_update).count(".status")))
1825 line.append(db_ro_task_update.get(st, ""))
1826 line.append("")
1827 line.append(str(i))
1828 line.extend([""] * 3)
1829 i += 1
1830 self.logger.debug(";".join(line))
1831
1832 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1833 line.clear()
1834 line.append(mark)
1835 line.append(event)
1836 line.append(db_ro_task_delete.get("_id", ""))
1837 line.append("")
1838 line.append(db_ro_task_delete.get("modified_at", ""))
1839 line.extend([""] * 13)
1840 self.logger.debug(";".join(line))
1841
1842 else:
1843 line.clear()
1844 line.append(mark)
1845 line.append(event)
1846 line.extend([""] * 16)
1847 self.logger.debug(";".join(line))
1848
1849 except Exception as e:
1850 self.logger.error("Error logging ro_task: {}".format(e))
1851
1852 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1853 """
1854 Determine if this task need to be done or superseded
1855 :return: None
1856 """
1857 my_task = ro_task["tasks"][task_index]
1858 task_id = my_task["task_id"]
1859 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1860 "created_items", False
1861 )
1862
1863 if my_task["status"] == "FAILED":
1864 return None, None # TODO need to be retry??
1865
1866 try:
1867 for index, task in enumerate(ro_task["tasks"]):
1868 if index == task_index or not task:
1869 continue # own task
1870
1871 if (
1872 my_task["target_record"] == task["target_record"]
1873 and task["action"] == "CREATE"
1874 ):
1875 # set to finished
1876 db_update["tasks.{}.status".format(index)] = task[
1877 "status"
1878 ] = "FINISHED"
1879 elif task["action"] == "CREATE" and task["status"] not in (
1880 "FINISHED",
1881 "SUPERSEDED",
1882 ):
1883 needed_delete = False
1884
1885 if needed_delete:
1886 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1887 else:
1888 return "SUPERSEDED", None
1889 except Exception as e:
1890 if not isinstance(e, NsWorkerException):
1891 self.logger.critical(
1892 "Unexpected exception at _delete_task task={}: {}".format(
1893 task_id, e
1894 ),
1895 exc_info=True,
1896 )
1897
1898 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1899
1900 def _create_task(self, ro_task, task_index, task_depends, db_update):
1901 """
1902 Determine if this task need to create something at VIM
1903 :return: None
1904 """
1905 my_task = ro_task["tasks"][task_index]
1906 task_id = my_task["task_id"]
1907 task_status = None
1908
1909 if my_task["status"] == "FAILED":
1910 return None, None # TODO need to be retry??
1911 elif my_task["status"] == "SCHEDULED":
1912 # check if already created by another task
1913 for index, task in enumerate(ro_task["tasks"]):
1914 if index == task_index or not task:
1915 continue # own task
1916
1917 if task["action"] == "CREATE" and task["status"] not in (
1918 "SCHEDULED",
1919 "FINISHED",
1920 "SUPERSEDED",
1921 ):
1922 return task["status"], "COPY_VIM_INFO"
1923
1924 try:
1925 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1926 ro_task, task_index, task_depends
1927 )
1928 # TODO update other CREATE tasks
1929 except Exception as e:
1930 if not isinstance(e, NsWorkerException):
1931 self.logger.error(
1932 "Error executing task={}: {}".format(task_id, e), exc_info=True
1933 )
1934
1935 task_status = "FAILED"
1936 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1937 # TODO update ro_vim_item_update
1938
1939 return task_status, ro_vim_item_update
1940 else:
1941 return None, None
1942
1943 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1944 """
1945 Look for dependency task
1946 :param task_id: Can be one of
1947 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1948 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1949 3. task.task_id: "<action_id>:number"
1950 :param ro_task:
1951 :param target_id:
1952 :return: database ro_task plus index of task
1953 """
1954 if (
1955 task_id.startswith("vim:")
1956 or task_id.startswith("sdn:")
1957 or task_id.startswith("wim:")
1958 ):
1959 target_id, _, task_id = task_id.partition(" ")
1960
1961 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1962 ro_task_dependency = self.db.get_one(
1963 "ro_tasks",
1964 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1965 fail_on_empty=False,
1966 )
1967
1968 if ro_task_dependency:
1969 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1970 if task["target_record_id"] == task_id:
1971 return ro_task_dependency, task_index
1972
1973 else:
1974 if ro_task:
1975 for task_index, task in enumerate(ro_task["tasks"]):
1976 if task and task["task_id"] == task_id:
1977 return ro_task, task_index
1978
1979 ro_task_dependency = self.db.get_one(
1980 "ro_tasks",
1981 q_filter={
1982 "tasks.ANYINDEX.task_id": task_id,
1983 "tasks.ANYINDEX.target_record.ne": None,
1984 },
1985 fail_on_empty=False,
1986 )
1987
1988 if ro_task_dependency:
1989 for task_index, task in ro_task_dependency["tasks"]:
1990 if task["task_id"] == task_id:
1991 return ro_task_dependency, task_index
1992 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1993
1994 def update_vm_refresh(self):
1995 """Enables the VM status updates if self.refresh_config.active parameter
1996 is not -1 and than updates the DB accordingly
1997
1998 """
1999 try:
2000 self.logger.debug("Checking if VM status update config")
2001 next_refresh = time.time()
2002 if self.refresh_config.active == -1:
2003 next_refresh = -1
2004 else:
2005 next_refresh += self.refresh_config.active
2006
2007 if next_refresh != -1:
2008 db_ro_task_update = {}
2009 now = time.time()
2010 next_check_at = now + (24 * 60 * 60)
2011 next_check_at = min(next_check_at, next_refresh)
2012 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2013 db_ro_task_update["to_check_at"] = next_check_at
2014
2015 self.logger.debug(
2016 "Finding tasks which to be updated to enable VM status updates"
2017 )
2018 refresh_tasks = self.db.get_list(
2019 "ro_tasks",
2020 q_filter={
2021 "tasks.status": "DONE",
2022 "to_check_at.lt": 0,
2023 },
2024 )
2025 self.logger.debug("Updating tasks to change the to_check_at status")
2026 for task in refresh_tasks:
2027 q_filter = {
2028 "_id": task["_id"],
2029 }
2030 self.db.set_one(
2031 "ro_tasks",
2032 q_filter=q_filter,
2033 update_dict=db_ro_task_update,
2034 fail_on_empty=True,
2035 )
2036
2037 except Exception as e:
2038 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2039
2040 def _process_pending_tasks(self, ro_task):
2041 ro_task_id = ro_task["_id"]
2042 now = time.time()
2043 # one day
2044 next_check_at = now + (24 * 60 * 60)
2045 db_ro_task_update = {}
2046
2047 def _update_refresh(new_status):
2048 # compute next_refresh
2049 nonlocal task
2050 nonlocal next_check_at
2051 nonlocal db_ro_task_update
2052 nonlocal ro_task
2053
2054 next_refresh = time.time()
2055
2056 if task["item"] in ("image", "flavor"):
2057 next_refresh += self.refresh_config.image
2058 elif new_status == "BUILD":
2059 next_refresh += self.refresh_config.build
2060 elif new_status == "DONE":
2061 if self.refresh_config.active == -1:
2062 next_refresh = -1
2063 else:
2064 next_refresh += self.refresh_config.active
2065 else:
2066 next_refresh += self.refresh_config.error
2067
2068 next_check_at = min(next_check_at, next_refresh)
2069 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2070 ro_task["vim_info"]["refresh_at"] = next_refresh
2071
2072 try:
2073 """
2074 # Log RO tasks only when loglevel is DEBUG
2075 if self.logger.getEffectiveLevel() == logging.DEBUG:
2076 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2077 """
2078 # Check if vim status refresh is enabled again
2079 self.update_vm_refresh()
2080 # 0: get task_status_create
2081 lock_object = None
2082 task_status_create = None
2083 task_create = next(
2084 (
2085 t
2086 for t in ro_task["tasks"]
2087 if t
2088 and t["action"] == "CREATE"
2089 and t["status"] in ("BUILD", "DONE")
2090 ),
2091 None,
2092 )
2093
2094 if task_create:
2095 task_status_create = task_create["status"]
2096
2097 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2098 for task_action in ("DELETE", "CREATE", "EXEC"):
2099 db_vim_update = None
2100 new_status = None
2101
2102 for task_index, task in enumerate(ro_task["tasks"]):
2103 if not task:
2104 continue # task deleted
2105
2106 task_depends = {}
2107 target_update = None
2108
2109 if (
2110 (
2111 task_action in ("DELETE", "EXEC")
2112 and task["status"] not in ("SCHEDULED", "BUILD")
2113 )
2114 or task["action"] != task_action
2115 or (
2116 task_action == "CREATE"
2117 and task["status"] in ("FINISHED", "SUPERSEDED")
2118 )
2119 ):
2120 continue
2121
2122 task_path = "tasks.{}.status".format(task_index)
2123 try:
2124 db_vim_info_update = None
2125
2126 if task["status"] == "SCHEDULED":
2127 # check if tasks that this depends on have been completed
2128 dependency_not_completed = False
2129
2130 for dependency_task_id in task.get("depends_on") or ():
2131 (
2132 dependency_ro_task,
2133 dependency_task_index,
2134 ) = self._get_dependency(
2135 dependency_task_id, target_id=ro_task["target_id"]
2136 )
2137 dependency_task = dependency_ro_task["tasks"][
2138 dependency_task_index
2139 ]
2140
2141 if dependency_task["status"] == "SCHEDULED":
2142 dependency_not_completed = True
2143 next_check_at = min(
2144 next_check_at, dependency_ro_task["to_check_at"]
2145 )
2146 # must allow dependent task to be processed first
2147 # to do this set time after last_task_processed
2148 next_check_at = max(
2149 self.time_last_task_processed, next_check_at
2150 )
2151 break
2152 elif dependency_task["status"] == "FAILED":
2153 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2154 task["action"],
2155 task["item"],
2156 dependency_task["action"],
2157 dependency_task["item"],
2158 dependency_task_id,
2159 dependency_ro_task["vim_info"].get(
2160 "vim_details"
2161 ),
2162 )
2163 self.logger.error(
2164 "task={} {}".format(task["task_id"], error_text)
2165 )
2166 raise NsWorkerException(error_text)
2167
2168 task_depends[dependency_task_id] = dependency_ro_task[
2169 "vim_info"
2170 ]["vim_id"]
2171 task_depends[
2172 "TASK-{}".format(dependency_task_id)
2173 ] = dependency_ro_task["vim_info"]["vim_id"]
2174
2175 if dependency_not_completed:
2176 # TODO set at vim_info.vim_details that it is waiting
2177 continue
2178
2179 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2180 # the task of renew this locking. It will update database locket_at periodically
2181 if not lock_object:
2182 lock_object = LockRenew.add_lock_object(
2183 "ro_tasks", ro_task, self
2184 )
2185
2186 if task["action"] == "DELETE":
2187 (
2188 new_status,
2189 db_vim_info_update,
2190 ) = self._delete_task(
2191 ro_task, task_index, task_depends, db_ro_task_update
2192 )
2193 new_status = (
2194 "FINISHED" if new_status == "DONE" else new_status
2195 )
2196 # ^with FINISHED instead of DONE it will not be refreshing
2197
2198 if new_status in ("FINISHED", "SUPERSEDED"):
2199 target_update = "DELETE"
2200 elif task["action"] == "EXEC":
2201 (
2202 new_status,
2203 db_vim_info_update,
2204 db_task_update,
2205 ) = self.item2class[task["item"]].exec(
2206 ro_task, task_index, task_depends
2207 )
2208 new_status = (
2209 "FINISHED" if new_status == "DONE" else new_status
2210 )
2211 # ^with FINISHED instead of DONE it will not be refreshing
2212
2213 if db_task_update:
2214 # load into database the modified db_task_update "retries" and "next_retry"
2215 if db_task_update.get("retries"):
2216 db_ro_task_update[
2217 "tasks.{}.retries".format(task_index)
2218 ] = db_task_update["retries"]
2219
2220 next_check_at = time.time() + db_task_update.get(
2221 "next_retry", 60
2222 )
2223 target_update = None
2224 elif task["action"] == "CREATE":
2225 if task["status"] == "SCHEDULED":
2226 if task_status_create:
2227 new_status = task_status_create
2228 target_update = "COPY_VIM_INFO"
2229 else:
2230 new_status, db_vim_info_update = self.item2class[
2231 task["item"]
2232 ].new(ro_task, task_index, task_depends)
2233 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2234 _update_refresh(new_status)
2235 else:
2236 refresh_at = ro_task["vim_info"]["refresh_at"]
2237 if refresh_at and refresh_at != -1 and now > refresh_at:
2238 new_status, db_vim_info_update = self.item2class[
2239 task["item"]
2240 ].refresh(ro_task)
2241 _update_refresh(new_status)
2242 else:
2243 # The refresh is updated to avoid set the value of "refresh_at" to
2244 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2245 # because it can happen that in this case the task is never processed
2246 _update_refresh(task["status"])
2247
2248 except Exception as e:
2249 new_status = "FAILED"
2250 db_vim_info_update = {
2251 "vim_status": "VIM_ERROR",
2252 "vim_details": str(e),
2253 }
2254
2255 if not isinstance(
2256 e, (NsWorkerException, vimconn.VimConnException)
2257 ):
2258 self.logger.error(
2259 "Unexpected exception at _delete_task task={}: {}".format(
2260 task["task_id"], e
2261 ),
2262 exc_info=True,
2263 )
2264
2265 try:
2266 if db_vim_info_update:
2267 db_vim_update = db_vim_info_update.copy()
2268 db_ro_task_update.update(
2269 {
2270 "vim_info." + k: v
2271 for k, v in db_vim_info_update.items()
2272 }
2273 )
2274 ro_task["vim_info"].update(db_vim_info_update)
2275
2276 if new_status:
2277 if task_action == "CREATE":
2278 task_status_create = new_status
2279 db_ro_task_update[task_path] = new_status
2280
2281 if target_update or db_vim_update:
2282 if target_update == "DELETE":
2283 self._update_target(task, None)
2284 elif target_update == "COPY_VIM_INFO":
2285 self._update_target(task, ro_task["vim_info"])
2286 else:
2287 self._update_target(task, db_vim_update)
2288
2289 except Exception as e:
2290 if (
2291 isinstance(e, DbException)
2292 and e.http_code == HTTPStatus.NOT_FOUND
2293 ):
2294 # if the vnfrs or nsrs has been removed from database, this task must be removed
2295 self.logger.debug(
2296 "marking to delete task={}".format(task["task_id"])
2297 )
2298 self.tasks_to_delete.append(task)
2299 else:
2300 self.logger.error(
2301 "Unexpected exception at _update_target task={}: {}".format(
2302 task["task_id"], e
2303 ),
2304 exc_info=True,
2305 )
2306
2307 locked_at = ro_task["locked_at"]
2308
2309 if lock_object:
2310 locked_at = [
2311 lock_object["locked_at"],
2312 lock_object["locked_at"] + self.task_locked_time,
2313 ]
2314 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2315 # contain exactly locked_at + self.task_locked_time
2316 LockRenew.remove_lock_object(lock_object)
2317
2318 q_filter = {
2319 "_id": ro_task["_id"],
2320 "to_check_at": ro_task["to_check_at"],
2321 "locked_at": locked_at,
2322 }
2323 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2324 # outside this task (by ro_nbi) do not update it
2325 db_ro_task_update["locked_by"] = None
2326 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2327 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2328 db_ro_task_update["modified_at"] = now
2329 db_ro_task_update["to_check_at"] = next_check_at
2330
2331 """
2332 # Log RO tasks only when loglevel is DEBUG
2333 if self.logger.getEffectiveLevel() == logging.DEBUG:
2334 db_ro_task_update_log = db_ro_task_update.copy()
2335 db_ro_task_update_log["_id"] = q_filter["_id"]
2336 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2337 """
2338
2339 if not self.db.set_one(
2340 "ro_tasks",
2341 update_dict=db_ro_task_update,
2342 q_filter=q_filter,
2343 fail_on_empty=False,
2344 ):
2345 del db_ro_task_update["to_check_at"]
2346 del q_filter["to_check_at"]
2347 """
2348 # Log RO tasks only when loglevel is DEBUG
2349 if self.logger.getEffectiveLevel() == logging.DEBUG:
2350 self._log_ro_task(
2351 None,
2352 db_ro_task_update_log,
2353 None,
2354 "TASK_WF",
2355 "SET_TASK " + str(q_filter),
2356 )
2357 """
2358 self.db.set_one(
2359 "ro_tasks",
2360 q_filter=q_filter,
2361 update_dict=db_ro_task_update,
2362 fail_on_empty=True,
2363 )
2364 except DbException as e:
2365 self.logger.error(
2366 "ro_task={} Error updating database {}".format(ro_task_id, e)
2367 )
2368 except Exception as e:
2369 self.logger.error(
2370 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2371 )
2372
2373 def _update_target(self, task, ro_vim_item_update):
2374 table, _, temp = task["target_record"].partition(":")
2375 _id, _, path_vim_status = temp.partition(":")
2376 path_item = path_vim_status[: path_vim_status.rfind(".")]
2377 path_item = path_item[: path_item.rfind(".")]
2378 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2379 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2380
2381 if ro_vim_item_update:
2382 update_dict = {
2383 path_vim_status + "." + k: v
2384 for k, v in ro_vim_item_update.items()
2385 if k
2386 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2387 }
2388
2389 if path_vim_status.startswith("vdur."):
2390 # for backward compatibility, add vdur.name apart from vdur.vim_name
2391 if ro_vim_item_update.get("vim_name"):
2392 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2393
2394 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2395 if ro_vim_item_update.get("vim_id"):
2396 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2397
2398 # update general status
2399 if ro_vim_item_update.get("vim_status"):
2400 update_dict[path_item + ".status"] = ro_vim_item_update[
2401 "vim_status"
2402 ]
2403
2404 if ro_vim_item_update.get("interfaces"):
2405 path_interfaces = path_item + ".interfaces"
2406
2407 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2408 if iface:
2409 update_dict.update(
2410 {
2411 path_interfaces + ".{}.".format(i) + k: v
2412 for k, v in iface.items()
2413 if k in ("vlan", "compute_node", "pci")
2414 }
2415 )
2416
2417 # put ip_address and mac_address with ip-address and mac-address
2418 if iface.get("ip_address"):
2419 update_dict[
2420 path_interfaces + ".{}.".format(i) + "ip-address"
2421 ] = iface["ip_address"]
2422
2423 if iface.get("mac_address"):
2424 update_dict[
2425 path_interfaces + ".{}.".format(i) + "mac-address"
2426 ] = iface["mac_address"]
2427
2428 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2429 update_dict["ip-address"] = iface.get("ip_address").split(
2430 ";"
2431 )[0]
2432
2433 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2434 update_dict[path_item + ".ip-address"] = iface.get(
2435 "ip_address"
2436 ).split(";")[0]
2437
2438 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2439 else:
2440 update_dict = {path_item + ".status": "DELETED"}
2441 self.db.set_one(
2442 table,
2443 q_filter={"_id": _id},
2444 update_dict=update_dict,
2445 unset={path_vim_status: None},
2446 )
2447
2448 def _process_delete_db_tasks(self):
2449 """
2450 Delete task from database because vnfrs or nsrs or both have been deleted
2451 :return: None. Uses and modify self.tasks_to_delete
2452 """
2453 while self.tasks_to_delete:
2454 task = self.tasks_to_delete[0]
2455 vnfrs_deleted = None
2456 nsr_id = task["nsr_id"]
2457
2458 if task["target_record"].startswith("vnfrs:"):
2459 # check if nsrs is present
2460 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2461 vnfrs_deleted = task["target_record"].split(":")[1]
2462
2463 try:
2464 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2465 except Exception as e:
2466 self.logger.error(
2467 "Error deleting task={}: {}".format(task["task_id"], e)
2468 )
2469 self.tasks_to_delete.pop(0)
2470
2471 @staticmethod
2472 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2473 """
2474 Static method because it is called from osm_ng_ro.ns
2475 :param db: instance of database to use
2476 :param nsr_id: affected nsrs id
2477 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2478 :return: None, exception is fails
2479 """
2480 retries = 5
2481 for retry in range(retries):
2482 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2483 now = time.time()
2484 conflict = False
2485
2486 for ro_task in ro_tasks:
2487 db_update = {}
2488 to_delete_ro_task = True
2489
2490 for index, task in enumerate(ro_task["tasks"]):
2491 if not task:
2492 pass
2493 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2494 vnfrs_deleted
2495 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2496 ):
2497 db_update["tasks.{}".format(index)] = None
2498 else:
2499 # used by other nsr, ro_task cannot be deleted
2500 to_delete_ro_task = False
2501
2502 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2503 if to_delete_ro_task:
2504 if not db.del_one(
2505 "ro_tasks",
2506 q_filter={
2507 "_id": ro_task["_id"],
2508 "modified_at": ro_task["modified_at"],
2509 },
2510 fail_on_empty=False,
2511 ):
2512 conflict = True
2513 elif db_update:
2514 db_update["modified_at"] = now
2515 if not db.set_one(
2516 "ro_tasks",
2517 q_filter={
2518 "_id": ro_task["_id"],
2519 "modified_at": ro_task["modified_at"],
2520 },
2521 update_dict=db_update,
2522 fail_on_empty=False,
2523 ):
2524 conflict = True
2525 if not conflict:
2526 return
2527 else:
2528 raise NsWorkerException("Exceeded {} retries".format(retries))
2529
2530 def run(self):
2531 # load database
2532 self.logger.info("Starting")
2533 while True:
2534 # step 1: get commands from queue
2535 try:
2536 if self.vim_targets:
2537 task = self.task_queue.get(block=False)
2538 else:
2539 if not self.idle:
2540 self.logger.debug("enters in idle state")
2541 self.idle = True
2542 task = self.task_queue.get(block=True)
2543 self.idle = False
2544
2545 if task[0] == "terminate":
2546 break
2547 elif task[0] == "load_vim":
2548 self.logger.info("order to load vim {}".format(task[1]))
2549 self._load_vim(task[1])
2550 elif task[0] == "unload_vim":
2551 self.logger.info("order to unload vim {}".format(task[1]))
2552 self._unload_vim(task[1])
2553 elif task[0] == "reload_vim":
2554 self._reload_vim(task[1])
2555 elif task[0] == "check_vim":
2556 self.logger.info("order to check vim {}".format(task[1]))
2557 self._check_vim(task[1])
2558 continue
2559 except Exception as e:
2560 if isinstance(e, queue.Empty):
2561 pass
2562 else:
2563 self.logger.critical(
2564 "Error processing task: {}".format(e), exc_info=True
2565 )
2566
2567 # step 2: process pending_tasks, delete not needed tasks
2568 try:
2569 if self.tasks_to_delete:
2570 self._process_delete_db_tasks()
2571 busy = False
2572 """
2573 # Log RO tasks only when loglevel is DEBUG
2574 if self.logger.getEffectiveLevel() == logging.DEBUG:
2575 _ = self._get_db_all_tasks()
2576 """
2577 ro_task = self._get_db_task()
2578 if ro_task:
2579 self._process_pending_tasks(ro_task)
2580 busy = True
2581 if not busy:
2582 time.sleep(5)
2583 except Exception as e:
2584 self.logger.critical(
2585 "Unexpected exception at run: " + str(e), exc_info=True
2586 )
2587
2588 self.logger.info("Finishing")