Feature 10960 Performance optimizations for the polling of VM status in RO
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
52 def deep_get(target_dict, *args, **kwargs):
53 """
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
60 """
61 for key in args:
62 if not isinstance(target_dict, dict) or key not in target_dict:
63 return kwargs.get("default")
64 target_dict = target_dict[key]
65 return target_dict
66
67
68 class NsWorkerException(Exception):
69 pass
70
71
72 class FailingConnector:
73 def __init__(self, error_msg):
74 self.error_msg = error_msg
75
76 for method in dir(vimconn.VimConnector):
77 if method[0] != "_":
78 setattr(
79 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80 )
81
82 for method in dir(sdnconn.SdnConnectorBase):
83 if method[0] != "_":
84 setattr(
85 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86 )
87
88
89 class NsWorkerExceptionNotFound(NsWorkerException):
90 pass
91
92
93 class VimInteractionBase:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
96
97 def __init__(self, db, my_vims, db_vims, logger):
98 self.db = db
99 self.logger = logger
100 self.my_vims = my_vims
101 self.db_vims = db_vims
102
103 def new(self, ro_task, task_index, task_depends):
104 return "BUILD", {}
105
106 def refresh(self, ro_task):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 return "FAILED", {}
110
111 return "DONE", {}
112
113 def delete(self, ro_task, task_index):
114 """skip calling VIM to delete image. Assumes ok"""
115 return "DONE", {}
116
117 def exec(self, ro_task, task_index, task_depends):
118 return "DONE", None, None
119
120
121 class VimInteractionNet(VimInteractionBase):
122 def new(self, ro_task, task_index, task_depends):
123 vim_net_id = None
124 task = ro_task["tasks"][task_index]
125 task_id = task["task_id"]
126 created = False
127 created_items = {}
128 target_vim = self.my_vims[ro_task["target_id"]]
129 mgmtnet = False
130 mgmtnet_defined_in_vim = False
131
132 try:
133 # FIND
134 if task.get("find_params"):
135 # if management, get configuration of VIM
136 if task["find_params"].get("filter_dict"):
137 vim_filter = task["find_params"]["filter_dict"]
138 # management network
139 elif task["find_params"].get("mgmt"):
140 mgmtnet = True
141 if deep_get(
142 self.db_vims[ro_task["target_id"]],
143 "config",
144 "management_network_id",
145 ):
146 mgmtnet_defined_in_vim = True
147 vim_filter = {
148 "id": self.db_vims[ro_task["target_id"]]["config"][
149 "management_network_id"
150 ]
151 }
152 elif deep_get(
153 self.db_vims[ro_task["target_id"]],
154 "config",
155 "management_network_name",
156 ):
157 mgmtnet_defined_in_vim = True
158 vim_filter = {
159 "name": self.db_vims[ro_task["target_id"]]["config"][
160 "management_network_name"
161 ]
162 }
163 else:
164 vim_filter = {"name": task["find_params"]["name"]}
165 else:
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task["find_params"])
168 )
169
170 vim_nets = target_vim.get_network_list(vim_filter)
171 if not vim_nets and not task.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet and not mgmtnet_defined_in_vim:
177 net_name = (
178 vim_filter.get("name")
179 if vim_filter.get("name")
180 else vim_filter.get("id")[:16]
181 )
182 vim_net_id, created_items = target_vim.new_network(
183 net_name, None
184 )
185 self.logger.debug(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id)
187 )
188 created = True
189 else:
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task.get("find_params")
193 )
194 )
195 elif len(vim_nets) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
198 task["find_params"]
199 )
200 )
201
202 if vim_nets:
203 vim_net_id = vim_nets[0]["id"]
204 else:
205 # CREATE
206 params = task["params"]
207 vim_net_id, created_items = target_vim.new_network(**params)
208 created = True
209
210 ro_vim_item_update = {
211 "vim_id": vim_net_id,
212 "vim_status": "BUILD",
213 "created": created,
214 "created_items": created_items,
215 "vim_details": None,
216 }
217 self.logger.debug(
218 "task={} {} new-net={} created={}".format(
219 task_id, ro_task["target_id"], vim_net_id, created
220 )
221 )
222
223 return "BUILD", ro_vim_item_update
224 except (vimconn.VimConnException, NsWorkerException) as e:
225 self.logger.error(
226 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
227 )
228 ro_vim_item_update = {
229 "vim_status": "VIM_ERROR",
230 "created": created,
231 "vim_details": str(e),
232 }
233
234 return "FAILED", ro_vim_item_update
235
236 def refresh(self, ro_task):
237 """Call VIM to get network status"""
238 ro_task_id = ro_task["_id"]
239 target_vim = self.my_vims[ro_task["target_id"]]
240 vim_id = ro_task["vim_info"]["vim_id"]
241 net_to_refresh_list = [vim_id]
242
243 try:
244 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
245 vim_info = vim_dict[vim_id]
246
247 if vim_info["status"] == "ACTIVE":
248 task_status = "DONE"
249 elif vim_info["status"] == "BUILD":
250 task_status = "BUILD"
251 else:
252 task_status = "FAILED"
253 except vimconn.VimConnException as e:
254 # Mark all tasks at VIM_ERROR status
255 self.logger.error(
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id, ro_task["target_id"], vim_id, e
258 )
259 )
260 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
261 task_status = "FAILED"
262
263 ro_vim_item_update = {}
264 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
265 ro_vim_item_update["vim_status"] = vim_info["status"]
266
267 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
268 ro_vim_item_update["vim_name"] = vim_info.get("name")
269
270 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
272 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
273 elif vim_info["status"] == "DELETED":
274 ro_vim_item_update["vim_id"] = None
275 ro_vim_item_update["vim_details"] = "Deleted externally"
276 else:
277 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
278 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
279
280 if ro_vim_item_update:
281 self.logger.debug(
282 "ro_task={} {} get-net={}: status={} {}".format(
283 ro_task_id,
284 ro_task["target_id"],
285 vim_id,
286 ro_vim_item_update.get("vim_status"),
287 ro_vim_item_update.get("vim_details")
288 if ro_vim_item_update.get("vim_status") != "ACTIVE"
289 else "",
290 )
291 )
292
293 return task_status, ro_vim_item_update
294
295 def delete(self, ro_task, task_index):
296 task = ro_task["tasks"][task_index]
297 task_id = task["task_id"]
298 net_vim_id = ro_task["vim_info"]["vim_id"]
299 ro_vim_item_update_ok = {
300 "vim_status": "DELETED",
301 "created": False,
302 "vim_details": "DELETED",
303 "vim_id": None,
304 }
305
306 try:
307 if net_vim_id or ro_task["vim_info"]["created_items"]:
308 target_vim = self.my_vims[ro_task["target_id"]]
309 target_vim.delete_network(
310 net_vim_id, ro_task["vim_info"]["created_items"]
311 )
312 except vimconn.VimConnNotFoundException:
313 ro_vim_item_update_ok["vim_details"] = "already deleted"
314 except vimconn.VimConnException as e:
315 self.logger.error(
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task["_id"], ro_task["target_id"], net_vim_id, e
318 )
319 )
320 ro_vim_item_update = {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e),
323 }
324
325 return "FAILED", ro_vim_item_update
326
327 self.logger.debug(
328 "task={} {} del-net={} {}".format(
329 task_id,
330 ro_task["target_id"],
331 net_vim_id,
332 ro_vim_item_update_ok.get("vim_details", ""),
333 )
334 )
335
336 return "DONE", ro_vim_item_update_ok
337
338
339 class VimInteractionVdu(VimInteractionBase):
340 max_retries_inject_ssh_key = 20 # 20 times
341 time_retries_inject_ssh_key = 30 # wevery 30 seconds
342
343 def new(self, ro_task, task_index, task_depends):
344 task = ro_task["tasks"][task_index]
345 task_id = task["task_id"]
346 created = False
347 created_items = {}
348 target_vim = self.my_vims[ro_task["target_id"]]
349
350 try:
351 created = True
352 params = task["params"]
353 params_copy = deepcopy(params)
354 net_list = params_copy["net_list"]
355
356 for net in net_list:
357 # change task_id into network_id
358 if "net_id" in net and net["net_id"].startswith("TASK-"):
359 network_id = task_depends[net["net_id"]]
360
361 if not network_id:
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net["net_id"])
365 )
366
367 net["net_id"] = network_id
368
369 if params_copy["image_id"].startswith("TASK-"):
370 params_copy["image_id"] = task_depends[params_copy["image_id"]]
371
372 if params_copy["flavor_id"].startswith("TASK-"):
373 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
374
375 affinity_group_list = params_copy["affinity_group_list"]
376 for affinity_group in affinity_group_list:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group and affinity_group[
379 "affinity_group_id"
380 ].startswith("TASK-"):
381 affinity_group_id = task_depends[
382 affinity_group["affinity_group_id"]
383 ]
384
385 if not affinity_group_id:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group["affinity_group_id"])
388 )
389
390 affinity_group["affinity_group_id"] = affinity_group_id
391
392 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
393 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
394
395 ro_vim_item_update = {
396 "vim_id": vim_vm_id,
397 "vim_status": "BUILD",
398 "created": created,
399 "created_items": created_items,
400 "vim_details": None,
401 "interfaces_vim_ids": interfaces,
402 "interfaces": [],
403 }
404 self.logger.debug(
405 "task={} {} new-vm={} created={}".format(
406 task_id, ro_task["target_id"], vim_vm_id, created
407 )
408 )
409
410 return "BUILD", ro_vim_item_update
411 except (vimconn.VimConnException, NsWorkerException) as e:
412 self.logger.error(
413 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
414 )
415 ro_vim_item_update = {
416 "vim_status": "VIM_ERROR",
417 "created": created,
418 "vim_details": str(e),
419 }
420
421 return "FAILED", ro_vim_item_update
422
423 def delete(self, ro_task, task_index):
424 task = ro_task["tasks"][task_index]
425 task_id = task["task_id"]
426 vm_vim_id = ro_task["vim_info"]["vim_id"]
427 ro_vim_item_update_ok = {
428 "vim_status": "DELETED",
429 "created": False,
430 "vim_details": "DELETED",
431 "vim_id": None,
432 }
433
434 try:
435 if vm_vim_id or ro_task["vim_info"]["created_items"]:
436 target_vim = self.my_vims[ro_task["target_id"]]
437 target_vim.delete_vminstance(
438 vm_vim_id, ro_task["vim_info"]["created_items"]
439 )
440 except vimconn.VimConnNotFoundException:
441 ro_vim_item_update_ok["vim_details"] = "already deleted"
442 except vimconn.VimConnException as e:
443 self.logger.error(
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
446 )
447 )
448 ro_vim_item_update = {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e),
451 }
452
453 return "FAILED", ro_vim_item_update
454
455 self.logger.debug(
456 "task={} {} del-vm={} {}".format(
457 task_id,
458 ro_task["target_id"],
459 vm_vim_id,
460 ro_vim_item_update_ok.get("vim_details", ""),
461 )
462 )
463
464 return "DONE", ro_vim_item_update_ok
465
466 def refresh(self, ro_task):
467 """Call VIM to get vm status"""
468 ro_task_id = ro_task["_id"]
469 target_vim = self.my_vims[ro_task["target_id"]]
470 vim_id = ro_task["vim_info"]["vim_id"]
471
472 if not vim_id:
473 return None, None
474
475 vm_to_refresh_list = [vim_id]
476 try:
477 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
478 vim_info = vim_dict[vim_id]
479
480 if vim_info["status"] == "ACTIVE":
481 task_status = "DONE"
482 elif vim_info["status"] == "BUILD":
483 task_status = "BUILD"
484 else:
485 task_status = "FAILED"
486
487 # try to load and parse vim_information
488 try:
489 vim_info_info = yaml.safe_load(vim_info["vim_info"])
490 if vim_info_info.get("name"):
491 vim_info["name"] = vim_info_info["name"]
492 except Exception as vim_info_error:
493 self.logger.exception(
494 f"{vim_info_error} occured while getting the vim_info from yaml"
495 )
496 except vimconn.VimConnException as e:
497 # Mark all tasks at VIM_ERROR status
498 self.logger.error(
499 "ro_task={} vim={} get-vm={}: {}".format(
500 ro_task_id, ro_task["target_id"], vim_id, e
501 )
502 )
503 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
504 task_status = "FAILED"
505
506 ro_vim_item_update = {}
507
508 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
509 vim_interfaces = []
510 if vim_info.get("interfaces"):
511 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
512 iface = next(
513 (
514 iface
515 for iface in vim_info["interfaces"]
516 if vim_iface_id == iface["vim_interface_id"]
517 ),
518 None,
519 )
520 # if iface:
521 # iface.pop("vim_info", None)
522 vim_interfaces.append(iface)
523
524 task_create = next(
525 t
526 for t in ro_task["tasks"]
527 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
528 )
529 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
530 vim_interfaces[task_create["mgmt_vnf_interface"]][
531 "mgmt_vnf_interface"
532 ] = True
533
534 mgmt_vdu_iface = task_create.get(
535 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
536 )
537 if vim_interfaces:
538 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
539
540 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
541 ro_vim_item_update["interfaces"] = vim_interfaces
542
543 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
544 ro_vim_item_update["vim_status"] = vim_info["status"]
545
546 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
547 ro_vim_item_update["vim_name"] = vim_info.get("name")
548
549 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
550 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
551 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
552 elif vim_info["status"] == "DELETED":
553 ro_vim_item_update["vim_id"] = None
554 ro_vim_item_update["vim_details"] = "Deleted externally"
555 else:
556 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
557 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
558
559 if ro_vim_item_update:
560 self.logger.debug(
561 "ro_task={} {} get-vm={}: status={} {}".format(
562 ro_task_id,
563 ro_task["target_id"],
564 vim_id,
565 ro_vim_item_update.get("vim_status"),
566 ro_vim_item_update.get("vim_details")
567 if ro_vim_item_update.get("vim_status") != "ACTIVE"
568 else "",
569 )
570 )
571
572 return task_status, ro_vim_item_update
573
574 def exec(self, ro_task, task_index, task_depends):
575 task = ro_task["tasks"][task_index]
576 task_id = task["task_id"]
577 target_vim = self.my_vims[ro_task["target_id"]]
578 db_task_update = {"retries": 0}
579 retries = task.get("retries", 0)
580
581 try:
582 params = task["params"]
583 params_copy = deepcopy(params)
584 params_copy["ro_key"] = self.db.decrypt(
585 params_copy.pop("private_key"),
586 params_copy.pop("schema_version"),
587 params_copy.pop("salt"),
588 )
589 params_copy["ip_addr"] = params_copy.pop("ip_address")
590 target_vim.inject_user_key(**params_copy)
591 self.logger.debug(
592 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
593 )
594
595 return (
596 "DONE",
597 None,
598 db_task_update,
599 ) # params_copy["key"]
600 except (vimconn.VimConnException, NsWorkerException) as e:
601 retries += 1
602
603 self.logger.debug(traceback.format_exc())
604 if retries < self.max_retries_inject_ssh_key:
605 return (
606 "BUILD",
607 None,
608 {
609 "retries": retries,
610 "next_retry": self.time_retries_inject_ssh_key,
611 },
612 )
613
614 self.logger.error(
615 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
616 )
617 ro_vim_item_update = {"vim_details": str(e)}
618
619 return "FAILED", ro_vim_item_update, db_task_update
620
621
622 class VimInteractionImage(VimInteractionBase):
623 def new(self, ro_task, task_index, task_depends):
624 task = ro_task["tasks"][task_index]
625 task_id = task["task_id"]
626 created = False
627 created_items = {}
628 target_vim = self.my_vims[ro_task["target_id"]]
629
630 try:
631 # FIND
632 if task.get("find_params"):
633 vim_images = target_vim.get_image_list(**task["find_params"])
634
635 if not vim_images:
636 raise NsWorkerExceptionNotFound(
637 "Image not found with this criteria: '{}'".format(
638 task["find_params"]
639 )
640 )
641 elif len(vim_images) > 1:
642 raise NsWorkerException(
643 "More than one image found with this criteria: '{}'".format(
644 task["find_params"]
645 )
646 )
647 else:
648 vim_image_id = vim_images[0]["id"]
649
650 ro_vim_item_update = {
651 "vim_id": vim_image_id,
652 "vim_status": "DONE",
653 "created": created,
654 "created_items": created_items,
655 "vim_details": None,
656 }
657 self.logger.debug(
658 "task={} {} new-image={} created={}".format(
659 task_id, ro_task["target_id"], vim_image_id, created
660 )
661 )
662
663 return "DONE", ro_vim_item_update
664 except (NsWorkerException, vimconn.VimConnException) as e:
665 self.logger.error(
666 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
667 )
668 ro_vim_item_update = {
669 "vim_status": "VIM_ERROR",
670 "created": created,
671 "vim_details": str(e),
672 }
673
674 return "FAILED", ro_vim_item_update
675
676
677 class VimInteractionFlavor(VimInteractionBase):
678 def delete(self, ro_task, task_index):
679 task = ro_task["tasks"][task_index]
680 task_id = task["task_id"]
681 flavor_vim_id = ro_task["vim_info"]["vim_id"]
682 ro_vim_item_update_ok = {
683 "vim_status": "DELETED",
684 "created": False,
685 "vim_details": "DELETED",
686 "vim_id": None,
687 }
688
689 try:
690 if flavor_vim_id:
691 target_vim = self.my_vims[ro_task["target_id"]]
692 target_vim.delete_flavor(flavor_vim_id)
693 except vimconn.VimConnNotFoundException:
694 ro_vim_item_update_ok["vim_details"] = "already deleted"
695 except vimconn.VimConnException as e:
696 self.logger.error(
697 "ro_task={} vim={} del-flavor={}: {}".format(
698 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
699 )
700 )
701 ro_vim_item_update = {
702 "vim_status": "VIM_ERROR",
703 "vim_details": "Error while deleting: {}".format(e),
704 }
705
706 return "FAILED", ro_vim_item_update
707
708 self.logger.debug(
709 "task={} {} del-flavor={} {}".format(
710 task_id,
711 ro_task["target_id"],
712 flavor_vim_id,
713 ro_vim_item_update_ok.get("vim_details", ""),
714 )
715 )
716
717 return "DONE", ro_vim_item_update_ok
718
719 def new(self, ro_task, task_index, task_depends):
720 task = ro_task["tasks"][task_index]
721 task_id = task["task_id"]
722 created = False
723 created_items = {}
724 target_vim = self.my_vims[ro_task["target_id"]]
725
726 try:
727 # FIND
728 vim_flavor_id = None
729
730 if task.get("find_params"):
731 try:
732 flavor_data = task["find_params"]["flavor_data"]
733 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
734 except vimconn.VimConnNotFoundException:
735 self.logger.exception("VimConnNotFoundException occured.")
736
737 if not vim_flavor_id and task.get("params"):
738 # CREATE
739 flavor_data = task["params"]["flavor_data"]
740 vim_flavor_id = target_vim.new_flavor(flavor_data)
741 created = True
742
743 ro_vim_item_update = {
744 "vim_id": vim_flavor_id,
745 "vim_status": "DONE",
746 "created": created,
747 "created_items": created_items,
748 "vim_details": None,
749 }
750 self.logger.debug(
751 "task={} {} new-flavor={} created={}".format(
752 task_id, ro_task["target_id"], vim_flavor_id, created
753 )
754 )
755
756 return "DONE", ro_vim_item_update
757 except (vimconn.VimConnException, NsWorkerException) as e:
758 self.logger.error(
759 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
760 )
761 ro_vim_item_update = {
762 "vim_status": "VIM_ERROR",
763 "created": created,
764 "vim_details": str(e),
765 }
766
767 return "FAILED", ro_vim_item_update
768
769
770 class VimInteractionAffinityGroup(VimInteractionBase):
771 def delete(self, ro_task, task_index):
772 task = ro_task["tasks"][task_index]
773 task_id = task["task_id"]
774 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
775 ro_vim_item_update_ok = {
776 "vim_status": "DELETED",
777 "created": False,
778 "vim_details": "DELETED",
779 "vim_id": None,
780 }
781
782 try:
783 if affinity_group_vim_id:
784 target_vim = self.my_vims[ro_task["target_id"]]
785 target_vim.delete_affinity_group(affinity_group_vim_id)
786 except vimconn.VimConnNotFoundException:
787 ro_vim_item_update_ok["vim_details"] = "already deleted"
788 except vimconn.VimConnException as e:
789 self.logger.error(
790 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
791 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
792 )
793 )
794 ro_vim_item_update = {
795 "vim_status": "VIM_ERROR",
796 "vim_details": "Error while deleting: {}".format(e),
797 }
798
799 return "FAILED", ro_vim_item_update
800
801 self.logger.debug(
802 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
803 task_id,
804 ro_task["target_id"],
805 affinity_group_vim_id,
806 ro_vim_item_update_ok.get("vim_details", ""),
807 )
808 )
809
810 return "DONE", ro_vim_item_update_ok
811
812 def new(self, ro_task, task_index, task_depends):
813 task = ro_task["tasks"][task_index]
814 task_id = task["task_id"]
815 created = False
816 created_items = {}
817 target_vim = self.my_vims[ro_task["target_id"]]
818
819 try:
820 affinity_group_vim_id = None
821 affinity_group_data = None
822
823 if task.get("params"):
824 affinity_group_data = task["params"].get("affinity_group_data")
825
826 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
827 try:
828 param_affinity_group_id = task["params"]["affinity_group_data"].get(
829 "vim-affinity-group-id"
830 )
831 affinity_group_vim_id = target_vim.get_affinity_group(
832 param_affinity_group_id
833 ).get("id")
834 except vimconn.VimConnNotFoundException:
835 self.logger.error(
836 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
837 "could not be found at VIM. Creating a new one.".format(
838 task_id, ro_task["target_id"], param_affinity_group_id
839 )
840 )
841
842 if not affinity_group_vim_id and affinity_group_data:
843 affinity_group_vim_id = target_vim.new_affinity_group(
844 affinity_group_data
845 )
846 created = True
847
848 ro_vim_item_update = {
849 "vim_id": affinity_group_vim_id,
850 "vim_status": "DONE",
851 "created": created,
852 "created_items": created_items,
853 "vim_details": None,
854 }
855 self.logger.debug(
856 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
857 task_id, ro_task["target_id"], affinity_group_vim_id, created
858 )
859 )
860
861 return "DONE", ro_vim_item_update
862 except (vimconn.VimConnException, NsWorkerException) as e:
863 self.logger.error(
864 "task={} vim={} new-affinity-or-anti-affinity-group:"
865 " {}".format(task_id, ro_task["target_id"], e)
866 )
867 ro_vim_item_update = {
868 "vim_status": "VIM_ERROR",
869 "created": created,
870 "vim_details": str(e),
871 }
872
873 return "FAILED", ro_vim_item_update
874
875
876 class VimInteractionSdnNet(VimInteractionBase):
877 @staticmethod
878 def _match_pci(port_pci, mapping):
879 """
880 Check if port_pci matches with mapping
881 mapping can have brackets to indicate that several chars are accepted. e.g
882 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
883 :param port_pci: text
884 :param mapping: text, can contain brackets to indicate several chars are available
885 :return: True if matches, False otherwise
886 """
887 if not port_pci or not mapping:
888 return False
889 if port_pci == mapping:
890 return True
891
892 mapping_index = 0
893 pci_index = 0
894 while True:
895 bracket_start = mapping.find("[", mapping_index)
896
897 if bracket_start == -1:
898 break
899
900 bracket_end = mapping.find("]", bracket_start)
901 if bracket_end == -1:
902 break
903
904 length = bracket_start - mapping_index
905 if (
906 length
907 and port_pci[pci_index : pci_index + length]
908 != mapping[mapping_index:bracket_start]
909 ):
910 return False
911
912 if (
913 port_pci[pci_index + length]
914 not in mapping[bracket_start + 1 : bracket_end]
915 ):
916 return False
917
918 pci_index += length + 1
919 mapping_index = bracket_end + 1
920
921 if port_pci[pci_index:] != mapping[mapping_index:]:
922 return False
923
924 return True
925
926 def _get_interfaces(self, vlds_to_connect, vim_account_id):
927 """
928 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
929 :param vim_account_id:
930 :return:
931 """
932 interfaces = []
933
934 for vld in vlds_to_connect:
935 table, _, db_id = vld.partition(":")
936 db_id, _, vld = db_id.partition(":")
937 _, _, vld_id = vld.partition(".")
938
939 if table == "vnfrs":
940 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
941 iface_key = "vnf-vld-id"
942 else: # table == "nsrs"
943 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
944 iface_key = "ns-vld-id"
945
946 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
947
948 for db_vnfr in db_vnfrs:
949 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
950 for iface_index, interface in enumerate(vdur["interfaces"]):
951 if interface.get(iface_key) == vld_id and interface.get(
952 "type"
953 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
954 # only SR-IOV o PT
955 interface_ = interface.copy()
956 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
957 db_vnfr["_id"], vdu_index, iface_index
958 )
959
960 if vdur.get("status") == "ERROR":
961 interface_["status"] = "ERROR"
962
963 interfaces.append(interface_)
964
965 return interfaces
966
967 def refresh(self, ro_task):
968 # look for task create
969 task_create_index, _ = next(
970 i_t
971 for i_t in enumerate(ro_task["tasks"])
972 if i_t[1]
973 and i_t[1]["action"] == "CREATE"
974 and i_t[1]["status"] != "FINISHED"
975 )
976
977 return self.new(ro_task, task_create_index, None)
978
979 def new(self, ro_task, task_index, task_depends):
980 task = ro_task["tasks"][task_index]
981 task_id = task["task_id"]
982 target_vim = self.my_vims[ro_task["target_id"]]
983
984 sdn_net_id = ro_task["vim_info"]["vim_id"]
985
986 created_items = ro_task["vim_info"].get("created_items")
987 connected_ports = ro_task["vim_info"].get("connected_ports", [])
988 new_connected_ports = []
989 last_update = ro_task["vim_info"].get("last_update", 0)
990 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
991 error_list = []
992 created = ro_task["vim_info"].get("created", False)
993
994 try:
995 # CREATE
996 params = task["params"]
997 vlds_to_connect = params.get("vlds", [])
998 associated_vim = params.get("target_vim")
999 # external additional ports
1000 additional_ports = params.get("sdn-ports") or ()
1001 _, _, vim_account_id = (
1002 (None, None, None)
1003 if associated_vim is None
1004 else associated_vim.partition(":")
1005 )
1006
1007 if associated_vim:
1008 # get associated VIM
1009 if associated_vim not in self.db_vims:
1010 self.db_vims[associated_vim] = self.db.get_one(
1011 "vim_accounts", {"_id": vim_account_id}
1012 )
1013
1014 db_vim = self.db_vims[associated_vim]
1015
1016 # look for ports to connect
1017 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1018 # print(ports)
1019
1020 sdn_ports = []
1021 pending_ports = error_ports = 0
1022 vlan_used = None
1023 sdn_need_update = False
1024
1025 for port in ports:
1026 vlan_used = port.get("vlan") or vlan_used
1027
1028 # TODO. Do not connect if already done
1029 if not port.get("compute_node") or not port.get("pci"):
1030 if port.get("status") == "ERROR":
1031 error_ports += 1
1032 else:
1033 pending_ports += 1
1034 continue
1035
1036 pmap = None
1037 compute_node_mappings = next(
1038 (
1039 c
1040 for c in db_vim["config"].get("sdn-port-mapping", ())
1041 if c and c["compute_node"] == port["compute_node"]
1042 ),
1043 None,
1044 )
1045
1046 if compute_node_mappings:
1047 # process port_mapping pci of type 0000:af:1[01].[1357]
1048 pmap = next(
1049 (
1050 p
1051 for p in compute_node_mappings["ports"]
1052 if self._match_pci(port["pci"], p.get("pci"))
1053 ),
1054 None,
1055 )
1056
1057 if not pmap:
1058 if not db_vim["config"].get("mapping_not_needed"):
1059 error_list.append(
1060 "Port mapping not found for compute_node={} pci={}".format(
1061 port["compute_node"], port["pci"]
1062 )
1063 )
1064 continue
1065
1066 pmap = {}
1067
1068 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1069 new_port = {
1070 "service_endpoint_id": pmap.get("service_endpoint_id")
1071 or service_endpoint_id,
1072 "service_endpoint_encapsulation_type": "dot1q"
1073 if port["type"] == "SR-IOV"
1074 else None,
1075 "service_endpoint_encapsulation_info": {
1076 "vlan": port.get("vlan"),
1077 "mac": port.get("mac-address"),
1078 "device_id": pmap.get("device_id") or port["compute_node"],
1079 "device_interface_id": pmap.get("device_interface_id")
1080 or port["pci"],
1081 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1082 "switch_port": pmap.get("switch_port"),
1083 "service_mapping_info": pmap.get("service_mapping_info"),
1084 },
1085 }
1086
1087 # TODO
1088 # if port["modified_at"] > last_update:
1089 # sdn_need_update = True
1090 new_connected_ports.append(port["id"]) # TODO
1091 sdn_ports.append(new_port)
1092
1093 if error_ports:
1094 error_list.append(
1095 "{} interfaces have not been created as VDU is on ERROR status".format(
1096 error_ports
1097 )
1098 )
1099
1100 # connect external ports
1101 for index, additional_port in enumerate(additional_ports):
1102 additional_port_id = additional_port.get(
1103 "service_endpoint_id"
1104 ) or "external-{}".format(index)
1105 sdn_ports.append(
1106 {
1107 "service_endpoint_id": additional_port_id,
1108 "service_endpoint_encapsulation_type": additional_port.get(
1109 "service_endpoint_encapsulation_type", "dot1q"
1110 ),
1111 "service_endpoint_encapsulation_info": {
1112 "vlan": additional_port.get("vlan") or vlan_used,
1113 "mac": additional_port.get("mac_address"),
1114 "device_id": additional_port.get("device_id"),
1115 "device_interface_id": additional_port.get(
1116 "device_interface_id"
1117 ),
1118 "switch_dpid": additional_port.get("switch_dpid")
1119 or additional_port.get("switch_id"),
1120 "switch_port": additional_port.get("switch_port"),
1121 "service_mapping_info": additional_port.get(
1122 "service_mapping_info"
1123 ),
1124 },
1125 }
1126 )
1127 new_connected_ports.append(additional_port_id)
1128 sdn_info = ""
1129
1130 # if there are more ports to connect or they have been modified, call create/update
1131 if error_list:
1132 sdn_status = "ERROR"
1133 sdn_info = "; ".join(error_list)
1134 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1135 last_update = time.time()
1136
1137 if not sdn_net_id:
1138 if len(sdn_ports) < 2:
1139 sdn_status = "ACTIVE"
1140
1141 if not pending_ports:
1142 self.logger.debug(
1143 "task={} {} new-sdn-net done, less than 2 ports".format(
1144 task_id, ro_task["target_id"]
1145 )
1146 )
1147 else:
1148 net_type = params.get("type") or "ELAN"
1149 (
1150 sdn_net_id,
1151 created_items,
1152 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1153 created = True
1154 self.logger.debug(
1155 "task={} {} new-sdn-net={} created={}".format(
1156 task_id, ro_task["target_id"], sdn_net_id, created
1157 )
1158 )
1159 else:
1160 created_items = target_vim.edit_connectivity_service(
1161 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1162 )
1163 created = True
1164 self.logger.debug(
1165 "task={} {} update-sdn-net={} created={}".format(
1166 task_id, ro_task["target_id"], sdn_net_id, created
1167 )
1168 )
1169
1170 connected_ports = new_connected_ports
1171 elif sdn_net_id:
1172 wim_status_dict = target_vim.get_connectivity_service_status(
1173 sdn_net_id, conn_info=created_items
1174 )
1175 sdn_status = wim_status_dict["sdn_status"]
1176
1177 if wim_status_dict.get("sdn_info"):
1178 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1179
1180 if wim_status_dict.get("error_msg"):
1181 sdn_info = wim_status_dict.get("error_msg") or ""
1182
1183 if pending_ports:
1184 if sdn_status != "ERROR":
1185 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1186 len(ports) - pending_ports, len(ports)
1187 )
1188
1189 if sdn_status == "ACTIVE":
1190 sdn_status = "BUILD"
1191
1192 ro_vim_item_update = {
1193 "vim_id": sdn_net_id,
1194 "vim_status": sdn_status,
1195 "created": created,
1196 "created_items": created_items,
1197 "connected_ports": connected_ports,
1198 "vim_details": sdn_info,
1199 "last_update": last_update,
1200 }
1201
1202 return sdn_status, ro_vim_item_update
1203 except Exception as e:
1204 self.logger.error(
1205 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1206 exc_info=not isinstance(
1207 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1208 ),
1209 )
1210 ro_vim_item_update = {
1211 "vim_status": "VIM_ERROR",
1212 "created": created,
1213 "vim_details": str(e),
1214 }
1215
1216 return "FAILED", ro_vim_item_update
1217
1218 def delete(self, ro_task, task_index):
1219 task = ro_task["tasks"][task_index]
1220 task_id = task["task_id"]
1221 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1222 ro_vim_item_update_ok = {
1223 "vim_status": "DELETED",
1224 "created": False,
1225 "vim_details": "DELETED",
1226 "vim_id": None,
1227 }
1228
1229 try:
1230 if sdn_vim_id:
1231 target_vim = self.my_vims[ro_task["target_id"]]
1232 target_vim.delete_connectivity_service(
1233 sdn_vim_id, ro_task["vim_info"].get("created_items")
1234 )
1235
1236 except Exception as e:
1237 if (
1238 isinstance(e, sdnconn.SdnConnectorError)
1239 and e.http_code == HTTPStatus.NOT_FOUND.value
1240 ):
1241 ro_vim_item_update_ok["vim_details"] = "already deleted"
1242 else:
1243 self.logger.error(
1244 "ro_task={} vim={} del-sdn-net={}: {}".format(
1245 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1246 ),
1247 exc_info=not isinstance(
1248 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1249 ),
1250 )
1251 ro_vim_item_update = {
1252 "vim_status": "VIM_ERROR",
1253 "vim_details": "Error while deleting: {}".format(e),
1254 }
1255
1256 return "FAILED", ro_vim_item_update
1257
1258 self.logger.debug(
1259 "task={} {} del-sdn-net={} {}".format(
1260 task_id,
1261 ro_task["target_id"],
1262 sdn_vim_id,
1263 ro_vim_item_update_ok.get("vim_details", ""),
1264 )
1265 )
1266
1267 return "DONE", ro_vim_item_update_ok
1268
1269
1270 class ConfigValidate:
1271 def __init__(self, config: Dict):
1272 self.conf = config
1273
1274 @property
1275 def active(self):
1276 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1277 if (
1278 self.conf["period"]["refresh_active"] >= 60
1279 or self.conf["period"]["refresh_active"] == -1
1280 ):
1281 return self.conf["period"]["refresh_active"]
1282
1283 return 60
1284
1285 @property
1286 def build(self):
1287 return self.conf["period"]["refresh_build"]
1288
1289 @property
1290 def image(self):
1291 return self.conf["period"]["refresh_image"]
1292
1293 @property
1294 def error(self):
1295 return self.conf["period"]["refresh_error"]
1296
1297 @property
1298 def queue_size(self):
1299 return self.conf["period"]["queue_size"]
1300
1301
1302 class NsWorker(threading.Thread):
1303 def __init__(self, worker_index, config, plugins, db):
1304 """
1305
1306 :param worker_index: thread index
1307 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1308 :param plugins: global shared dict with the loaded plugins
1309 :param db: database class instance to use
1310 """
1311 threading.Thread.__init__(self)
1312 self.config = config
1313 self.plugins = plugins
1314 self.plugin_name = "unknown"
1315 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1316 self.worker_index = worker_index
1317 # refresh periods for created items
1318 self.refresh_config = ConfigValidate(config)
1319 self.task_queue = queue.Queue(self.refresh_config.queue_size)
1320 # targetvim: vimplugin class
1321 self.my_vims = {}
1322 # targetvim: vim information from database
1323 self.db_vims = {}
1324 # targetvim list
1325 self.vim_targets = []
1326 self.my_id = config["process_id"] + ":" + str(worker_index)
1327 self.db = db
1328 self.item2class = {
1329 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1330 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1331 "image": VimInteractionImage(
1332 self.db, self.my_vims, self.db_vims, self.logger
1333 ),
1334 "flavor": VimInteractionFlavor(
1335 self.db, self.my_vims, self.db_vims, self.logger
1336 ),
1337 "sdn_net": VimInteractionSdnNet(
1338 self.db, self.my_vims, self.db_vims, self.logger
1339 ),
1340 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1341 self.db, self.my_vims, self.db_vims, self.logger
1342 ),
1343 }
1344 self.time_last_task_processed = None
1345 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1346 self.tasks_to_delete = []
1347 # it is idle when there are not vim_targets associated
1348 self.idle = True
1349 self.task_locked_time = config["global"]["task_locked_time"]
1350
1351 def insert_task(self, task):
1352 try:
1353 self.task_queue.put(task, False)
1354 return None
1355 except queue.Full:
1356 raise NsWorkerException("timeout inserting a task")
1357
1358 def terminate(self):
1359 self.insert_task("exit")
1360
1361 def del_task(self, task):
1362 with self.task_lock:
1363 if task["status"] == "SCHEDULED":
1364 task["status"] = "SUPERSEDED"
1365 return True
1366 else: # task["status"] == "processing"
1367 self.task_lock.release()
1368 return False
1369
1370 def _process_vim_config(self, target_id, db_vim):
1371 """
1372 Process vim config, creating vim configuration files as ca_cert
1373 :param target_id: vim/sdn/wim + id
1374 :param db_vim: Vim dictionary obtained from database
1375 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1376 """
1377 if not db_vim.get("config"):
1378 return
1379
1380 file_name = ""
1381
1382 try:
1383 if db_vim["config"].get("ca_cert_content"):
1384 file_name = "{}:{}".format(target_id, self.worker_index)
1385
1386 try:
1387 mkdir(file_name)
1388 except FileExistsError:
1389 self.logger.exception(
1390 "FileExistsError occured while processing vim_config."
1391 )
1392
1393 file_name = file_name + "/ca_cert"
1394
1395 with open(file_name, "w") as f:
1396 f.write(db_vim["config"]["ca_cert_content"])
1397 del db_vim["config"]["ca_cert_content"]
1398 db_vim["config"]["ca_cert"] = file_name
1399 except Exception as e:
1400 raise NsWorkerException(
1401 "Error writing to file '{}': {}".format(file_name, e)
1402 )
1403
1404 def _load_plugin(self, name, type="vim"):
1405 # type can be vim or sdn
1406 if "rovim_dummy" not in self.plugins:
1407 self.plugins["rovim_dummy"] = VimDummyConnector
1408
1409 if "rosdn_dummy" not in self.plugins:
1410 self.plugins["rosdn_dummy"] = SdnDummyConnector
1411
1412 if name in self.plugins:
1413 return self.plugins[name]
1414
1415 try:
1416 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1417 self.plugins[name] = ep.load()
1418 except Exception as e:
1419 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1420
1421 if name and name not in self.plugins:
1422 raise NsWorkerException(
1423 "Plugin 'osm_{n}' has not been installed".format(n=name)
1424 )
1425
1426 return self.plugins[name]
1427
1428 def _unload_vim(self, target_id):
1429 """
1430 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1431 :param target_id: Contains type:_id; where type can be 'vim', ...
1432 :return: None.
1433 """
1434 try:
1435 self.db_vims.pop(target_id, None)
1436 self.my_vims.pop(target_id, None)
1437
1438 if target_id in self.vim_targets:
1439 self.vim_targets.remove(target_id)
1440
1441 self.logger.info("Unloaded {}".format(target_id))
1442 rmtree("{}:{}".format(target_id, self.worker_index))
1443 except FileNotFoundError:
1444 # This is raised by rmtree if folder does not exist.
1445 self.logger.exception("FileNotFoundError occured while unloading VIM.")
1446 except Exception as e:
1447 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1448
1449 def _check_vim(self, target_id):
1450 """
1451 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1452 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1453 :return: None.
1454 """
1455 target, _, _id = target_id.partition(":")
1456 now = time.time()
1457 update_dict = {}
1458 unset_dict = {}
1459 op_text = ""
1460 step = ""
1461 loaded = target_id in self.vim_targets
1462 target_database = (
1463 "vim_accounts"
1464 if target == "vim"
1465 else "wim_accounts"
1466 if target == "wim"
1467 else "sdns"
1468 )
1469
1470 try:
1471 step = "Getting {} from db".format(target_id)
1472 db_vim = self.db.get_one(target_database, {"_id": _id})
1473
1474 for op_index, operation in enumerate(
1475 db_vim["_admin"].get("operations", ())
1476 ):
1477 if operation["operationState"] != "PROCESSING":
1478 continue
1479
1480 locked_at = operation.get("locked_at")
1481
1482 if locked_at is not None and locked_at >= now - self.task_locked_time:
1483 # some other thread is doing this operation
1484 return
1485
1486 # lock
1487 op_text = "_admin.operations.{}.".format(op_index)
1488
1489 if not self.db.set_one(
1490 target_database,
1491 q_filter={
1492 "_id": _id,
1493 op_text + "operationState": "PROCESSING",
1494 op_text + "locked_at": locked_at,
1495 },
1496 update_dict={
1497 op_text + "locked_at": now,
1498 "admin.current_operation": op_index,
1499 },
1500 fail_on_empty=False,
1501 ):
1502 return
1503
1504 unset_dict[op_text + "locked_at"] = None
1505 unset_dict["current_operation"] = None
1506 step = "Loading " + target_id
1507 error_text = self._load_vim(target_id)
1508
1509 if not error_text:
1510 step = "Checking connectivity"
1511
1512 if target == "vim":
1513 self.my_vims[target_id].check_vim_connectivity()
1514 else:
1515 self.my_vims[target_id].check_credentials()
1516
1517 update_dict["_admin.operationalState"] = "ENABLED"
1518 update_dict["_admin.detailed-status"] = ""
1519 unset_dict[op_text + "detailed-status"] = None
1520 update_dict[op_text + "operationState"] = "COMPLETED"
1521
1522 return
1523
1524 except Exception as e:
1525 error_text = "{}: {}".format(step, e)
1526 self.logger.error("{} for {}: {}".format(step, target_id, e))
1527
1528 finally:
1529 if update_dict or unset_dict:
1530 if error_text:
1531 update_dict[op_text + "operationState"] = "FAILED"
1532 update_dict[op_text + "detailed-status"] = error_text
1533 unset_dict.pop(op_text + "detailed-status", None)
1534 update_dict["_admin.operationalState"] = "ERROR"
1535 update_dict["_admin.detailed-status"] = error_text
1536
1537 if op_text:
1538 update_dict[op_text + "statusEnteredTime"] = now
1539
1540 self.db.set_one(
1541 target_database,
1542 q_filter={"_id": _id},
1543 update_dict=update_dict,
1544 unset=unset_dict,
1545 fail_on_empty=False,
1546 )
1547
1548 if not loaded:
1549 self._unload_vim(target_id)
1550
1551 def _reload_vim(self, target_id):
1552 if target_id in self.vim_targets:
1553 self._load_vim(target_id)
1554 else:
1555 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1556 # just remove it to force load again next time it is needed
1557 self.db_vims.pop(target_id, None)
1558
1559 def _load_vim(self, target_id):
1560 """
1561 Load or reload a vim_account, sdn_controller or wim_account.
1562 Read content from database, load the plugin if not loaded.
1563 In case of error loading the plugin, it load a failing VIM_connector
1564 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1565 :param target_id: Contains type:_id; where type can be 'vim', ...
1566 :return: None if ok, descriptive text if error
1567 """
1568 target, _, _id = target_id.partition(":")
1569 target_database = (
1570 "vim_accounts"
1571 if target == "vim"
1572 else "wim_accounts"
1573 if target == "wim"
1574 else "sdns"
1575 )
1576 plugin_name = ""
1577 vim = None
1578
1579 try:
1580 step = "Getting {}={} from db".format(target, _id)
1581 # TODO process for wim, sdnc, ...
1582 vim = self.db.get_one(target_database, {"_id": _id})
1583
1584 # if deep_get(vim, "config", "sdn-controller"):
1585 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1586 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1587
1588 step = "Decrypting password"
1589 schema_version = vim.get("schema_version")
1590 self.db.encrypt_decrypt_fields(
1591 vim,
1592 "decrypt",
1593 fields=("password", "secret"),
1594 schema_version=schema_version,
1595 salt=_id,
1596 )
1597 self._process_vim_config(target_id, vim)
1598
1599 if target == "vim":
1600 plugin_name = "rovim_" + vim["vim_type"]
1601 step = "Loading plugin '{}'".format(plugin_name)
1602 vim_module_conn = self._load_plugin(plugin_name)
1603 step = "Loading {}'".format(target_id)
1604 self.my_vims[target_id] = vim_module_conn(
1605 uuid=vim["_id"],
1606 name=vim["name"],
1607 tenant_id=vim.get("vim_tenant_id"),
1608 tenant_name=vim.get("vim_tenant_name"),
1609 url=vim["vim_url"],
1610 url_admin=None,
1611 user=vim["vim_user"],
1612 passwd=vim["vim_password"],
1613 config=vim.get("config") or {},
1614 persistent_info={},
1615 )
1616 else: # sdn
1617 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1618 step = "Loading plugin '{}'".format(plugin_name)
1619 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1620 step = "Loading {}'".format(target_id)
1621 wim = deepcopy(vim)
1622 wim_config = wim.pop("config", {}) or {}
1623 wim["uuid"] = wim["_id"]
1624 if "url" in wim and "wim_url" not in wim:
1625 wim["wim_url"] = wim["url"]
1626 elif "url" not in wim and "wim_url" in wim:
1627 wim["url"] = wim["wim_url"]
1628
1629 if wim.get("dpid"):
1630 wim_config["dpid"] = wim.pop("dpid")
1631
1632 if wim.get("switch_id"):
1633 wim_config["switch_id"] = wim.pop("switch_id")
1634
1635 # wim, wim_account, config
1636 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1637 self.db_vims[target_id] = vim
1638 self.error_status = None
1639
1640 self.logger.info(
1641 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1642 )
1643 except Exception as e:
1644 self.logger.error(
1645 "Cannot load {} plugin={}: {} {}".format(
1646 target_id, plugin_name, step, e
1647 )
1648 )
1649
1650 self.db_vims[target_id] = vim or {}
1651 self.db_vims[target_id] = FailingConnector(str(e))
1652 error_status = "{} Error: {}".format(step, e)
1653
1654 return error_status
1655 finally:
1656 if target_id not in self.vim_targets:
1657 self.vim_targets.append(target_id)
1658
1659 def _get_db_task(self):
1660 """
1661 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1662 :return: None
1663 """
1664 now = time.time()
1665
1666 if not self.time_last_task_processed:
1667 self.time_last_task_processed = now
1668
1669 try:
1670 while True:
1671 """
1672 # Log RO tasks only when loglevel is DEBUG
1673 if self.logger.getEffectiveLevel() == logging.DEBUG:
1674 self._log_ro_task(
1675 None,
1676 None,
1677 None,
1678 "TASK_WF",
1679 "task_locked_time="
1680 + str(self.task_locked_time)
1681 + " "
1682 + "time_last_task_processed="
1683 + str(self.time_last_task_processed)
1684 + " "
1685 + "now="
1686 + str(now),
1687 )
1688 """
1689 locked = self.db.set_one(
1690 "ro_tasks",
1691 q_filter={
1692 "target_id": self.vim_targets,
1693 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1694 "locked_at.lt": now - self.task_locked_time,
1695 "to_check_at.lt": self.time_last_task_processed,
1696 "to_check_at.gt": -1,
1697 },
1698 update_dict={"locked_by": self.my_id, "locked_at": now},
1699 fail_on_empty=False,
1700 )
1701
1702 if locked:
1703 # read and return
1704 ro_task = self.db.get_one(
1705 "ro_tasks",
1706 q_filter={
1707 "target_id": self.vim_targets,
1708 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1709 "locked_at": now,
1710 },
1711 )
1712 return ro_task
1713
1714 if self.time_last_task_processed == now:
1715 self.time_last_task_processed = None
1716 return None
1717 else:
1718 self.time_last_task_processed = now
1719 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1720
1721 except DbException as e:
1722 self.logger.error("Database exception at _get_db_task: {}".format(e))
1723 except Exception as e:
1724 self.logger.critical(
1725 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1726 )
1727
1728 return None
1729
1730 def _get_db_all_tasks(self):
1731 """
1732 Read all content of table ro_tasks to log it
1733 :return: None
1734 """
1735 try:
1736 # Checking the content of the BD:
1737
1738 # read and return
1739 ro_task = self.db.get_list("ro_tasks")
1740 for rt in ro_task:
1741 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1742 return ro_task
1743
1744 except DbException as e:
1745 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1746 except Exception as e:
1747 self.logger.critical(
1748 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1749 )
1750
1751 return None
1752
1753 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1754 """
1755 Generate a log with the following format:
1756
1757 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1758 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1759 task_array_index;task_id;task_action;task_item;task_args
1760
1761 Example:
1762
1763 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1764 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1765 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1766 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1767 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1768 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1769 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1770 """
1771 try:
1772 line = []
1773 i = 0
1774 if ro_task is not None and isinstance(ro_task, dict):
1775 for t in ro_task["tasks"]:
1776 line.clear()
1777 line.append(mark)
1778 line.append(event)
1779 line.append(ro_task.get("_id", ""))
1780 line.append(str(ro_task.get("locked_at", "")))
1781 line.append(str(ro_task.get("modified_at", "")))
1782 line.append(str(ro_task.get("created_at", "")))
1783 line.append(str(ro_task.get("to_check_at", "")))
1784 line.append(str(ro_task.get("locked_by", "")))
1785 line.append(str(ro_task.get("target_id", "")))
1786 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1787 line.append(str(ro_task.get("vim_info", "")))
1788 line.append(str(ro_task.get("tasks", "")))
1789 if isinstance(t, dict):
1790 line.append(str(t.get("status", "")))
1791 line.append(str(t.get("action_id", "")))
1792 line.append(str(i))
1793 line.append(str(t.get("task_id", "")))
1794 line.append(str(t.get("action", "")))
1795 line.append(str(t.get("item", "")))
1796 line.append(str(t.get("find_params", "")))
1797 line.append(str(t.get("params", "")))
1798 else:
1799 line.extend([""] * 2)
1800 line.append(str(i))
1801 line.extend([""] * 5)
1802
1803 i += 1
1804 self.logger.debug(";".join(line))
1805 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1806 i = 0
1807 while True:
1808 st = "tasks.{}.status".format(i)
1809 if st not in db_ro_task_update:
1810 break
1811 line.clear()
1812 line.append(mark)
1813 line.append(event)
1814 line.append(db_ro_task_update.get("_id", ""))
1815 line.append(str(db_ro_task_update.get("locked_at", "")))
1816 line.append(str(db_ro_task_update.get("modified_at", "")))
1817 line.append("")
1818 line.append(str(db_ro_task_update.get("to_check_at", "")))
1819 line.append(str(db_ro_task_update.get("locked_by", "")))
1820 line.append("")
1821 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1822 line.append("")
1823 line.append(str(db_ro_task_update.get("vim_info", "")))
1824 line.append(str(str(db_ro_task_update).count(".status")))
1825 line.append(db_ro_task_update.get(st, ""))
1826 line.append("")
1827 line.append(str(i))
1828 line.extend([""] * 3)
1829 i += 1
1830 self.logger.debug(";".join(line))
1831
1832 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1833 line.clear()
1834 line.append(mark)
1835 line.append(event)
1836 line.append(db_ro_task_delete.get("_id", ""))
1837 line.append("")
1838 line.append(db_ro_task_delete.get("modified_at", ""))
1839 line.extend([""] * 13)
1840 self.logger.debug(";".join(line))
1841
1842 else:
1843 line.clear()
1844 line.append(mark)
1845 line.append(event)
1846 line.extend([""] * 16)
1847 self.logger.debug(";".join(line))
1848
1849 except Exception as e:
1850 self.logger.error("Error logging ro_task: {}".format(e))
1851
1852 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1853 """
1854 Determine if this task need to be done or superseded
1855 :return: None
1856 """
1857 my_task = ro_task["tasks"][task_index]
1858 task_id = my_task["task_id"]
1859 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1860 "created_items", False
1861 )
1862
1863 if my_task["status"] == "FAILED":
1864 return None, None # TODO need to be retry??
1865
1866 try:
1867 for index, task in enumerate(ro_task["tasks"]):
1868 if index == task_index or not task:
1869 continue # own task
1870
1871 if (
1872 my_task["target_record"] == task["target_record"]
1873 and task["action"] == "CREATE"
1874 ):
1875 # set to finished
1876 db_update["tasks.{}.status".format(index)] = task[
1877 "status"
1878 ] = "FINISHED"
1879 elif task["action"] == "CREATE" and task["status"] not in (
1880 "FINISHED",
1881 "SUPERSEDED",
1882 ):
1883 needed_delete = False
1884
1885 if needed_delete:
1886 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1887 else:
1888 return "SUPERSEDED", None
1889 except Exception as e:
1890 if not isinstance(e, NsWorkerException):
1891 self.logger.critical(
1892 "Unexpected exception at _delete_task task={}: {}".format(
1893 task_id, e
1894 ),
1895 exc_info=True,
1896 )
1897
1898 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1899
1900 def _create_task(self, ro_task, task_index, task_depends, db_update):
1901 """
1902 Determine if this task need to create something at VIM
1903 :return: None
1904 """
1905 my_task = ro_task["tasks"][task_index]
1906 task_id = my_task["task_id"]
1907 task_status = None
1908
1909 if my_task["status"] == "FAILED":
1910 return None, None # TODO need to be retry??
1911 elif my_task["status"] == "SCHEDULED":
1912 # check if already created by another task
1913 for index, task in enumerate(ro_task["tasks"]):
1914 if index == task_index or not task:
1915 continue # own task
1916
1917 if task["action"] == "CREATE" and task["status"] not in (
1918 "SCHEDULED",
1919 "FINISHED",
1920 "SUPERSEDED",
1921 ):
1922 return task["status"], "COPY_VIM_INFO"
1923
1924 try:
1925 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1926 ro_task, task_index, task_depends
1927 )
1928 # TODO update other CREATE tasks
1929 except Exception as e:
1930 if not isinstance(e, NsWorkerException):
1931 self.logger.error(
1932 "Error executing task={}: {}".format(task_id, e), exc_info=True
1933 )
1934
1935 task_status = "FAILED"
1936 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1937 # TODO update ro_vim_item_update
1938
1939 return task_status, ro_vim_item_update
1940 else:
1941 return None, None
1942
1943 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1944 """
1945 Look for dependency task
1946 :param task_id: Can be one of
1947 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1948 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1949 3. task.task_id: "<action_id>:number"
1950 :param ro_task:
1951 :param target_id:
1952 :return: database ro_task plus index of task
1953 """
1954 if (
1955 task_id.startswith("vim:")
1956 or task_id.startswith("sdn:")
1957 or task_id.startswith("wim:")
1958 ):
1959 target_id, _, task_id = task_id.partition(" ")
1960
1961 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1962 ro_task_dependency = self.db.get_one(
1963 "ro_tasks",
1964 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1965 fail_on_empty=False,
1966 )
1967
1968 if ro_task_dependency:
1969 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1970 if task["target_record_id"] == task_id:
1971 return ro_task_dependency, task_index
1972
1973 else:
1974 if ro_task:
1975 for task_index, task in enumerate(ro_task["tasks"]):
1976 if task and task["task_id"] == task_id:
1977 return ro_task, task_index
1978
1979 ro_task_dependency = self.db.get_one(
1980 "ro_tasks",
1981 q_filter={
1982 "tasks.ANYINDEX.task_id": task_id,
1983 "tasks.ANYINDEX.target_record.ne": None,
1984 },
1985 fail_on_empty=False,
1986 )
1987
1988 if ro_task_dependency:
1989 for task_index, task in ro_task_dependency["tasks"]:
1990 if task["task_id"] == task_id:
1991 return ro_task_dependency, task_index
1992 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1993
1994 def update_vm_refresh(self, ro_task):
1995 """Enables the VM status updates if self.refresh_config.active parameter
1996 is not -1 and then updates the DB accordingly
1997
1998 """
1999 try:
2000 self.logger.debug("Checking if VM status update config")
2001 next_refresh = time.time()
2002 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2003
2004 if next_refresh != -1:
2005 db_ro_task_update = {}
2006 now = time.time()
2007 next_check_at = now + (24 * 60 * 60)
2008 next_check_at = min(next_check_at, next_refresh)
2009 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2010 db_ro_task_update["to_check_at"] = next_check_at
2011
2012 self.logger.debug(
2013 "Finding tasks which to be updated to enable VM status updates"
2014 )
2015 refresh_tasks = self.db.get_list(
2016 "ro_tasks",
2017 q_filter={
2018 "tasks.status": "DONE",
2019 "to_check_at.lt": 0,
2020 },
2021 )
2022 self.logger.debug("Updating tasks to change the to_check_at status")
2023 for task in refresh_tasks:
2024 q_filter = {
2025 "_id": task["_id"],
2026 }
2027 self.db.set_one(
2028 "ro_tasks",
2029 q_filter=q_filter,
2030 update_dict=db_ro_task_update,
2031 fail_on_empty=True,
2032 )
2033
2034 except Exception as e:
2035 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2036
2037 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2038 """Decide the next_refresh according to vim type and refresh config period.
2039 Args:
2040 ro_task (dict): ro_task details
2041 next_refresh (float): next refresh time as epoch format
2042
2043 Returns:
2044 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2045 """
2046 target_vim = ro_task["target_id"]
2047 vim_type = self.db_vims[target_vim]["vim_type"]
2048 if self.refresh_config.active == -1 or vim_type == "openstack":
2049 next_refresh = -1
2050 else:
2051 next_refresh += self.refresh_config.active
2052 return next_refresh
2053
2054 def _process_pending_tasks(self, ro_task):
2055 ro_task_id = ro_task["_id"]
2056 now = time.time()
2057 # one day
2058 next_check_at = now + (24 * 60 * 60)
2059 db_ro_task_update = {}
2060
2061 def _update_refresh(new_status):
2062 # compute next_refresh
2063 nonlocal task
2064 nonlocal next_check_at
2065 nonlocal db_ro_task_update
2066 nonlocal ro_task
2067
2068 next_refresh = time.time()
2069
2070 if task["item"] in ("image", "flavor"):
2071 next_refresh += self.refresh_config.image
2072 elif new_status == "BUILD":
2073 next_refresh += self.refresh_config.build
2074 elif new_status == "DONE":
2075 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2076 else:
2077 next_refresh += self.refresh_config.error
2078
2079 next_check_at = min(next_check_at, next_refresh)
2080 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2081 ro_task["vim_info"]["refresh_at"] = next_refresh
2082
2083 try:
2084 """
2085 # Log RO tasks only when loglevel is DEBUG
2086 if self.logger.getEffectiveLevel() == logging.DEBUG:
2087 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2088 """
2089 # Check if vim status refresh is enabled again
2090 self.update_vm_refresh(ro_task)
2091 # 0: get task_status_create
2092 lock_object = None
2093 task_status_create = None
2094 task_create = next(
2095 (
2096 t
2097 for t in ro_task["tasks"]
2098 if t
2099 and t["action"] == "CREATE"
2100 and t["status"] in ("BUILD", "DONE")
2101 ),
2102 None,
2103 )
2104
2105 if task_create:
2106 task_status_create = task_create["status"]
2107
2108 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2109 for task_action in ("DELETE", "CREATE", "EXEC"):
2110 db_vim_update = None
2111 new_status = None
2112
2113 for task_index, task in enumerate(ro_task["tasks"]):
2114 if not task:
2115 continue # task deleted
2116
2117 task_depends = {}
2118 target_update = None
2119
2120 if (
2121 (
2122 task_action in ("DELETE", "EXEC")
2123 and task["status"] not in ("SCHEDULED", "BUILD")
2124 )
2125 or task["action"] != task_action
2126 or (
2127 task_action == "CREATE"
2128 and task["status"] in ("FINISHED", "SUPERSEDED")
2129 )
2130 ):
2131 continue
2132
2133 task_path = "tasks.{}.status".format(task_index)
2134 try:
2135 db_vim_info_update = None
2136
2137 if task["status"] == "SCHEDULED":
2138 # check if tasks that this depends on have been completed
2139 dependency_not_completed = False
2140
2141 for dependency_task_id in task.get("depends_on") or ():
2142 (
2143 dependency_ro_task,
2144 dependency_task_index,
2145 ) = self._get_dependency(
2146 dependency_task_id, target_id=ro_task["target_id"]
2147 )
2148 dependency_task = dependency_ro_task["tasks"][
2149 dependency_task_index
2150 ]
2151
2152 if dependency_task["status"] == "SCHEDULED":
2153 dependency_not_completed = True
2154 next_check_at = min(
2155 next_check_at, dependency_ro_task["to_check_at"]
2156 )
2157 # must allow dependent task to be processed first
2158 # to do this set time after last_task_processed
2159 next_check_at = max(
2160 self.time_last_task_processed, next_check_at
2161 )
2162 break
2163 elif dependency_task["status"] == "FAILED":
2164 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2165 task["action"],
2166 task["item"],
2167 dependency_task["action"],
2168 dependency_task["item"],
2169 dependency_task_id,
2170 dependency_ro_task["vim_info"].get(
2171 "vim_details"
2172 ),
2173 )
2174 self.logger.error(
2175 "task={} {}".format(task["task_id"], error_text)
2176 )
2177 raise NsWorkerException(error_text)
2178
2179 task_depends[dependency_task_id] = dependency_ro_task[
2180 "vim_info"
2181 ]["vim_id"]
2182 task_depends[
2183 "TASK-{}".format(dependency_task_id)
2184 ] = dependency_ro_task["vim_info"]["vim_id"]
2185
2186 if dependency_not_completed:
2187 # TODO set at vim_info.vim_details that it is waiting
2188 continue
2189
2190 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2191 # the task of renew this locking. It will update database locket_at periodically
2192 if not lock_object:
2193 lock_object = LockRenew.add_lock_object(
2194 "ro_tasks", ro_task, self
2195 )
2196
2197 if task["action"] == "DELETE":
2198 (
2199 new_status,
2200 db_vim_info_update,
2201 ) = self._delete_task(
2202 ro_task, task_index, task_depends, db_ro_task_update
2203 )
2204 new_status = (
2205 "FINISHED" if new_status == "DONE" else new_status
2206 )
2207 # ^with FINISHED instead of DONE it will not be refreshing
2208
2209 if new_status in ("FINISHED", "SUPERSEDED"):
2210 target_update = "DELETE"
2211 elif task["action"] == "EXEC":
2212 (
2213 new_status,
2214 db_vim_info_update,
2215 db_task_update,
2216 ) = self.item2class[task["item"]].exec(
2217 ro_task, task_index, task_depends
2218 )
2219 new_status = (
2220 "FINISHED" if new_status == "DONE" else new_status
2221 )
2222 # ^with FINISHED instead of DONE it will not be refreshing
2223
2224 if db_task_update:
2225 # load into database the modified db_task_update "retries" and "next_retry"
2226 if db_task_update.get("retries"):
2227 db_ro_task_update[
2228 "tasks.{}.retries".format(task_index)
2229 ] = db_task_update["retries"]
2230
2231 next_check_at = time.time() + db_task_update.get(
2232 "next_retry", 60
2233 )
2234 target_update = None
2235 elif task["action"] == "CREATE":
2236 if task["status"] == "SCHEDULED":
2237 if task_status_create:
2238 new_status = task_status_create
2239 target_update = "COPY_VIM_INFO"
2240 else:
2241 new_status, db_vim_info_update = self.item2class[
2242 task["item"]
2243 ].new(ro_task, task_index, task_depends)
2244 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2245 _update_refresh(new_status)
2246 else:
2247 refresh_at = ro_task["vim_info"]["refresh_at"]
2248 if refresh_at and refresh_at != -1 and now > refresh_at:
2249 new_status, db_vim_info_update = self.item2class[
2250 task["item"]
2251 ].refresh(ro_task)
2252 _update_refresh(new_status)
2253 else:
2254 # The refresh is updated to avoid set the value of "refresh_at" to
2255 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2256 # because it can happen that in this case the task is never processed
2257 _update_refresh(task["status"])
2258
2259 except Exception as e:
2260 new_status = "FAILED"
2261 db_vim_info_update = {
2262 "vim_status": "VIM_ERROR",
2263 "vim_details": str(e),
2264 }
2265
2266 if not isinstance(
2267 e, (NsWorkerException, vimconn.VimConnException)
2268 ):
2269 self.logger.error(
2270 "Unexpected exception at _delete_task task={}: {}".format(
2271 task["task_id"], e
2272 ),
2273 exc_info=True,
2274 )
2275
2276 try:
2277 if db_vim_info_update:
2278 db_vim_update = db_vim_info_update.copy()
2279 db_ro_task_update.update(
2280 {
2281 "vim_info." + k: v
2282 for k, v in db_vim_info_update.items()
2283 }
2284 )
2285 ro_task["vim_info"].update(db_vim_info_update)
2286
2287 if new_status:
2288 if task_action == "CREATE":
2289 task_status_create = new_status
2290 db_ro_task_update[task_path] = new_status
2291
2292 if target_update or db_vim_update:
2293 if target_update == "DELETE":
2294 self._update_target(task, None)
2295 elif target_update == "COPY_VIM_INFO":
2296 self._update_target(task, ro_task["vim_info"])
2297 else:
2298 self._update_target(task, db_vim_update)
2299
2300 except Exception as e:
2301 if (
2302 isinstance(e, DbException)
2303 and e.http_code == HTTPStatus.NOT_FOUND
2304 ):
2305 # if the vnfrs or nsrs has been removed from database, this task must be removed
2306 self.logger.debug(
2307 "marking to delete task={}".format(task["task_id"])
2308 )
2309 self.tasks_to_delete.append(task)
2310 else:
2311 self.logger.error(
2312 "Unexpected exception at _update_target task={}: {}".format(
2313 task["task_id"], e
2314 ),
2315 exc_info=True,
2316 )
2317
2318 locked_at = ro_task["locked_at"]
2319
2320 if lock_object:
2321 locked_at = [
2322 lock_object["locked_at"],
2323 lock_object["locked_at"] + self.task_locked_time,
2324 ]
2325 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2326 # contain exactly locked_at + self.task_locked_time
2327 LockRenew.remove_lock_object(lock_object)
2328
2329 q_filter = {
2330 "_id": ro_task["_id"],
2331 "to_check_at": ro_task["to_check_at"],
2332 "locked_at": locked_at,
2333 }
2334 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2335 # outside this task (by ro_nbi) do not update it
2336 db_ro_task_update["locked_by"] = None
2337 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2338 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2339 db_ro_task_update["modified_at"] = now
2340 db_ro_task_update["to_check_at"] = next_check_at
2341
2342 """
2343 # Log RO tasks only when loglevel is DEBUG
2344 if self.logger.getEffectiveLevel() == logging.DEBUG:
2345 db_ro_task_update_log = db_ro_task_update.copy()
2346 db_ro_task_update_log["_id"] = q_filter["_id"]
2347 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2348 """
2349
2350 if not self.db.set_one(
2351 "ro_tasks",
2352 update_dict=db_ro_task_update,
2353 q_filter=q_filter,
2354 fail_on_empty=False,
2355 ):
2356 del db_ro_task_update["to_check_at"]
2357 del q_filter["to_check_at"]
2358 """
2359 # Log RO tasks only when loglevel is DEBUG
2360 if self.logger.getEffectiveLevel() == logging.DEBUG:
2361 self._log_ro_task(
2362 None,
2363 db_ro_task_update_log,
2364 None,
2365 "TASK_WF",
2366 "SET_TASK " + str(q_filter),
2367 )
2368 """
2369 self.db.set_one(
2370 "ro_tasks",
2371 q_filter=q_filter,
2372 update_dict=db_ro_task_update,
2373 fail_on_empty=True,
2374 )
2375 except DbException as e:
2376 self.logger.error(
2377 "ro_task={} Error updating database {}".format(ro_task_id, e)
2378 )
2379 except Exception as e:
2380 self.logger.error(
2381 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2382 )
2383
2384 def _update_target(self, task, ro_vim_item_update):
2385 table, _, temp = task["target_record"].partition(":")
2386 _id, _, path_vim_status = temp.partition(":")
2387 path_item = path_vim_status[: path_vim_status.rfind(".")]
2388 path_item = path_item[: path_item.rfind(".")]
2389 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2390 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2391
2392 if ro_vim_item_update:
2393 update_dict = {
2394 path_vim_status + "." + k: v
2395 for k, v in ro_vim_item_update.items()
2396 if k
2397 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2398 }
2399
2400 if path_vim_status.startswith("vdur."):
2401 # for backward compatibility, add vdur.name apart from vdur.vim_name
2402 if ro_vim_item_update.get("vim_name"):
2403 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2404
2405 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2406 if ro_vim_item_update.get("vim_id"):
2407 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2408
2409 # update general status
2410 if ro_vim_item_update.get("vim_status"):
2411 update_dict[path_item + ".status"] = ro_vim_item_update[
2412 "vim_status"
2413 ]
2414
2415 if ro_vim_item_update.get("interfaces"):
2416 path_interfaces = path_item + ".interfaces"
2417
2418 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2419 if iface:
2420 update_dict.update(
2421 {
2422 path_interfaces + ".{}.".format(i) + k: v
2423 for k, v in iface.items()
2424 if k in ("vlan", "compute_node", "pci")
2425 }
2426 )
2427
2428 # put ip_address and mac_address with ip-address and mac-address
2429 if iface.get("ip_address"):
2430 update_dict[
2431 path_interfaces + ".{}.".format(i) + "ip-address"
2432 ] = iface["ip_address"]
2433
2434 if iface.get("mac_address"):
2435 update_dict[
2436 path_interfaces + ".{}.".format(i) + "mac-address"
2437 ] = iface["mac_address"]
2438
2439 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2440 update_dict["ip-address"] = iface.get("ip_address").split(
2441 ";"
2442 )[0]
2443
2444 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2445 update_dict[path_item + ".ip-address"] = iface.get(
2446 "ip_address"
2447 ).split(";")[0]
2448
2449 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2450 else:
2451 update_dict = {path_item + ".status": "DELETED"}
2452 self.db.set_one(
2453 table,
2454 q_filter={"_id": _id},
2455 update_dict=update_dict,
2456 unset={path_vim_status: None},
2457 )
2458
2459 def _process_delete_db_tasks(self):
2460 """
2461 Delete task from database because vnfrs or nsrs or both have been deleted
2462 :return: None. Uses and modify self.tasks_to_delete
2463 """
2464 while self.tasks_to_delete:
2465 task = self.tasks_to_delete[0]
2466 vnfrs_deleted = None
2467 nsr_id = task["nsr_id"]
2468
2469 if task["target_record"].startswith("vnfrs:"):
2470 # check if nsrs is present
2471 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2472 vnfrs_deleted = task["target_record"].split(":")[1]
2473
2474 try:
2475 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2476 except Exception as e:
2477 self.logger.error(
2478 "Error deleting task={}: {}".format(task["task_id"], e)
2479 )
2480 self.tasks_to_delete.pop(0)
2481
2482 @staticmethod
2483 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2484 """
2485 Static method because it is called from osm_ng_ro.ns
2486 :param db: instance of database to use
2487 :param nsr_id: affected nsrs id
2488 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2489 :return: None, exception is fails
2490 """
2491 retries = 5
2492 for retry in range(retries):
2493 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2494 now = time.time()
2495 conflict = False
2496
2497 for ro_task in ro_tasks:
2498 db_update = {}
2499 to_delete_ro_task = True
2500
2501 for index, task in enumerate(ro_task["tasks"]):
2502 if not task:
2503 pass
2504 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2505 vnfrs_deleted
2506 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2507 ):
2508 db_update["tasks.{}".format(index)] = None
2509 else:
2510 # used by other nsr, ro_task cannot be deleted
2511 to_delete_ro_task = False
2512
2513 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2514 if to_delete_ro_task:
2515 if not db.del_one(
2516 "ro_tasks",
2517 q_filter={
2518 "_id": ro_task["_id"],
2519 "modified_at": ro_task["modified_at"],
2520 },
2521 fail_on_empty=False,
2522 ):
2523 conflict = True
2524 elif db_update:
2525 db_update["modified_at"] = now
2526 if not db.set_one(
2527 "ro_tasks",
2528 q_filter={
2529 "_id": ro_task["_id"],
2530 "modified_at": ro_task["modified_at"],
2531 },
2532 update_dict=db_update,
2533 fail_on_empty=False,
2534 ):
2535 conflict = True
2536 if not conflict:
2537 return
2538 else:
2539 raise NsWorkerException("Exceeded {} retries".format(retries))
2540
2541 def run(self):
2542 # load database
2543 self.logger.info("Starting")
2544 while True:
2545 # step 1: get commands from queue
2546 try:
2547 if self.vim_targets:
2548 task = self.task_queue.get(block=False)
2549 else:
2550 if not self.idle:
2551 self.logger.debug("enters in idle state")
2552 self.idle = True
2553 task = self.task_queue.get(block=True)
2554 self.idle = False
2555
2556 if task[0] == "terminate":
2557 break
2558 elif task[0] == "load_vim":
2559 self.logger.info("order to load vim {}".format(task[1]))
2560 self._load_vim(task[1])
2561 elif task[0] == "unload_vim":
2562 self.logger.info("order to unload vim {}".format(task[1]))
2563 self._unload_vim(task[1])
2564 elif task[0] == "reload_vim":
2565 self._reload_vim(task[1])
2566 elif task[0] == "check_vim":
2567 self.logger.info("order to check vim {}".format(task[1]))
2568 self._check_vim(task[1])
2569 continue
2570 except Exception as e:
2571 if isinstance(e, queue.Empty):
2572 pass
2573 else:
2574 self.logger.critical(
2575 "Error processing task: {}".format(e), exc_info=True
2576 )
2577
2578 # step 2: process pending_tasks, delete not needed tasks
2579 try:
2580 if self.tasks_to_delete:
2581 self._process_delete_db_tasks()
2582 busy = False
2583 """
2584 # Log RO tasks only when loglevel is DEBUG
2585 if self.logger.getEffectiveLevel() == logging.DEBUG:
2586 _ = self._get_db_all_tasks()
2587 """
2588 ro_task = self._get_db_task()
2589 if ro_task:
2590 self._process_pending_tasks(ro_task)
2591 busy = True
2592 if not busy:
2593 time.sleep(5)
2594 except Exception as e:
2595 self.logger.critical(
2596 "Unexpected exception at run: " + str(e), exc_info=True
2597 )
2598
2599 self.logger.info("Finishing")