Fixing RO Security Vulnerabilities
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
52 def deep_get(target_dict, *args, **kwargs):
53 """
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
60 """
61 for key in args:
62 if not isinstance(target_dict, dict) or key not in target_dict:
63 return kwargs.get("default")
64 target_dict = target_dict[key]
65 return target_dict
66
67
68 class NsWorkerException(Exception):
69 pass
70
71
72 class FailingConnector:
73 def __init__(self, error_msg):
74 self.error_msg = error_msg
75
76 for method in dir(vimconn.VimConnector):
77 if method[0] != "_":
78 setattr(
79 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80 )
81
82 for method in dir(sdnconn.SdnConnectorBase):
83 if method[0] != "_":
84 setattr(
85 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86 )
87
88
89 class NsWorkerExceptionNotFound(NsWorkerException):
90 pass
91
92
93 class VimInteractionBase:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
96
97 def __init__(self, db, my_vims, db_vims, logger):
98 self.db = db
99 self.logger = logger
100 self.my_vims = my_vims
101 self.db_vims = db_vims
102
103 def new(self, ro_task, task_index, task_depends):
104 return "BUILD", {}
105
106 def refresh(self, ro_task):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 return "FAILED", {}
110
111 return "DONE", {}
112
113 def delete(self, ro_task, task_index):
114 """skip calling VIM to delete image. Assumes ok"""
115 return "DONE", {}
116
117 def exec(self, ro_task, task_index, task_depends):
118 return "DONE", None, None
119
120
121 class VimInteractionNet(VimInteractionBase):
122 def new(self, ro_task, task_index, task_depends):
123 vim_net_id = None
124 task = ro_task["tasks"][task_index]
125 task_id = task["task_id"]
126 created = False
127 created_items = {}
128 target_vim = self.my_vims[ro_task["target_id"]]
129 mgmtnet = False
130 mgmtnet_defined_in_vim = False
131
132 try:
133 # FIND
134 if task.get("find_params"):
135 # if management, get configuration of VIM
136 if task["find_params"].get("filter_dict"):
137 vim_filter = task["find_params"]["filter_dict"]
138 # management network
139 elif task["find_params"].get("mgmt"):
140 mgmtnet = True
141 if deep_get(
142 self.db_vims[ro_task["target_id"]],
143 "config",
144 "management_network_id",
145 ):
146 mgmtnet_defined_in_vim = True
147 vim_filter = {
148 "id": self.db_vims[ro_task["target_id"]]["config"][
149 "management_network_id"
150 ]
151 }
152 elif deep_get(
153 self.db_vims[ro_task["target_id"]],
154 "config",
155 "management_network_name",
156 ):
157 mgmtnet_defined_in_vim = True
158 vim_filter = {
159 "name": self.db_vims[ro_task["target_id"]]["config"][
160 "management_network_name"
161 ]
162 }
163 else:
164 vim_filter = {"name": task["find_params"]["name"]}
165 else:
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task["find_params"])
168 )
169
170 vim_nets = target_vim.get_network_list(vim_filter)
171 if not vim_nets and not task.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet and not mgmtnet_defined_in_vim:
177 net_name = (
178 vim_filter.get("name")
179 if vim_filter.get("name")
180 else vim_filter.get("id")[:16]
181 )
182 vim_net_id, created_items = target_vim.new_network(
183 net_name, None
184 )
185 self.logger.debug(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id)
187 )
188 created = True
189 else:
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task.get("find_params")
193 )
194 )
195 elif len(vim_nets) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
198 task["find_params"]
199 )
200 )
201
202 if vim_nets:
203 vim_net_id = vim_nets[0]["id"]
204 else:
205 # CREATE
206 params = task["params"]
207 vim_net_id, created_items = target_vim.new_network(**params)
208 created = True
209
210 ro_vim_item_update = {
211 "vim_id": vim_net_id,
212 "vim_status": "BUILD",
213 "created": created,
214 "created_items": created_items,
215 "vim_details": None,
216 }
217 self.logger.debug(
218 "task={} {} new-net={} created={}".format(
219 task_id, ro_task["target_id"], vim_net_id, created
220 )
221 )
222
223 return "BUILD", ro_vim_item_update
224 except (vimconn.VimConnException, NsWorkerException) as e:
225 self.logger.error(
226 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
227 )
228 ro_vim_item_update = {
229 "vim_status": "VIM_ERROR",
230 "created": created,
231 "vim_details": str(e),
232 }
233
234 return "FAILED", ro_vim_item_update
235
236 def refresh(self, ro_task):
237 """Call VIM to get network status"""
238 ro_task_id = ro_task["_id"]
239 target_vim = self.my_vims[ro_task["target_id"]]
240 vim_id = ro_task["vim_info"]["vim_id"]
241 net_to_refresh_list = [vim_id]
242
243 try:
244 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
245 vim_info = vim_dict[vim_id]
246
247 if vim_info["status"] == "ACTIVE":
248 task_status = "DONE"
249 elif vim_info["status"] == "BUILD":
250 task_status = "BUILD"
251 else:
252 task_status = "FAILED"
253 except vimconn.VimConnException as e:
254 # Mark all tasks at VIM_ERROR status
255 self.logger.error(
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id, ro_task["target_id"], vim_id, e
258 )
259 )
260 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
261 task_status = "FAILED"
262
263 ro_vim_item_update = {}
264 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
265 ro_vim_item_update["vim_status"] = vim_info["status"]
266
267 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
268 ro_vim_item_update["vim_name"] = vim_info.get("name")
269
270 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
272 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
273 elif vim_info["status"] == "DELETED":
274 ro_vim_item_update["vim_id"] = None
275 ro_vim_item_update["vim_details"] = "Deleted externally"
276 else:
277 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
278 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
279
280 if ro_vim_item_update:
281 self.logger.debug(
282 "ro_task={} {} get-net={}: status={} {}".format(
283 ro_task_id,
284 ro_task["target_id"],
285 vim_id,
286 ro_vim_item_update.get("vim_status"),
287 ro_vim_item_update.get("vim_details")
288 if ro_vim_item_update.get("vim_status") != "ACTIVE"
289 else "",
290 )
291 )
292
293 return task_status, ro_vim_item_update
294
295 def delete(self, ro_task, task_index):
296 task = ro_task["tasks"][task_index]
297 task_id = task["task_id"]
298 net_vim_id = ro_task["vim_info"]["vim_id"]
299 ro_vim_item_update_ok = {
300 "vim_status": "DELETED",
301 "created": False,
302 "vim_details": "DELETED",
303 "vim_id": None,
304 }
305
306 try:
307 if net_vim_id or ro_task["vim_info"]["created_items"]:
308 target_vim = self.my_vims[ro_task["target_id"]]
309 target_vim.delete_network(
310 net_vim_id, ro_task["vim_info"]["created_items"]
311 )
312 except vimconn.VimConnNotFoundException:
313 ro_vim_item_update_ok["vim_details"] = "already deleted"
314 except vimconn.VimConnException as e:
315 self.logger.error(
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task["_id"], ro_task["target_id"], net_vim_id, e
318 )
319 )
320 ro_vim_item_update = {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e),
323 }
324
325 return "FAILED", ro_vim_item_update
326
327 self.logger.debug(
328 "task={} {} del-net={} {}".format(
329 task_id,
330 ro_task["target_id"],
331 net_vim_id,
332 ro_vim_item_update_ok.get("vim_details", ""),
333 )
334 )
335
336 return "DONE", ro_vim_item_update_ok
337
338
339 class VimInteractionVdu(VimInteractionBase):
340 max_retries_inject_ssh_key = 20 # 20 times
341 time_retries_inject_ssh_key = 30 # wevery 30 seconds
342
343 def new(self, ro_task, task_index, task_depends):
344 task = ro_task["tasks"][task_index]
345 task_id = task["task_id"]
346 created = False
347 created_items = {}
348 target_vim = self.my_vims[ro_task["target_id"]]
349
350 try:
351 created = True
352 params = task["params"]
353 params_copy = deepcopy(params)
354 net_list = params_copy["net_list"]
355
356 for net in net_list:
357 # change task_id into network_id
358 if "net_id" in net and net["net_id"].startswith("TASK-"):
359 network_id = task_depends[net["net_id"]]
360
361 if not network_id:
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net["net_id"])
365 )
366
367 net["net_id"] = network_id
368
369 if params_copy["image_id"].startswith("TASK-"):
370 params_copy["image_id"] = task_depends[params_copy["image_id"]]
371
372 if params_copy["flavor_id"].startswith("TASK-"):
373 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
374
375 affinity_group_list = params_copy["affinity_group_list"]
376 for affinity_group in affinity_group_list:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group and affinity_group[
379 "affinity_group_id"
380 ].startswith("TASK-"):
381 affinity_group_id = task_depends[
382 affinity_group["affinity_group_id"]
383 ]
384
385 if not affinity_group_id:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group["affinity_group_id"])
388 )
389
390 affinity_group["affinity_group_id"] = affinity_group_id
391
392 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
393 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
394
395 ro_vim_item_update = {
396 "vim_id": vim_vm_id,
397 "vim_status": "BUILD",
398 "created": created,
399 "created_items": created_items,
400 "vim_details": None,
401 "interfaces_vim_ids": interfaces,
402 "interfaces": [],
403 }
404 self.logger.debug(
405 "task={} {} new-vm={} created={}".format(
406 task_id, ro_task["target_id"], vim_vm_id, created
407 )
408 )
409
410 return "BUILD", ro_vim_item_update
411 except (vimconn.VimConnException, NsWorkerException) as e:
412 self.logger.error(
413 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
414 )
415 ro_vim_item_update = {
416 "vim_status": "VIM_ERROR",
417 "created": created,
418 "vim_details": str(e),
419 }
420
421 return "FAILED", ro_vim_item_update
422
423 def delete(self, ro_task, task_index):
424 task = ro_task["tasks"][task_index]
425 task_id = task["task_id"]
426 vm_vim_id = ro_task["vim_info"]["vim_id"]
427 ro_vim_item_update_ok = {
428 "vim_status": "DELETED",
429 "created": False,
430 "vim_details": "DELETED",
431 "vim_id": None,
432 }
433
434 try:
435 if vm_vim_id or ro_task["vim_info"]["created_items"]:
436 target_vim = self.my_vims[ro_task["target_id"]]
437 target_vim.delete_vminstance(
438 vm_vim_id, ro_task["vim_info"]["created_items"]
439 )
440 except vimconn.VimConnNotFoundException:
441 ro_vim_item_update_ok["vim_details"] = "already deleted"
442 except vimconn.VimConnException as e:
443 self.logger.error(
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
446 )
447 )
448 ro_vim_item_update = {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e),
451 }
452
453 return "FAILED", ro_vim_item_update
454
455 self.logger.debug(
456 "task={} {} del-vm={} {}".format(
457 task_id,
458 ro_task["target_id"],
459 vm_vim_id,
460 ro_vim_item_update_ok.get("vim_details", ""),
461 )
462 )
463
464 return "DONE", ro_vim_item_update_ok
465
466 def refresh(self, ro_task):
467 """Call VIM to get vm status"""
468 ro_task_id = ro_task["_id"]
469 target_vim = self.my_vims[ro_task["target_id"]]
470 vim_id = ro_task["vim_info"]["vim_id"]
471
472 if not vim_id:
473 return None, None
474
475 vm_to_refresh_list = [vim_id]
476 try:
477 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
478 vim_info = vim_dict[vim_id]
479
480 if vim_info["status"] == "ACTIVE":
481 task_status = "DONE"
482 elif vim_info["status"] == "BUILD":
483 task_status = "BUILD"
484 else:
485 task_status = "FAILED"
486
487 # try to load and parse vim_information
488 try:
489 vim_info_info = yaml.safe_load(vim_info["vim_info"])
490 if vim_info_info.get("name"):
491 vim_info["name"] = vim_info_info["name"]
492 except Exception as vim_info_error:
493 self.logger.exception(
494 f"{vim_info_error} occured while getting the vim_info from yaml"
495 )
496 except vimconn.VimConnException as e:
497 # Mark all tasks at VIM_ERROR status
498 self.logger.error(
499 "ro_task={} vim={} get-vm={}: {}".format(
500 ro_task_id, ro_task["target_id"], vim_id, e
501 )
502 )
503 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
504 task_status = "FAILED"
505
506 ro_vim_item_update = {}
507
508 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
509 vim_interfaces = []
510 if vim_info.get("interfaces"):
511 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
512 iface = next(
513 (
514 iface
515 for iface in vim_info["interfaces"]
516 if vim_iface_id == iface["vim_interface_id"]
517 ),
518 None,
519 )
520 # if iface:
521 # iface.pop("vim_info", None)
522 vim_interfaces.append(iface)
523
524 task_create = next(
525 t
526 for t in ro_task["tasks"]
527 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
528 )
529 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
530 vim_interfaces[task_create["mgmt_vnf_interface"]][
531 "mgmt_vnf_interface"
532 ] = True
533
534 mgmt_vdu_iface = task_create.get(
535 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
536 )
537 if vim_interfaces:
538 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
539
540 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
541 ro_vim_item_update["interfaces"] = vim_interfaces
542
543 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
544 ro_vim_item_update["vim_status"] = vim_info["status"]
545
546 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
547 ro_vim_item_update["vim_name"] = vim_info.get("name")
548
549 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
550 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
551 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
552 elif vim_info["status"] == "DELETED":
553 ro_vim_item_update["vim_id"] = None
554 ro_vim_item_update["vim_details"] = "Deleted externally"
555 else:
556 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
557 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
558
559 if ro_vim_item_update:
560 self.logger.debug(
561 "ro_task={} {} get-vm={}: status={} {}".format(
562 ro_task_id,
563 ro_task["target_id"],
564 vim_id,
565 ro_vim_item_update.get("vim_status"),
566 ro_vim_item_update.get("vim_details")
567 if ro_vim_item_update.get("vim_status") != "ACTIVE"
568 else "",
569 )
570 )
571
572 return task_status, ro_vim_item_update
573
574 def exec(self, ro_task, task_index, task_depends):
575 task = ro_task["tasks"][task_index]
576 task_id = task["task_id"]
577 target_vim = self.my_vims[ro_task["target_id"]]
578 db_task_update = {"retries": 0}
579 retries = task.get("retries", 0)
580
581 try:
582 params = task["params"]
583 params_copy = deepcopy(params)
584 params_copy["ro_key"] = self.db.decrypt(
585 params_copy.pop("private_key"),
586 params_copy.pop("schema_version"),
587 params_copy.pop("salt"),
588 )
589 params_copy["ip_addr"] = params_copy.pop("ip_address")
590 target_vim.inject_user_key(**params_copy)
591 self.logger.debug(
592 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
593 )
594
595 return (
596 "DONE",
597 None,
598 db_task_update,
599 ) # params_copy["key"]
600 except (vimconn.VimConnException, NsWorkerException) as e:
601 retries += 1
602
603 self.logger.debug(traceback.format_exc())
604 if retries < self.max_retries_inject_ssh_key:
605 return (
606 "BUILD",
607 None,
608 {
609 "retries": retries,
610 "next_retry": self.time_retries_inject_ssh_key,
611 },
612 )
613
614 self.logger.error(
615 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
616 )
617 ro_vim_item_update = {"vim_details": str(e)}
618
619 return "FAILED", ro_vim_item_update, db_task_update
620
621
622 class VimInteractionImage(VimInteractionBase):
623 def new(self, ro_task, task_index, task_depends):
624 task = ro_task["tasks"][task_index]
625 task_id = task["task_id"]
626 created = False
627 created_items = {}
628 target_vim = self.my_vims[ro_task["target_id"]]
629
630 try:
631 # FIND
632 if task.get("find_params"):
633 vim_images = target_vim.get_image_list(**task["find_params"])
634
635 if not vim_images:
636 raise NsWorkerExceptionNotFound(
637 "Image not found with this criteria: '{}'".format(
638 task["find_params"]
639 )
640 )
641 elif len(vim_images) > 1:
642 raise NsWorkerException(
643 "More than one image found with this criteria: '{}'".format(
644 task["find_params"]
645 )
646 )
647 else:
648 vim_image_id = vim_images[0]["id"]
649
650 ro_vim_item_update = {
651 "vim_id": vim_image_id,
652 "vim_status": "DONE",
653 "created": created,
654 "created_items": created_items,
655 "vim_details": None,
656 }
657 self.logger.debug(
658 "task={} {} new-image={} created={}".format(
659 task_id, ro_task["target_id"], vim_image_id, created
660 )
661 )
662
663 return "DONE", ro_vim_item_update
664 except (NsWorkerException, vimconn.VimConnException) as e:
665 self.logger.error(
666 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
667 )
668 ro_vim_item_update = {
669 "vim_status": "VIM_ERROR",
670 "created": created,
671 "vim_details": str(e),
672 }
673
674 return "FAILED", ro_vim_item_update
675
676
677 class VimInteractionFlavor(VimInteractionBase):
678 def delete(self, ro_task, task_index):
679 task = ro_task["tasks"][task_index]
680 task_id = task["task_id"]
681 flavor_vim_id = ro_task["vim_info"]["vim_id"]
682 ro_vim_item_update_ok = {
683 "vim_status": "DELETED",
684 "created": False,
685 "vim_details": "DELETED",
686 "vim_id": None,
687 }
688
689 try:
690 if flavor_vim_id:
691 target_vim = self.my_vims[ro_task["target_id"]]
692 target_vim.delete_flavor(flavor_vim_id)
693 except vimconn.VimConnNotFoundException:
694 ro_vim_item_update_ok["vim_details"] = "already deleted"
695 except vimconn.VimConnException as e:
696 self.logger.error(
697 "ro_task={} vim={} del-flavor={}: {}".format(
698 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
699 )
700 )
701 ro_vim_item_update = {
702 "vim_status": "VIM_ERROR",
703 "vim_details": "Error while deleting: {}".format(e),
704 }
705
706 return "FAILED", ro_vim_item_update
707
708 self.logger.debug(
709 "task={} {} del-flavor={} {}".format(
710 task_id,
711 ro_task["target_id"],
712 flavor_vim_id,
713 ro_vim_item_update_ok.get("vim_details", ""),
714 )
715 )
716
717 return "DONE", ro_vim_item_update_ok
718
719 def new(self, ro_task, task_index, task_depends):
720 task = ro_task["tasks"][task_index]
721 task_id = task["task_id"]
722 created = False
723 created_items = {}
724 target_vim = self.my_vims[ro_task["target_id"]]
725
726 try:
727 # FIND
728 vim_flavor_id = None
729
730 if task.get("find_params"):
731 try:
732 flavor_data = task["find_params"]["flavor_data"]
733 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
734 except vimconn.VimConnNotFoundException:
735 self.logger.exception("VimConnNotFoundException occured.")
736
737 if not vim_flavor_id and task.get("params"):
738 # CREATE
739 flavor_data = task["params"]["flavor_data"]
740 vim_flavor_id = target_vim.new_flavor(flavor_data)
741 created = True
742
743 ro_vim_item_update = {
744 "vim_id": vim_flavor_id,
745 "vim_status": "DONE",
746 "created": created,
747 "created_items": created_items,
748 "vim_details": None,
749 }
750 self.logger.debug(
751 "task={} {} new-flavor={} created={}".format(
752 task_id, ro_task["target_id"], vim_flavor_id, created
753 )
754 )
755
756 return "DONE", ro_vim_item_update
757 except (vimconn.VimConnException, NsWorkerException) as e:
758 self.logger.error(
759 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
760 )
761 ro_vim_item_update = {
762 "vim_status": "VIM_ERROR",
763 "created": created,
764 "vim_details": str(e),
765 }
766
767 return "FAILED", ro_vim_item_update
768
769
770 class VimInteractionAffinityGroup(VimInteractionBase):
771 def delete(self, ro_task, task_index):
772 task = ro_task["tasks"][task_index]
773 task_id = task["task_id"]
774 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
775 ro_vim_item_update_ok = {
776 "vim_status": "DELETED",
777 "created": False,
778 "vim_details": "DELETED",
779 "vim_id": None,
780 }
781
782 try:
783 if affinity_group_vim_id:
784 target_vim = self.my_vims[ro_task["target_id"]]
785 target_vim.delete_affinity_group(affinity_group_vim_id)
786 except vimconn.VimConnNotFoundException:
787 ro_vim_item_update_ok["vim_details"] = "already deleted"
788 except vimconn.VimConnException as e:
789 self.logger.error(
790 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
791 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
792 )
793 )
794 ro_vim_item_update = {
795 "vim_status": "VIM_ERROR",
796 "vim_details": "Error while deleting: {}".format(e),
797 }
798
799 return "FAILED", ro_vim_item_update
800
801 self.logger.debug(
802 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
803 task_id,
804 ro_task["target_id"],
805 affinity_group_vim_id,
806 ro_vim_item_update_ok.get("vim_details", ""),
807 )
808 )
809
810 return "DONE", ro_vim_item_update_ok
811
812 def new(self, ro_task, task_index, task_depends):
813 task = ro_task["tasks"][task_index]
814 task_id = task["task_id"]
815 created = False
816 created_items = {}
817 target_vim = self.my_vims[ro_task["target_id"]]
818
819 try:
820 affinity_group_vim_id = None
821 affinity_group_data = None
822
823 if task.get("params"):
824 affinity_group_data = task["params"].get("affinity_group_data")
825
826 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
827 try:
828 param_affinity_group_id = task["params"]["affinity_group_data"].get(
829 "vim-affinity-group-id"
830 )
831 affinity_group_vim_id = target_vim.get_affinity_group(
832 param_affinity_group_id
833 ).get("id")
834 except vimconn.VimConnNotFoundException:
835 self.logger.error(
836 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
837 "could not be found at VIM. Creating a new one.".format(
838 task_id, ro_task["target_id"], param_affinity_group_id
839 )
840 )
841
842 if not affinity_group_vim_id and affinity_group_data:
843 affinity_group_vim_id = target_vim.new_affinity_group(
844 affinity_group_data
845 )
846 created = True
847
848 ro_vim_item_update = {
849 "vim_id": affinity_group_vim_id,
850 "vim_status": "DONE",
851 "created": created,
852 "created_items": created_items,
853 "vim_details": None,
854 }
855 self.logger.debug(
856 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
857 task_id, ro_task["target_id"], affinity_group_vim_id, created
858 )
859 )
860
861 return "DONE", ro_vim_item_update
862 except (vimconn.VimConnException, NsWorkerException) as e:
863 self.logger.error(
864 "task={} vim={} new-affinity-or-anti-affinity-group:"
865 " {}".format(task_id, ro_task["target_id"], e)
866 )
867 ro_vim_item_update = {
868 "vim_status": "VIM_ERROR",
869 "created": created,
870 "vim_details": str(e),
871 }
872
873 return "FAILED", ro_vim_item_update
874
875
876 class VimInteractionSdnNet(VimInteractionBase):
877 @staticmethod
878 def _match_pci(port_pci, mapping):
879 """
880 Check if port_pci matches with mapping
881 mapping can have brackets to indicate that several chars are accepted. e.g
882 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
883 :param port_pci: text
884 :param mapping: text, can contain brackets to indicate several chars are available
885 :return: True if matches, False otherwise
886 """
887 if not port_pci or not mapping:
888 return False
889 if port_pci == mapping:
890 return True
891
892 mapping_index = 0
893 pci_index = 0
894 while True:
895 bracket_start = mapping.find("[", mapping_index)
896
897 if bracket_start == -1:
898 break
899
900 bracket_end = mapping.find("]", bracket_start)
901 if bracket_end == -1:
902 break
903
904 length = bracket_start - mapping_index
905 if (
906 length
907 and port_pci[pci_index : pci_index + length]
908 != mapping[mapping_index:bracket_start]
909 ):
910 return False
911
912 if (
913 port_pci[pci_index + length]
914 not in mapping[bracket_start + 1 : bracket_end]
915 ):
916 return False
917
918 pci_index += length + 1
919 mapping_index = bracket_end + 1
920
921 if port_pci[pci_index:] != mapping[mapping_index:]:
922 return False
923
924 return True
925
926 def _get_interfaces(self, vlds_to_connect, vim_account_id):
927 """
928 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
929 :param vim_account_id:
930 :return:
931 """
932 interfaces = []
933
934 for vld in vlds_to_connect:
935 table, _, db_id = vld.partition(":")
936 db_id, _, vld = db_id.partition(":")
937 _, _, vld_id = vld.partition(".")
938
939 if table == "vnfrs":
940 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
941 iface_key = "vnf-vld-id"
942 else: # table == "nsrs"
943 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
944 iface_key = "ns-vld-id"
945
946 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
947
948 for db_vnfr in db_vnfrs:
949 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
950 for iface_index, interface in enumerate(vdur["interfaces"]):
951 if interface.get(iface_key) == vld_id and interface.get(
952 "type"
953 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
954 # only SR-IOV o PT
955 interface_ = interface.copy()
956 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
957 db_vnfr["_id"], vdu_index, iface_index
958 )
959
960 if vdur.get("status") == "ERROR":
961 interface_["status"] = "ERROR"
962
963 interfaces.append(interface_)
964
965 return interfaces
966
967 def refresh(self, ro_task):
968 # look for task create
969 task_create_index, _ = next(
970 i_t
971 for i_t in enumerate(ro_task["tasks"])
972 if i_t[1]
973 and i_t[1]["action"] == "CREATE"
974 and i_t[1]["status"] != "FINISHED"
975 )
976
977 return self.new(ro_task, task_create_index, None)
978
979 def new(self, ro_task, task_index, task_depends):
980
981 task = ro_task["tasks"][task_index]
982 task_id = task["task_id"]
983 target_vim = self.my_vims[ro_task["target_id"]]
984
985 sdn_net_id = ro_task["vim_info"]["vim_id"]
986
987 created_items = ro_task["vim_info"].get("created_items")
988 connected_ports = ro_task["vim_info"].get("connected_ports", [])
989 new_connected_ports = []
990 last_update = ro_task["vim_info"].get("last_update", 0)
991 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
992 error_list = []
993 created = ro_task["vim_info"].get("created", False)
994
995 try:
996 # CREATE
997 params = task["params"]
998 vlds_to_connect = params.get("vlds", [])
999 associated_vim = params.get("target_vim")
1000 # external additional ports
1001 additional_ports = params.get("sdn-ports") or ()
1002 _, _, vim_account_id = (
1003 (None, None, None)
1004 if associated_vim is None
1005 else associated_vim.partition(":")
1006 )
1007
1008 if associated_vim:
1009 # get associated VIM
1010 if associated_vim not in self.db_vims:
1011 self.db_vims[associated_vim] = self.db.get_one(
1012 "vim_accounts", {"_id": vim_account_id}
1013 )
1014
1015 db_vim = self.db_vims[associated_vim]
1016
1017 # look for ports to connect
1018 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1019 # print(ports)
1020
1021 sdn_ports = []
1022 pending_ports = error_ports = 0
1023 vlan_used = None
1024 sdn_need_update = False
1025
1026 for port in ports:
1027 vlan_used = port.get("vlan") or vlan_used
1028
1029 # TODO. Do not connect if already done
1030 if not port.get("compute_node") or not port.get("pci"):
1031 if port.get("status") == "ERROR":
1032 error_ports += 1
1033 else:
1034 pending_ports += 1
1035 continue
1036
1037 pmap = None
1038 compute_node_mappings = next(
1039 (
1040 c
1041 for c in db_vim["config"].get("sdn-port-mapping", ())
1042 if c and c["compute_node"] == port["compute_node"]
1043 ),
1044 None,
1045 )
1046
1047 if compute_node_mappings:
1048 # process port_mapping pci of type 0000:af:1[01].[1357]
1049 pmap = next(
1050 (
1051 p
1052 for p in compute_node_mappings["ports"]
1053 if self._match_pci(port["pci"], p.get("pci"))
1054 ),
1055 None,
1056 )
1057
1058 if not pmap:
1059 if not db_vim["config"].get("mapping_not_needed"):
1060 error_list.append(
1061 "Port mapping not found for compute_node={} pci={}".format(
1062 port["compute_node"], port["pci"]
1063 )
1064 )
1065 continue
1066
1067 pmap = {}
1068
1069 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1070 new_port = {
1071 "service_endpoint_id": pmap.get("service_endpoint_id")
1072 or service_endpoint_id,
1073 "service_endpoint_encapsulation_type": "dot1q"
1074 if port["type"] == "SR-IOV"
1075 else None,
1076 "service_endpoint_encapsulation_info": {
1077 "vlan": port.get("vlan"),
1078 "mac": port.get("mac-address"),
1079 "device_id": pmap.get("device_id") or port["compute_node"],
1080 "device_interface_id": pmap.get("device_interface_id")
1081 or port["pci"],
1082 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1083 "switch_port": pmap.get("switch_port"),
1084 "service_mapping_info": pmap.get("service_mapping_info"),
1085 },
1086 }
1087
1088 # TODO
1089 # if port["modified_at"] > last_update:
1090 # sdn_need_update = True
1091 new_connected_ports.append(port["id"]) # TODO
1092 sdn_ports.append(new_port)
1093
1094 if error_ports:
1095 error_list.append(
1096 "{} interfaces have not been created as VDU is on ERROR status".format(
1097 error_ports
1098 )
1099 )
1100
1101 # connect external ports
1102 for index, additional_port in enumerate(additional_ports):
1103 additional_port_id = additional_port.get(
1104 "service_endpoint_id"
1105 ) or "external-{}".format(index)
1106 sdn_ports.append(
1107 {
1108 "service_endpoint_id": additional_port_id,
1109 "service_endpoint_encapsulation_type": additional_port.get(
1110 "service_endpoint_encapsulation_type", "dot1q"
1111 ),
1112 "service_endpoint_encapsulation_info": {
1113 "vlan": additional_port.get("vlan") or vlan_used,
1114 "mac": additional_port.get("mac_address"),
1115 "device_id": additional_port.get("device_id"),
1116 "device_interface_id": additional_port.get(
1117 "device_interface_id"
1118 ),
1119 "switch_dpid": additional_port.get("switch_dpid")
1120 or additional_port.get("switch_id"),
1121 "switch_port": additional_port.get("switch_port"),
1122 "service_mapping_info": additional_port.get(
1123 "service_mapping_info"
1124 ),
1125 },
1126 }
1127 )
1128 new_connected_ports.append(additional_port_id)
1129 sdn_info = ""
1130
1131 # if there are more ports to connect or they have been modified, call create/update
1132 if error_list:
1133 sdn_status = "ERROR"
1134 sdn_info = "; ".join(error_list)
1135 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1136 last_update = time.time()
1137
1138 if not sdn_net_id:
1139 if len(sdn_ports) < 2:
1140 sdn_status = "ACTIVE"
1141
1142 if not pending_ports:
1143 self.logger.debug(
1144 "task={} {} new-sdn-net done, less than 2 ports".format(
1145 task_id, ro_task["target_id"]
1146 )
1147 )
1148 else:
1149 net_type = params.get("type") or "ELAN"
1150 (
1151 sdn_net_id,
1152 created_items,
1153 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1154 created = True
1155 self.logger.debug(
1156 "task={} {} new-sdn-net={} created={}".format(
1157 task_id, ro_task["target_id"], sdn_net_id, created
1158 )
1159 )
1160 else:
1161 created_items = target_vim.edit_connectivity_service(
1162 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1163 )
1164 created = True
1165 self.logger.debug(
1166 "task={} {} update-sdn-net={} created={}".format(
1167 task_id, ro_task["target_id"], sdn_net_id, created
1168 )
1169 )
1170
1171 connected_ports = new_connected_ports
1172 elif sdn_net_id:
1173 wim_status_dict = target_vim.get_connectivity_service_status(
1174 sdn_net_id, conn_info=created_items
1175 )
1176 sdn_status = wim_status_dict["sdn_status"]
1177
1178 if wim_status_dict.get("sdn_info"):
1179 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1180
1181 if wim_status_dict.get("error_msg"):
1182 sdn_info = wim_status_dict.get("error_msg") or ""
1183
1184 if pending_ports:
1185 if sdn_status != "ERROR":
1186 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1187 len(ports) - pending_ports, len(ports)
1188 )
1189
1190 if sdn_status == "ACTIVE":
1191 sdn_status = "BUILD"
1192
1193 ro_vim_item_update = {
1194 "vim_id": sdn_net_id,
1195 "vim_status": sdn_status,
1196 "created": created,
1197 "created_items": created_items,
1198 "connected_ports": connected_ports,
1199 "vim_details": sdn_info,
1200 "last_update": last_update,
1201 }
1202
1203 return sdn_status, ro_vim_item_update
1204 except Exception as e:
1205 self.logger.error(
1206 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1207 exc_info=not isinstance(
1208 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1209 ),
1210 )
1211 ro_vim_item_update = {
1212 "vim_status": "VIM_ERROR",
1213 "created": created,
1214 "vim_details": str(e),
1215 }
1216
1217 return "FAILED", ro_vim_item_update
1218
1219 def delete(self, ro_task, task_index):
1220 task = ro_task["tasks"][task_index]
1221 task_id = task["task_id"]
1222 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1223 ro_vim_item_update_ok = {
1224 "vim_status": "DELETED",
1225 "created": False,
1226 "vim_details": "DELETED",
1227 "vim_id": None,
1228 }
1229
1230 try:
1231 if sdn_vim_id:
1232 target_vim = self.my_vims[ro_task["target_id"]]
1233 target_vim.delete_connectivity_service(
1234 sdn_vim_id, ro_task["vim_info"].get("created_items")
1235 )
1236
1237 except Exception as e:
1238 if (
1239 isinstance(e, sdnconn.SdnConnectorError)
1240 and e.http_code == HTTPStatus.NOT_FOUND.value
1241 ):
1242 ro_vim_item_update_ok["vim_details"] = "already deleted"
1243 else:
1244 self.logger.error(
1245 "ro_task={} vim={} del-sdn-net={}: {}".format(
1246 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1247 ),
1248 exc_info=not isinstance(
1249 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1250 ),
1251 )
1252 ro_vim_item_update = {
1253 "vim_status": "VIM_ERROR",
1254 "vim_details": "Error while deleting: {}".format(e),
1255 }
1256
1257 return "FAILED", ro_vim_item_update
1258
1259 self.logger.debug(
1260 "task={} {} del-sdn-net={} {}".format(
1261 task_id,
1262 ro_task["target_id"],
1263 sdn_vim_id,
1264 ro_vim_item_update_ok.get("vim_details", ""),
1265 )
1266 )
1267
1268 return "DONE", ro_vim_item_update_ok
1269
1270
1271 class ConfigValidate:
1272 def __init__(self, config: Dict):
1273 self.conf = config
1274
1275 @property
1276 def active(self):
1277 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1278 if (
1279 self.conf["period"]["refresh_active"] >= 60
1280 or self.conf["period"]["refresh_active"] == -1
1281 ):
1282 return self.conf["period"]["refresh_active"]
1283
1284 return 60
1285
1286 @property
1287 def build(self):
1288 return self.conf["period"]["refresh_build"]
1289
1290 @property
1291 def image(self):
1292 return self.conf["period"]["refresh_image"]
1293
1294 @property
1295 def error(self):
1296 return self.conf["period"]["refresh_error"]
1297
1298 @property
1299 def queue_size(self):
1300 return self.conf["period"]["queue_size"]
1301
1302
1303 class NsWorker(threading.Thread):
1304 def __init__(self, worker_index, config, plugins, db):
1305 """
1306
1307 :param worker_index: thread index
1308 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1309 :param plugins: global shared dict with the loaded plugins
1310 :param db: database class instance to use
1311 """
1312 threading.Thread.__init__(self)
1313 self.config = config
1314 self.plugins = plugins
1315 self.plugin_name = "unknown"
1316 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1317 self.worker_index = worker_index
1318 # refresh periods for created items
1319 self.refresh_config = ConfigValidate(config)
1320 self.task_queue = queue.Queue(self.refresh_config.queue_size)
1321 # targetvim: vimplugin class
1322 self.my_vims = {}
1323 # targetvim: vim information from database
1324 self.db_vims = {}
1325 # targetvim list
1326 self.vim_targets = []
1327 self.my_id = config["process_id"] + ":" + str(worker_index)
1328 self.db = db
1329 self.item2class = {
1330 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1331 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1332 "image": VimInteractionImage(
1333 self.db, self.my_vims, self.db_vims, self.logger
1334 ),
1335 "flavor": VimInteractionFlavor(
1336 self.db, self.my_vims, self.db_vims, self.logger
1337 ),
1338 "sdn_net": VimInteractionSdnNet(
1339 self.db, self.my_vims, self.db_vims, self.logger
1340 ),
1341 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1342 self.db, self.my_vims, self.db_vims, self.logger
1343 ),
1344 }
1345 self.time_last_task_processed = None
1346 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1347 self.tasks_to_delete = []
1348 # it is idle when there are not vim_targets associated
1349 self.idle = True
1350 self.task_locked_time = config["global"]["task_locked_time"]
1351
1352 def insert_task(self, task):
1353 try:
1354 self.task_queue.put(task, False)
1355 return None
1356 except queue.Full:
1357 raise NsWorkerException("timeout inserting a task")
1358
1359 def terminate(self):
1360 self.insert_task("exit")
1361
1362 def del_task(self, task):
1363 with self.task_lock:
1364 if task["status"] == "SCHEDULED":
1365 task["status"] = "SUPERSEDED"
1366 return True
1367 else: # task["status"] == "processing"
1368 self.task_lock.release()
1369 return False
1370
1371 def _process_vim_config(self, target_id, db_vim):
1372 """
1373 Process vim config, creating vim configuration files as ca_cert
1374 :param target_id: vim/sdn/wim + id
1375 :param db_vim: Vim dictionary obtained from database
1376 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1377 """
1378 if not db_vim.get("config"):
1379 return
1380
1381 file_name = ""
1382
1383 try:
1384 if db_vim["config"].get("ca_cert_content"):
1385 file_name = "{}:{}".format(target_id, self.worker_index)
1386
1387 try:
1388 mkdir(file_name)
1389 except FileExistsError:
1390 self.logger.exception(
1391 "FileExistsError occured while processing vim_config."
1392 )
1393
1394 file_name = file_name + "/ca_cert"
1395
1396 with open(file_name, "w") as f:
1397 f.write(db_vim["config"]["ca_cert_content"])
1398 del db_vim["config"]["ca_cert_content"]
1399 db_vim["config"]["ca_cert"] = file_name
1400 except Exception as e:
1401 raise NsWorkerException(
1402 "Error writing to file '{}': {}".format(file_name, e)
1403 )
1404
1405 def _load_plugin(self, name, type="vim"):
1406 # type can be vim or sdn
1407 if "rovim_dummy" not in self.plugins:
1408 self.plugins["rovim_dummy"] = VimDummyConnector
1409
1410 if "rosdn_dummy" not in self.plugins:
1411 self.plugins["rosdn_dummy"] = SdnDummyConnector
1412
1413 if name in self.plugins:
1414 return self.plugins[name]
1415
1416 try:
1417 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1418 self.plugins[name] = ep.load()
1419 except Exception as e:
1420 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1421
1422 if name and name not in self.plugins:
1423 raise NsWorkerException(
1424 "Plugin 'osm_{n}' has not been installed".format(n=name)
1425 )
1426
1427 return self.plugins[name]
1428
1429 def _unload_vim(self, target_id):
1430 """
1431 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1432 :param target_id: Contains type:_id; where type can be 'vim', ...
1433 :return: None.
1434 """
1435 try:
1436 self.db_vims.pop(target_id, None)
1437 self.my_vims.pop(target_id, None)
1438
1439 if target_id in self.vim_targets:
1440 self.vim_targets.remove(target_id)
1441
1442 self.logger.info("Unloaded {}".format(target_id))
1443 rmtree("{}:{}".format(target_id, self.worker_index))
1444 except FileNotFoundError:
1445 # This is raised by rmtree if folder does not exist.
1446 self.logger.exception("FileNotFoundError occured while unloading VIM.")
1447 except Exception as e:
1448 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1449
1450 def _check_vim(self, target_id):
1451 """
1452 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1453 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1454 :return: None.
1455 """
1456 target, _, _id = target_id.partition(":")
1457 now = time.time()
1458 update_dict = {}
1459 unset_dict = {}
1460 op_text = ""
1461 step = ""
1462 loaded = target_id in self.vim_targets
1463 target_database = (
1464 "vim_accounts"
1465 if target == "vim"
1466 else "wim_accounts"
1467 if target == "wim"
1468 else "sdns"
1469 )
1470
1471 try:
1472 step = "Getting {} from db".format(target_id)
1473 db_vim = self.db.get_one(target_database, {"_id": _id})
1474
1475 for op_index, operation in enumerate(
1476 db_vim["_admin"].get("operations", ())
1477 ):
1478 if operation["operationState"] != "PROCESSING":
1479 continue
1480
1481 locked_at = operation.get("locked_at")
1482
1483 if locked_at is not None and locked_at >= now - self.task_locked_time:
1484 # some other thread is doing this operation
1485 return
1486
1487 # lock
1488 op_text = "_admin.operations.{}.".format(op_index)
1489
1490 if not self.db.set_one(
1491 target_database,
1492 q_filter={
1493 "_id": _id,
1494 op_text + "operationState": "PROCESSING",
1495 op_text + "locked_at": locked_at,
1496 },
1497 update_dict={
1498 op_text + "locked_at": now,
1499 "admin.current_operation": op_index,
1500 },
1501 fail_on_empty=False,
1502 ):
1503 return
1504
1505 unset_dict[op_text + "locked_at"] = None
1506 unset_dict["current_operation"] = None
1507 step = "Loading " + target_id
1508 error_text = self._load_vim(target_id)
1509
1510 if not error_text:
1511 step = "Checking connectivity"
1512
1513 if target == "vim":
1514 self.my_vims[target_id].check_vim_connectivity()
1515 else:
1516 self.my_vims[target_id].check_credentials()
1517
1518 update_dict["_admin.operationalState"] = "ENABLED"
1519 update_dict["_admin.detailed-status"] = ""
1520 unset_dict[op_text + "detailed-status"] = None
1521 update_dict[op_text + "operationState"] = "COMPLETED"
1522
1523 return
1524
1525 except Exception as e:
1526 error_text = "{}: {}".format(step, e)
1527 self.logger.error("{} for {}: {}".format(step, target_id, e))
1528
1529 finally:
1530 if update_dict or unset_dict:
1531 if error_text:
1532 update_dict[op_text + "operationState"] = "FAILED"
1533 update_dict[op_text + "detailed-status"] = error_text
1534 unset_dict.pop(op_text + "detailed-status", None)
1535 update_dict["_admin.operationalState"] = "ERROR"
1536 update_dict["_admin.detailed-status"] = error_text
1537
1538 if op_text:
1539 update_dict[op_text + "statusEnteredTime"] = now
1540
1541 self.db.set_one(
1542 target_database,
1543 q_filter={"_id": _id},
1544 update_dict=update_dict,
1545 unset=unset_dict,
1546 fail_on_empty=False,
1547 )
1548
1549 if not loaded:
1550 self._unload_vim(target_id)
1551
1552 def _reload_vim(self, target_id):
1553 if target_id in self.vim_targets:
1554 self._load_vim(target_id)
1555 else:
1556 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1557 # just remove it to force load again next time it is needed
1558 self.db_vims.pop(target_id, None)
1559
1560 def _load_vim(self, target_id):
1561 """
1562 Load or reload a vim_account, sdn_controller or wim_account.
1563 Read content from database, load the plugin if not loaded.
1564 In case of error loading the plugin, it load a failing VIM_connector
1565 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1566 :param target_id: Contains type:_id; where type can be 'vim', ...
1567 :return: None if ok, descriptive text if error
1568 """
1569 target, _, _id = target_id.partition(":")
1570 target_database = (
1571 "vim_accounts"
1572 if target == "vim"
1573 else "wim_accounts"
1574 if target == "wim"
1575 else "sdns"
1576 )
1577 plugin_name = ""
1578 vim = None
1579
1580 try:
1581 step = "Getting {}={} from db".format(target, _id)
1582 # TODO process for wim, sdnc, ...
1583 vim = self.db.get_one(target_database, {"_id": _id})
1584
1585 # if deep_get(vim, "config", "sdn-controller"):
1586 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1587 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1588
1589 step = "Decrypting password"
1590 schema_version = vim.get("schema_version")
1591 self.db.encrypt_decrypt_fields(
1592 vim,
1593 "decrypt",
1594 fields=("password", "secret"),
1595 schema_version=schema_version,
1596 salt=_id,
1597 )
1598 self._process_vim_config(target_id, vim)
1599
1600 if target == "vim":
1601 plugin_name = "rovim_" + vim["vim_type"]
1602 step = "Loading plugin '{}'".format(plugin_name)
1603 vim_module_conn = self._load_plugin(plugin_name)
1604 step = "Loading {}'".format(target_id)
1605 self.my_vims[target_id] = vim_module_conn(
1606 uuid=vim["_id"],
1607 name=vim["name"],
1608 tenant_id=vim.get("vim_tenant_id"),
1609 tenant_name=vim.get("vim_tenant_name"),
1610 url=vim["vim_url"],
1611 url_admin=None,
1612 user=vim["vim_user"],
1613 passwd=vim["vim_password"],
1614 config=vim.get("config") or {},
1615 persistent_info={},
1616 )
1617 else: # sdn
1618 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1619 step = "Loading plugin '{}'".format(plugin_name)
1620 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1621 step = "Loading {}'".format(target_id)
1622 wim = deepcopy(vim)
1623 wim_config = wim.pop("config", {}) or {}
1624 wim["uuid"] = wim["_id"]
1625 if "url" in wim and "wim_url" not in wim:
1626 wim["wim_url"] = wim["url"]
1627 elif "url" not in wim and "wim_url" in wim:
1628 wim["url"] = wim["wim_url"]
1629
1630 if wim.get("dpid"):
1631 wim_config["dpid"] = wim.pop("dpid")
1632
1633 if wim.get("switch_id"):
1634 wim_config["switch_id"] = wim.pop("switch_id")
1635
1636 # wim, wim_account, config
1637 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1638 self.db_vims[target_id] = vim
1639 self.error_status = None
1640
1641 self.logger.info(
1642 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1643 )
1644 except Exception as e:
1645 self.logger.error(
1646 "Cannot load {} plugin={}: {} {}".format(
1647 target_id, plugin_name, step, e
1648 )
1649 )
1650
1651 self.db_vims[target_id] = vim or {}
1652 self.db_vims[target_id] = FailingConnector(str(e))
1653 error_status = "{} Error: {}".format(step, e)
1654
1655 return error_status
1656 finally:
1657 if target_id not in self.vim_targets:
1658 self.vim_targets.append(target_id)
1659
1660 def _get_db_task(self):
1661 """
1662 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1663 :return: None
1664 """
1665 now = time.time()
1666
1667 if not self.time_last_task_processed:
1668 self.time_last_task_processed = now
1669
1670 try:
1671 while True:
1672 """
1673 # Log RO tasks only when loglevel is DEBUG
1674 if self.logger.getEffectiveLevel() == logging.DEBUG:
1675 self._log_ro_task(
1676 None,
1677 None,
1678 None,
1679 "TASK_WF",
1680 "task_locked_time="
1681 + str(self.task_locked_time)
1682 + " "
1683 + "time_last_task_processed="
1684 + str(self.time_last_task_processed)
1685 + " "
1686 + "now="
1687 + str(now),
1688 )
1689 """
1690 locked = self.db.set_one(
1691 "ro_tasks",
1692 q_filter={
1693 "target_id": self.vim_targets,
1694 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1695 "locked_at.lt": now - self.task_locked_time,
1696 "to_check_at.lt": self.time_last_task_processed,
1697 "to_check_at.gt": -1,
1698 },
1699 update_dict={"locked_by": self.my_id, "locked_at": now},
1700 fail_on_empty=False,
1701 )
1702
1703 if locked:
1704 # read and return
1705 ro_task = self.db.get_one(
1706 "ro_tasks",
1707 q_filter={
1708 "target_id": self.vim_targets,
1709 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1710 "locked_at": now,
1711 },
1712 )
1713 return ro_task
1714
1715 if self.time_last_task_processed == now:
1716 self.time_last_task_processed = None
1717 return None
1718 else:
1719 self.time_last_task_processed = now
1720 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1721
1722 except DbException as e:
1723 self.logger.error("Database exception at _get_db_task: {}".format(e))
1724 except Exception as e:
1725 self.logger.critical(
1726 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1727 )
1728
1729 return None
1730
1731 def _get_db_all_tasks(self):
1732 """
1733 Read all content of table ro_tasks to log it
1734 :return: None
1735 """
1736 try:
1737 # Checking the content of the BD:
1738
1739 # read and return
1740 ro_task = self.db.get_list("ro_tasks")
1741 for rt in ro_task:
1742 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1743 return ro_task
1744
1745 except DbException as e:
1746 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1747 except Exception as e:
1748 self.logger.critical(
1749 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1750 )
1751
1752 return None
1753
1754 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1755 """
1756 Generate a log with the following format:
1757
1758 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1759 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1760 task_array_index;task_id;task_action;task_item;task_args
1761
1762 Example:
1763
1764 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1765 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1766 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1767 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1768 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1769 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1770 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1771 """
1772 try:
1773 line = []
1774 i = 0
1775 if ro_task is not None and isinstance(ro_task, dict):
1776 for t in ro_task["tasks"]:
1777 line.clear()
1778 line.append(mark)
1779 line.append(event)
1780 line.append(ro_task.get("_id", ""))
1781 line.append(str(ro_task.get("locked_at", "")))
1782 line.append(str(ro_task.get("modified_at", "")))
1783 line.append(str(ro_task.get("created_at", "")))
1784 line.append(str(ro_task.get("to_check_at", "")))
1785 line.append(str(ro_task.get("locked_by", "")))
1786 line.append(str(ro_task.get("target_id", "")))
1787 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1788 line.append(str(ro_task.get("vim_info", "")))
1789 line.append(str(ro_task.get("tasks", "")))
1790 if isinstance(t, dict):
1791 line.append(str(t.get("status", "")))
1792 line.append(str(t.get("action_id", "")))
1793 line.append(str(i))
1794 line.append(str(t.get("task_id", "")))
1795 line.append(str(t.get("action", "")))
1796 line.append(str(t.get("item", "")))
1797 line.append(str(t.get("find_params", "")))
1798 line.append(str(t.get("params", "")))
1799 else:
1800 line.extend([""] * 2)
1801 line.append(str(i))
1802 line.extend([""] * 5)
1803
1804 i += 1
1805 self.logger.debug(";".join(line))
1806 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1807 i = 0
1808 while True:
1809 st = "tasks.{}.status".format(i)
1810 if st not in db_ro_task_update:
1811 break
1812 line.clear()
1813 line.append(mark)
1814 line.append(event)
1815 line.append(db_ro_task_update.get("_id", ""))
1816 line.append(str(db_ro_task_update.get("locked_at", "")))
1817 line.append(str(db_ro_task_update.get("modified_at", "")))
1818 line.append("")
1819 line.append(str(db_ro_task_update.get("to_check_at", "")))
1820 line.append(str(db_ro_task_update.get("locked_by", "")))
1821 line.append("")
1822 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1823 line.append("")
1824 line.append(str(db_ro_task_update.get("vim_info", "")))
1825 line.append(str(str(db_ro_task_update).count(".status")))
1826 line.append(db_ro_task_update.get(st, ""))
1827 line.append("")
1828 line.append(str(i))
1829 line.extend([""] * 3)
1830 i += 1
1831 self.logger.debug(";".join(line))
1832
1833 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1834 line.clear()
1835 line.append(mark)
1836 line.append(event)
1837 line.append(db_ro_task_delete.get("_id", ""))
1838 line.append("")
1839 line.append(db_ro_task_delete.get("modified_at", ""))
1840 line.extend([""] * 13)
1841 self.logger.debug(";".join(line))
1842
1843 else:
1844 line.clear()
1845 line.append(mark)
1846 line.append(event)
1847 line.extend([""] * 16)
1848 self.logger.debug(";".join(line))
1849
1850 except Exception as e:
1851 self.logger.error("Error logging ro_task: {}".format(e))
1852
1853 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1854 """
1855 Determine if this task need to be done or superseded
1856 :return: None
1857 """
1858 my_task = ro_task["tasks"][task_index]
1859 task_id = my_task["task_id"]
1860 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1861 "created_items", False
1862 )
1863
1864 if my_task["status"] == "FAILED":
1865 return None, None # TODO need to be retry??
1866
1867 try:
1868 for index, task in enumerate(ro_task["tasks"]):
1869 if index == task_index or not task:
1870 continue # own task
1871
1872 if (
1873 my_task["target_record"] == task["target_record"]
1874 and task["action"] == "CREATE"
1875 ):
1876 # set to finished
1877 db_update["tasks.{}.status".format(index)] = task[
1878 "status"
1879 ] = "FINISHED"
1880 elif task["action"] == "CREATE" and task["status"] not in (
1881 "FINISHED",
1882 "SUPERSEDED",
1883 ):
1884 needed_delete = False
1885
1886 if needed_delete:
1887 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1888 else:
1889 return "SUPERSEDED", None
1890 except Exception as e:
1891 if not isinstance(e, NsWorkerException):
1892 self.logger.critical(
1893 "Unexpected exception at _delete_task task={}: {}".format(
1894 task_id, e
1895 ),
1896 exc_info=True,
1897 )
1898
1899 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1900
1901 def _create_task(self, ro_task, task_index, task_depends, db_update):
1902 """
1903 Determine if this task need to create something at VIM
1904 :return: None
1905 """
1906 my_task = ro_task["tasks"][task_index]
1907 task_id = my_task["task_id"]
1908 task_status = None
1909
1910 if my_task["status"] == "FAILED":
1911 return None, None # TODO need to be retry??
1912 elif my_task["status"] == "SCHEDULED":
1913 # check if already created by another task
1914 for index, task in enumerate(ro_task["tasks"]):
1915 if index == task_index or not task:
1916 continue # own task
1917
1918 if task["action"] == "CREATE" and task["status"] not in (
1919 "SCHEDULED",
1920 "FINISHED",
1921 "SUPERSEDED",
1922 ):
1923 return task["status"], "COPY_VIM_INFO"
1924
1925 try:
1926 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1927 ro_task, task_index, task_depends
1928 )
1929 # TODO update other CREATE tasks
1930 except Exception as e:
1931 if not isinstance(e, NsWorkerException):
1932 self.logger.error(
1933 "Error executing task={}: {}".format(task_id, e), exc_info=True
1934 )
1935
1936 task_status = "FAILED"
1937 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1938 # TODO update ro_vim_item_update
1939
1940 return task_status, ro_vim_item_update
1941 else:
1942 return None, None
1943
1944 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1945 """
1946 Look for dependency task
1947 :param task_id: Can be one of
1948 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1949 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1950 3. task.task_id: "<action_id>:number"
1951 :param ro_task:
1952 :param target_id:
1953 :return: database ro_task plus index of task
1954 """
1955 if (
1956 task_id.startswith("vim:")
1957 or task_id.startswith("sdn:")
1958 or task_id.startswith("wim:")
1959 ):
1960 target_id, _, task_id = task_id.partition(" ")
1961
1962 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1963 ro_task_dependency = self.db.get_one(
1964 "ro_tasks",
1965 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1966 fail_on_empty=False,
1967 )
1968
1969 if ro_task_dependency:
1970 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1971 if task["target_record_id"] == task_id:
1972 return ro_task_dependency, task_index
1973
1974 else:
1975 if ro_task:
1976 for task_index, task in enumerate(ro_task["tasks"]):
1977 if task and task["task_id"] == task_id:
1978 return ro_task, task_index
1979
1980 ro_task_dependency = self.db.get_one(
1981 "ro_tasks",
1982 q_filter={
1983 "tasks.ANYINDEX.task_id": task_id,
1984 "tasks.ANYINDEX.target_record.ne": None,
1985 },
1986 fail_on_empty=False,
1987 )
1988
1989 if ro_task_dependency:
1990 for task_index, task in ro_task_dependency["tasks"]:
1991 if task["task_id"] == task_id:
1992 return ro_task_dependency, task_index
1993 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1994
1995 def update_vm_refresh(self):
1996 """Enables the VM status updates if self.refresh_config.active parameter
1997 is not -1 and than updates the DB accordingly
1998
1999 """
2000 try:
2001 self.logger.debug("Checking if VM status update config")
2002 next_refresh = time.time()
2003 if self.refresh_config.active == -1:
2004 next_refresh = -1
2005 else:
2006 next_refresh += self.refresh_config.active
2007
2008 if next_refresh != -1:
2009 db_ro_task_update = {}
2010 now = time.time()
2011 next_check_at = now + (24 * 60 * 60)
2012 next_check_at = min(next_check_at, next_refresh)
2013 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2014 db_ro_task_update["to_check_at"] = next_check_at
2015
2016 self.logger.debug(
2017 "Finding tasks which to be updated to enable VM status updates"
2018 )
2019 refresh_tasks = self.db.get_list(
2020 "ro_tasks",
2021 q_filter={
2022 "tasks.status": "DONE",
2023 "to_check_at.lt": 0,
2024 },
2025 )
2026 self.logger.debug("Updating tasks to change the to_check_at status")
2027 for task in refresh_tasks:
2028 q_filter = {
2029 "_id": task["_id"],
2030 }
2031 self.db.set_one(
2032 "ro_tasks",
2033 q_filter=q_filter,
2034 update_dict=db_ro_task_update,
2035 fail_on_empty=True,
2036 )
2037
2038 except Exception as e:
2039 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2040
2041 def _process_pending_tasks(self, ro_task):
2042 ro_task_id = ro_task["_id"]
2043 now = time.time()
2044 # one day
2045 next_check_at = now + (24 * 60 * 60)
2046 db_ro_task_update = {}
2047
2048 def _update_refresh(new_status):
2049 # compute next_refresh
2050 nonlocal task
2051 nonlocal next_check_at
2052 nonlocal db_ro_task_update
2053 nonlocal ro_task
2054
2055 next_refresh = time.time()
2056
2057 if task["item"] in ("image", "flavor"):
2058 next_refresh += self.refresh_config.image
2059 elif new_status == "BUILD":
2060 next_refresh += self.refresh_config.build
2061 elif new_status == "DONE":
2062 if self.refresh_config.active == -1:
2063 next_refresh = -1
2064 else:
2065 next_refresh += self.refresh_config.active
2066 else:
2067 next_refresh += self.refresh_config.error
2068
2069 next_check_at = min(next_check_at, next_refresh)
2070 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2071 ro_task["vim_info"]["refresh_at"] = next_refresh
2072
2073 try:
2074 """
2075 # Log RO tasks only when loglevel is DEBUG
2076 if self.logger.getEffectiveLevel() == logging.DEBUG:
2077 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2078 """
2079 # Check if vim status refresh is enabled again
2080 self.update_vm_refresh()
2081 # 0: get task_status_create
2082 lock_object = None
2083 task_status_create = None
2084 task_create = next(
2085 (
2086 t
2087 for t in ro_task["tasks"]
2088 if t
2089 and t["action"] == "CREATE"
2090 and t["status"] in ("BUILD", "DONE")
2091 ),
2092 None,
2093 )
2094
2095 if task_create:
2096 task_status_create = task_create["status"]
2097
2098 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2099 for task_action in ("DELETE", "CREATE", "EXEC"):
2100 db_vim_update = None
2101 new_status = None
2102
2103 for task_index, task in enumerate(ro_task["tasks"]):
2104 if not task:
2105 continue # task deleted
2106
2107 task_depends = {}
2108 target_update = None
2109
2110 if (
2111 (
2112 task_action in ("DELETE", "EXEC")
2113 and task["status"] not in ("SCHEDULED", "BUILD")
2114 )
2115 or task["action"] != task_action
2116 or (
2117 task_action == "CREATE"
2118 and task["status"] in ("FINISHED", "SUPERSEDED")
2119 )
2120 ):
2121 continue
2122
2123 task_path = "tasks.{}.status".format(task_index)
2124 try:
2125 db_vim_info_update = None
2126
2127 if task["status"] == "SCHEDULED":
2128 # check if tasks that this depends on have been completed
2129 dependency_not_completed = False
2130
2131 for dependency_task_id in task.get("depends_on") or ():
2132 (
2133 dependency_ro_task,
2134 dependency_task_index,
2135 ) = self._get_dependency(
2136 dependency_task_id, target_id=ro_task["target_id"]
2137 )
2138 dependency_task = dependency_ro_task["tasks"][
2139 dependency_task_index
2140 ]
2141
2142 if dependency_task["status"] == "SCHEDULED":
2143 dependency_not_completed = True
2144 next_check_at = min(
2145 next_check_at, dependency_ro_task["to_check_at"]
2146 )
2147 # must allow dependent task to be processed first
2148 # to do this set time after last_task_processed
2149 next_check_at = max(
2150 self.time_last_task_processed, next_check_at
2151 )
2152 break
2153 elif dependency_task["status"] == "FAILED":
2154 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2155 task["action"],
2156 task["item"],
2157 dependency_task["action"],
2158 dependency_task["item"],
2159 dependency_task_id,
2160 dependency_ro_task["vim_info"].get(
2161 "vim_details"
2162 ),
2163 )
2164 self.logger.error(
2165 "task={} {}".format(task["task_id"], error_text)
2166 )
2167 raise NsWorkerException(error_text)
2168
2169 task_depends[dependency_task_id] = dependency_ro_task[
2170 "vim_info"
2171 ]["vim_id"]
2172 task_depends[
2173 "TASK-{}".format(dependency_task_id)
2174 ] = dependency_ro_task["vim_info"]["vim_id"]
2175
2176 if dependency_not_completed:
2177 # TODO set at vim_info.vim_details that it is waiting
2178 continue
2179
2180 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2181 # the task of renew this locking. It will update database locket_at periodically
2182 if not lock_object:
2183 lock_object = LockRenew.add_lock_object(
2184 "ro_tasks", ro_task, self
2185 )
2186
2187 if task["action"] == "DELETE":
2188 (new_status, db_vim_info_update,) = self._delete_task(
2189 ro_task, task_index, task_depends, db_ro_task_update
2190 )
2191 new_status = (
2192 "FINISHED" if new_status == "DONE" else new_status
2193 )
2194 # ^with FINISHED instead of DONE it will not be refreshing
2195
2196 if new_status in ("FINISHED", "SUPERSEDED"):
2197 target_update = "DELETE"
2198 elif task["action"] == "EXEC":
2199 (
2200 new_status,
2201 db_vim_info_update,
2202 db_task_update,
2203 ) = self.item2class[task["item"]].exec(
2204 ro_task, task_index, task_depends
2205 )
2206 new_status = (
2207 "FINISHED" if new_status == "DONE" else new_status
2208 )
2209 # ^with FINISHED instead of DONE it will not be refreshing
2210
2211 if db_task_update:
2212 # load into database the modified db_task_update "retries" and "next_retry"
2213 if db_task_update.get("retries"):
2214 db_ro_task_update[
2215 "tasks.{}.retries".format(task_index)
2216 ] = db_task_update["retries"]
2217
2218 next_check_at = time.time() + db_task_update.get(
2219 "next_retry", 60
2220 )
2221 target_update = None
2222 elif task["action"] == "CREATE":
2223 if task["status"] == "SCHEDULED":
2224 if task_status_create:
2225 new_status = task_status_create
2226 target_update = "COPY_VIM_INFO"
2227 else:
2228 new_status, db_vim_info_update = self.item2class[
2229 task["item"]
2230 ].new(ro_task, task_index, task_depends)
2231 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2232 _update_refresh(new_status)
2233 else:
2234 refresh_at = ro_task["vim_info"]["refresh_at"]
2235 if refresh_at and refresh_at != -1 and now > refresh_at:
2236 new_status, db_vim_info_update = self.item2class[
2237 task["item"]
2238 ].refresh(ro_task)
2239 _update_refresh(new_status)
2240 else:
2241 # The refresh is updated to avoid set the value of "refresh_at" to
2242 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2243 # because it can happen that in this case the task is never processed
2244 _update_refresh(task["status"])
2245
2246 except Exception as e:
2247 new_status = "FAILED"
2248 db_vim_info_update = {
2249 "vim_status": "VIM_ERROR",
2250 "vim_details": str(e),
2251 }
2252
2253 if not isinstance(
2254 e, (NsWorkerException, vimconn.VimConnException)
2255 ):
2256 self.logger.error(
2257 "Unexpected exception at _delete_task task={}: {}".format(
2258 task["task_id"], e
2259 ),
2260 exc_info=True,
2261 )
2262
2263 try:
2264 if db_vim_info_update:
2265 db_vim_update = db_vim_info_update.copy()
2266 db_ro_task_update.update(
2267 {
2268 "vim_info." + k: v
2269 for k, v in db_vim_info_update.items()
2270 }
2271 )
2272 ro_task["vim_info"].update(db_vim_info_update)
2273
2274 if new_status:
2275 if task_action == "CREATE":
2276 task_status_create = new_status
2277 db_ro_task_update[task_path] = new_status
2278
2279 if target_update or db_vim_update:
2280 if target_update == "DELETE":
2281 self._update_target(task, None)
2282 elif target_update == "COPY_VIM_INFO":
2283 self._update_target(task, ro_task["vim_info"])
2284 else:
2285 self._update_target(task, db_vim_update)
2286
2287 except Exception as e:
2288 if (
2289 isinstance(e, DbException)
2290 and e.http_code == HTTPStatus.NOT_FOUND
2291 ):
2292 # if the vnfrs or nsrs has been removed from database, this task must be removed
2293 self.logger.debug(
2294 "marking to delete task={}".format(task["task_id"])
2295 )
2296 self.tasks_to_delete.append(task)
2297 else:
2298 self.logger.error(
2299 "Unexpected exception at _update_target task={}: {}".format(
2300 task["task_id"], e
2301 ),
2302 exc_info=True,
2303 )
2304
2305 locked_at = ro_task["locked_at"]
2306
2307 if lock_object:
2308 locked_at = [
2309 lock_object["locked_at"],
2310 lock_object["locked_at"] + self.task_locked_time,
2311 ]
2312 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2313 # contain exactly locked_at + self.task_locked_time
2314 LockRenew.remove_lock_object(lock_object)
2315
2316 q_filter = {
2317 "_id": ro_task["_id"],
2318 "to_check_at": ro_task["to_check_at"],
2319 "locked_at": locked_at,
2320 }
2321 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2322 # outside this task (by ro_nbi) do not update it
2323 db_ro_task_update["locked_by"] = None
2324 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2325 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2326 db_ro_task_update["modified_at"] = now
2327 db_ro_task_update["to_check_at"] = next_check_at
2328
2329 """
2330 # Log RO tasks only when loglevel is DEBUG
2331 if self.logger.getEffectiveLevel() == logging.DEBUG:
2332 db_ro_task_update_log = db_ro_task_update.copy()
2333 db_ro_task_update_log["_id"] = q_filter["_id"]
2334 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2335 """
2336
2337 if not self.db.set_one(
2338 "ro_tasks",
2339 update_dict=db_ro_task_update,
2340 q_filter=q_filter,
2341 fail_on_empty=False,
2342 ):
2343 del db_ro_task_update["to_check_at"]
2344 del q_filter["to_check_at"]
2345 """
2346 # Log RO tasks only when loglevel is DEBUG
2347 if self.logger.getEffectiveLevel() == logging.DEBUG:
2348 self._log_ro_task(
2349 None,
2350 db_ro_task_update_log,
2351 None,
2352 "TASK_WF",
2353 "SET_TASK " + str(q_filter),
2354 )
2355 """
2356 self.db.set_one(
2357 "ro_tasks",
2358 q_filter=q_filter,
2359 update_dict=db_ro_task_update,
2360 fail_on_empty=True,
2361 )
2362 except DbException as e:
2363 self.logger.error(
2364 "ro_task={} Error updating database {}".format(ro_task_id, e)
2365 )
2366 except Exception as e:
2367 self.logger.error(
2368 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2369 )
2370
2371 def _update_target(self, task, ro_vim_item_update):
2372 table, _, temp = task["target_record"].partition(":")
2373 _id, _, path_vim_status = temp.partition(":")
2374 path_item = path_vim_status[: path_vim_status.rfind(".")]
2375 path_item = path_item[: path_item.rfind(".")]
2376 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2377 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2378
2379 if ro_vim_item_update:
2380 update_dict = {
2381 path_vim_status + "." + k: v
2382 for k, v in ro_vim_item_update.items()
2383 if k
2384 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2385 }
2386
2387 if path_vim_status.startswith("vdur."):
2388 # for backward compatibility, add vdur.name apart from vdur.vim_name
2389 if ro_vim_item_update.get("vim_name"):
2390 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2391
2392 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2393 if ro_vim_item_update.get("vim_id"):
2394 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2395
2396 # update general status
2397 if ro_vim_item_update.get("vim_status"):
2398 update_dict[path_item + ".status"] = ro_vim_item_update[
2399 "vim_status"
2400 ]
2401
2402 if ro_vim_item_update.get("interfaces"):
2403 path_interfaces = path_item + ".interfaces"
2404
2405 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2406 if iface:
2407 update_dict.update(
2408 {
2409 path_interfaces + ".{}.".format(i) + k: v
2410 for k, v in iface.items()
2411 if k in ("vlan", "compute_node", "pci")
2412 }
2413 )
2414
2415 # put ip_address and mac_address with ip-address and mac-address
2416 if iface.get("ip_address"):
2417 update_dict[
2418 path_interfaces + ".{}.".format(i) + "ip-address"
2419 ] = iface["ip_address"]
2420
2421 if iface.get("mac_address"):
2422 update_dict[
2423 path_interfaces + ".{}.".format(i) + "mac-address"
2424 ] = iface["mac_address"]
2425
2426 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2427 update_dict["ip-address"] = iface.get("ip_address").split(
2428 ";"
2429 )[0]
2430
2431 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2432 update_dict[path_item + ".ip-address"] = iface.get(
2433 "ip_address"
2434 ).split(";")[0]
2435
2436 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2437 else:
2438 update_dict = {path_item + ".status": "DELETED"}
2439 self.db.set_one(
2440 table,
2441 q_filter={"_id": _id},
2442 update_dict=update_dict,
2443 unset={path_vim_status: None},
2444 )
2445
2446 def _process_delete_db_tasks(self):
2447 """
2448 Delete task from database because vnfrs or nsrs or both have been deleted
2449 :return: None. Uses and modify self.tasks_to_delete
2450 """
2451 while self.tasks_to_delete:
2452 task = self.tasks_to_delete[0]
2453 vnfrs_deleted = None
2454 nsr_id = task["nsr_id"]
2455
2456 if task["target_record"].startswith("vnfrs:"):
2457 # check if nsrs is present
2458 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2459 vnfrs_deleted = task["target_record"].split(":")[1]
2460
2461 try:
2462 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2463 except Exception as e:
2464 self.logger.error(
2465 "Error deleting task={}: {}".format(task["task_id"], e)
2466 )
2467 self.tasks_to_delete.pop(0)
2468
2469 @staticmethod
2470 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2471 """
2472 Static method because it is called from osm_ng_ro.ns
2473 :param db: instance of database to use
2474 :param nsr_id: affected nsrs id
2475 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2476 :return: None, exception is fails
2477 """
2478 retries = 5
2479 for retry in range(retries):
2480 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2481 now = time.time()
2482 conflict = False
2483
2484 for ro_task in ro_tasks:
2485 db_update = {}
2486 to_delete_ro_task = True
2487
2488 for index, task in enumerate(ro_task["tasks"]):
2489 if not task:
2490 pass
2491 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2492 vnfrs_deleted
2493 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2494 ):
2495 db_update["tasks.{}".format(index)] = None
2496 else:
2497 # used by other nsr, ro_task cannot be deleted
2498 to_delete_ro_task = False
2499
2500 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2501 if to_delete_ro_task:
2502 if not db.del_one(
2503 "ro_tasks",
2504 q_filter={
2505 "_id": ro_task["_id"],
2506 "modified_at": ro_task["modified_at"],
2507 },
2508 fail_on_empty=False,
2509 ):
2510 conflict = True
2511 elif db_update:
2512 db_update["modified_at"] = now
2513 if not db.set_one(
2514 "ro_tasks",
2515 q_filter={
2516 "_id": ro_task["_id"],
2517 "modified_at": ro_task["modified_at"],
2518 },
2519 update_dict=db_update,
2520 fail_on_empty=False,
2521 ):
2522 conflict = True
2523 if not conflict:
2524 return
2525 else:
2526 raise NsWorkerException("Exceeded {} retries".format(retries))
2527
2528 def run(self):
2529 # load database
2530 self.logger.info("Starting")
2531 while True:
2532 # step 1: get commands from queue
2533 try:
2534 if self.vim_targets:
2535 task = self.task_queue.get(block=False)
2536 else:
2537 if not self.idle:
2538 self.logger.debug("enters in idle state")
2539 self.idle = True
2540 task = self.task_queue.get(block=True)
2541 self.idle = False
2542
2543 if task[0] == "terminate":
2544 break
2545 elif task[0] == "load_vim":
2546 self.logger.info("order to load vim {}".format(task[1]))
2547 self._load_vim(task[1])
2548 elif task[0] == "unload_vim":
2549 self.logger.info("order to unload vim {}".format(task[1]))
2550 self._unload_vim(task[1])
2551 elif task[0] == "reload_vim":
2552 self._reload_vim(task[1])
2553 elif task[0] == "check_vim":
2554 self.logger.info("order to check vim {}".format(task[1]))
2555 self._check_vim(task[1])
2556 continue
2557 except Exception as e:
2558 if isinstance(e, queue.Empty):
2559 pass
2560 else:
2561 self.logger.critical(
2562 "Error processing task: {}".format(e), exc_info=True
2563 )
2564
2565 # step 2: process pending_tasks, delete not needed tasks
2566 try:
2567 if self.tasks_to_delete:
2568 self._process_delete_db_tasks()
2569 busy = False
2570 """
2571 # Log RO tasks only when loglevel is DEBUG
2572 if self.logger.getEffectiveLevel() == logging.DEBUG:
2573 _ = self._get_db_all_tasks()
2574 """
2575 ro_task = self._get_db_task()
2576 if ro_task:
2577 self._process_pending_tasks(ro_task)
2578 busy = True
2579 if not busy:
2580 time.sleep(5)
2581 except Exception as e:
2582 self.logger.critical(
2583 "Unexpected exception at run: " + str(e), exc_info=True
2584 )
2585
2586 self.logger.info("Finishing")