Logging traceback for key injection error in robot tests
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from unittest.mock import Mock
37
38 from importlib_metadata import entry_points
39 from osm_common.dbbase import DbException
40 from osm_ng_ro.vim_admin import LockRenew
41 from osm_ro_plugin import sdnconn, vimconn
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin.vim_dummy import VimDummyConnector
44 import yaml
45
46
47 __author__ = "Alfonso Tierno"
48 __date__ = "$28-Sep-2017 12:07:15$"
49
50
def deep_get(target_dict, *args, **kwargs):
    """
    Walk down nested dictionaries following a sequence of keys.
    Example target_dict={a: {b: 5}}; keys (a, b) give 5; keys (a, b, c) and (f, h) both give the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may carry default=value, returned when the path cannot be followed to the end
    :return: the value at the end of the path; otherwise the default (None when no default is given)
    """
    current = target_dict
    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # the path is broken: either a non-dict was reached or the key is absent
            return kwargs.get("default")
    return current
65
66
class NsWorkerException(Exception):
    """Error raised by the VIM interaction classes of this module when a task cannot be carried out"""
69
70
class FailingConnector:
    """Connector stand-in: every public method name of the VIM and SDN connector base classes is
    bound to a Mock that raises an exception carrying a fixed error message"""

    def __init__(self, error_msg):
        self.error_msg = error_msg

        # VIM names are installed first; an SDN name that collides overwrites the VIM one
        for connector_base, error_class in (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        ):
            public_names = (
                name for name in dir(connector_base) if not name.startswith("_")
            )
            for name in public_names:
                setattr(self, name, Mock(side_effect=error_class(error_msg)))
86
87
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException variant raised when a requested VIM element cannot be found"""
90
91
class VimInteractionBase:
    """Common parent of the classes that create, delete and refresh VIM/SDN items (networks, VMs, flavors, ...).
    The methods defined here never contact the VIM; they only report a fixed status"""

    def __init__(self, db, my_vims, db_vims, logger):
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created; reports BUILD with an empty update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """The VIM is not queried; the item is reported as failed only when already marked VIM_ERROR"""
        already_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if already_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Nothing is deleted at the VIM; reports DONE with an empty update"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed; reports DONE without vim item update nor task update"""
        return "DONE", None, None
118
119
class VimInteractionNet(VimInteractionBase):
    """Finds or creates, refreshes and deletes networks at the target VIM"""

    def new(self, ro_task, task_index, task_depends):
        """Obtain the VIM network of a task: look it up when the task carries 'find_params',
        otherwise create it from the task 'params'.

        A management network that is neither mapped at the VIM configuration nor found at the VIM
        is created with default parameters, as long as the task carries no 'params'.
        :return: ("BUILD", update) on success, ("FAILED", update) on VIM or worker errors; update
            holds the vim_info fields to be stored
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt = False
        mgmt_mapped_at_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: first work out the filter to query the VIM with
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration may map it by id or by name
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]

                    for config_key, filter_key in (
                        ("management_network_id", "id"),
                        ("management_network_name", "name"),
                    ):
                        if deep_get(db_vim, "config", config_key):
                            mgmt_mapped_at_vim = True
                            vim_filter = {filter_key: db_vim["config"][config_key]}
                            break
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if not is_mgmt or mgmt_mapped_at_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

    def refresh(self, ro_task):
        """Ask the VIM for the network status and work out which stored vim_info fields changed

        :return: (task status, dictionary with only the vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        stored_info = ro_task["vim_info"]
        vim_id = stored_info["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            # any VIM status other than ACTIVE or BUILD makes the task fail
            task_status = {"ACTIVE": "DONE", "BUILD": "BUILD"}.get(
                vim_info["status"], "FAILED"
            )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, target_id, vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        vim_status = vim_info["status"]
        ro_vim_item_update = {}

        if stored_info["vim_status"] != vim_status:
            ro_vim_item_update["vim_status"] = vim_status

        vim_name = vim_info.get("name")
        if stored_info["vim_name"] != vim_name:
            ro_vim_item_update["vim_name"] = vim_name

        if vim_status in ("ERROR", "VIM_ERROR"):
            error_msg = vim_info.get("error_msg")
            if stored_info["vim_details"] != error_msg:
                ro_vim_item_update["vim_details"] = error_msg
        elif vim_status == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        elif stored_info["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            changed_status = ro_vim_item_update.get("vim_status")
            # details are not logged when the network has just become ACTIVE
            logged_details = (
                "" if changed_status == "ACTIVE" else ro_vim_item_update.get("vim_details")
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id, target_id, vim_id, changed_status, logged_details
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network at the VIM; a network that the VIM reports as not found counts as deleted

        :return: ("DONE", update) on success, ("FAILED", update) when the VIM reports another error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            created_items = ro_task["vim_info"]["created_items"]

            # the VIM is contacted only when there is something recorded to remove
            if net_vim_id or created_items:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, created_items
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
336
337
class VimInteractionVdu(VimInteractionBase):
    """Creates, refreshes and deletes VM instances at the target VIM and injects ssh keys into them"""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM instance at the VIM from the task 'params'.

        References of the form "TASK-..." found in the network ids, image id, flavor id and
        affinity group ids are replaced by the values stored in task_depends before calling the VIM.
        :param ro_task: ro_task document; 'tasks' and 'target_id' are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: dictionary mapping each "TASK-..." reference to the value to use instead
        :return: ("BUILD", update) when the VIM accepts the request, ("FAILED", update) on VIM or
            worker errors; update holds the vim_info fields to be stored
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # flagged before calling the VIM, so a failure below is still reported with created=True
            created = True
            params = task["params"]
            # work on a copy: the references are replaced in place and the stored task is kept intact
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            # a task without affinity groups (key absent or None) is treated as an empty list
            affinity_group_list = params_copy.get("affinity_group_list") or ()
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" is read from every net_list entry after the call; presumably the connector
            # fills it in while creating the VM - TODO confirm against the connector
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM instance at the VIM; a VM that the VIM reports as not found counts as deleted

        :return: ("DONE", update) on success, ("FAILED", update) when the VIM reports another error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # the VIM is contacted only when there is something recorded to remove
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status and work out which stored vim_info fields changed

        :return: (None, None) when the ro_task has no vim_id; otherwise (task status, dictionary
            with only the vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; deliberately best effort, as the name is
            # optional and any parsing problem only leaves vim_info without it
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order recorded at creation; an interface not reported by the VIM yields None
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # next() without default: StopIteration propagates if no CREATE task is still pending
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the VDU management interface falls back to the VNF one, and then to the first interface
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_details")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an ssh key into the VM through the VIM connector, retrying on failure.

        :return: tuple (task status, vim item update, task update):
            ("DONE", None, {"retries": 0}) when the key is injected;
            ("BUILD", None, {"retries": n, "next_retry": seconds}) while retries remain;
            ("FAILED", {"vim_details": error}, {"retries": 0}) once the retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # the stored private key is decrypted and handed to the connector as 'ro_key'
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            # the traceback of every failed attempt is kept at debug level
            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_details": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
617
618
class VimInteractionImage(VimInteractionBase):
    """Locates images at the target VIM; images are only looked up, never created here"""

    def new(self, ro_task, task_index, task_depends):
        """Find at the VIM the single image matching the task 'find_params'.

        :param ro_task: ro_task document; 'tasks' and 'target_id' are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", update) when exactly one image matches; ("FAILED", update) when the task has
            no 'find_params', when none or several images match, or on VIM errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            find_params = task.get("find_params")

            if not find_params:
                # without search criteria there is no image id to report; fail the task
                # explicitly instead of hitting an unbound variable further down
                raise NsWorkerExceptionNotFound(
                    "Invalid find_params for new_image {}".format(find_params)
                )

            vim_images = target_vim.get_image_list(**find_params)

            if not vim_images:
                raise NsWorkerExceptionNotFound(
                    "Image not found with this criteria: '{}'".format(find_params)
                )

            if len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(
                        find_params
                    )
                )

            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
672
673
class VimInteractionFlavor(VimInteractionBase):
    """Finds or creates, and deletes, flavors at the target VIM"""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM; a flavor that the VIM reports as not found counts as deleted

        :return: ("DONE", update) on success, ("FAILED", update) when the VIM reports another error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to remove at the VIM when no flavor id is recorded
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Look for a flavor matching the task 'find_params'; when none is found and the task
        carries 'params', create the flavor.

        :return: ("DONE", update) on success, ("FAILED", update) on VIM or worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        created = False
        created_items = {}

        try:
            vim_flavor_id = None

            # FIND
            if task.get("find_params"):
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        task["find_params"]["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException:
                    # not found is not an error here: creation is tried next
                    pass

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, target_id, vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
765
766
class VimInteractionAffinityGroup(VimInteractionBase):
    """Finds or creates, and deletes, affinity or anti-affinity groups at the target VIM"""

    def delete(self, ro_task, task_index):
        """Delete the group at the VIM; a group that the VIM reports as not found counts as deleted

        :return: ("DONE", update) on success, ("FAILED", update) when the VIM reports another error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to remove at the VIM when no group id is recorded
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the VIM group named by "vim-affinity-group-id" in the task data when the VIM has it;
        otherwise create a new group from the task 'affinity_group_data'.

        :param ro_task: ro_task document; 'tasks' and 'target_id' are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", update) on success, ("FAILED", update) on VIM or worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                param_affinity_group_id = affinity_group_data.get(
                    "vim-affinity-group-id"
                )

                try:
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # not fatal: the id stays unset, so a new group is created below
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
871
872
873 class VimInteractionSdnNet(VimInteractionBase):
874 @staticmethod
875 def _match_pci(port_pci, mapping):
876 """
877 Check if port_pci matches with mapping
878 mapping can have brackets to indicate that several chars are accepted. e.g
879 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
880 :param port_pci: text
881 :param mapping: text, can contain brackets to indicate several chars are available
882 :return: True if matches, False otherwise
883 """
884 if not port_pci or not mapping:
885 return False
886 if port_pci == mapping:
887 return True
888
889 mapping_index = 0
890 pci_index = 0
891 while True:
892 bracket_start = mapping.find("[", mapping_index)
893
894 if bracket_start == -1:
895 break
896
897 bracket_end = mapping.find("]", bracket_start)
898 if bracket_end == -1:
899 break
900
901 length = bracket_start - mapping_index
902 if (
903 length
904 and port_pci[pci_index : pci_index + length]
905 != mapping[mapping_index:bracket_start]
906 ):
907 return False
908
909 if (
910 port_pci[pci_index + length]
911 not in mapping[bracket_start + 1 : bracket_end]
912 ):
913 return False
914
915 pci_index += length + 1
916 mapping_index = bracket_end + 1
917
918 if port_pci[pci_index:] != mapping[mapping_index:]:
919 return False
920
921 return True
922
923 def _get_interfaces(self, vlds_to_connect, vim_account_id):
924 """
925 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
926 :param vim_account_id:
927 :return:
928 """
929 interfaces = []
930
931 for vld in vlds_to_connect:
932 table, _, db_id = vld.partition(":")
933 db_id, _, vld = db_id.partition(":")
934 _, _, vld_id = vld.partition(".")
935
936 if table == "vnfrs":
937 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
938 iface_key = "vnf-vld-id"
939 else: # table == "nsrs"
940 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
941 iface_key = "ns-vld-id"
942
943 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
944
945 for db_vnfr in db_vnfrs:
946 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
947 for iface_index, interface in enumerate(vdur["interfaces"]):
948 if interface.get(iface_key) == vld_id and interface.get(
949 "type"
950 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
951 # only SR-IOV o PT
952 interface_ = interface.copy()
953 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
954 db_vnfr["_id"], vdu_index, iface_index
955 )
956
957 if vdur.get("status") == "ERROR":
958 interface_["status"] = "ERROR"
959
960 interfaces.append(interface_)
961
962 return interfaces
963
964 def refresh(self, ro_task):
965 # look for task create
966 task_create_index, _ = next(
967 i_t
968 for i_t in enumerate(ro_task["tasks"])
969 if i_t[1]
970 and i_t[1]["action"] == "CREATE"
971 and i_t[1]["status"] != "FINISHED"
972 )
973
974 return self.new(ro_task, task_create_index, None)
975
976 def new(self, ro_task, task_index, task_depends):
977
978 task = ro_task["tasks"][task_index]
979 task_id = task["task_id"]
980 target_vim = self.my_vims[ro_task["target_id"]]
981
982 sdn_net_id = ro_task["vim_info"]["vim_id"]
983
984 created_items = ro_task["vim_info"].get("created_items")
985 connected_ports = ro_task["vim_info"].get("connected_ports", [])
986 new_connected_ports = []
987 last_update = ro_task["vim_info"].get("last_update", 0)
988 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
989 error_list = []
990 created = ro_task["vim_info"].get("created", False)
991
992 try:
993 # CREATE
994 params = task["params"]
995 vlds_to_connect = params.get("vlds", [])
996 associated_vim = params.get("target_vim")
997 # external additional ports
998 additional_ports = params.get("sdn-ports") or ()
999 _, _, vim_account_id = (
1000 (None, None, None)
1001 if associated_vim is None
1002 else associated_vim.partition(":")
1003 )
1004
1005 if associated_vim:
1006 # get associated VIM
1007 if associated_vim not in self.db_vims:
1008 self.db_vims[associated_vim] = self.db.get_one(
1009 "vim_accounts", {"_id": vim_account_id}
1010 )
1011
1012 db_vim = self.db_vims[associated_vim]
1013
1014 # look for ports to connect
1015 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1016 # print(ports)
1017
1018 sdn_ports = []
1019 pending_ports = error_ports = 0
1020 vlan_used = None
1021 sdn_need_update = False
1022
1023 for port in ports:
1024 vlan_used = port.get("vlan") or vlan_used
1025
1026 # TODO. Do not connect if already done
1027 if not port.get("compute_node") or not port.get("pci"):
1028 if port.get("status") == "ERROR":
1029 error_ports += 1
1030 else:
1031 pending_ports += 1
1032 continue
1033
1034 pmap = None
1035 compute_node_mappings = next(
1036 (
1037 c
1038 for c in db_vim["config"].get("sdn-port-mapping", ())
1039 if c and c["compute_node"] == port["compute_node"]
1040 ),
1041 None,
1042 )
1043
1044 if compute_node_mappings:
1045 # process port_mapping pci of type 0000:af:1[01].[1357]
1046 pmap = next(
1047 (
1048 p
1049 for p in compute_node_mappings["ports"]
1050 if self._match_pci(port["pci"], p.get("pci"))
1051 ),
1052 None,
1053 )
1054
1055 if not pmap:
1056 if not db_vim["config"].get("mapping_not_needed"):
1057 error_list.append(
1058 "Port mapping not found for compute_node={} pci={}".format(
1059 port["compute_node"], port["pci"]
1060 )
1061 )
1062 continue
1063
1064 pmap = {}
1065
1066 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1067 new_port = {
1068 "service_endpoint_id": pmap.get("service_endpoint_id")
1069 or service_endpoint_id,
1070 "service_endpoint_encapsulation_type": "dot1q"
1071 if port["type"] == "SR-IOV"
1072 else None,
1073 "service_endpoint_encapsulation_info": {
1074 "vlan": port.get("vlan"),
1075 "mac": port.get("mac-address"),
1076 "device_id": pmap.get("device_id") or port["compute_node"],
1077 "device_interface_id": pmap.get("device_interface_id")
1078 or port["pci"],
1079 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1080 "switch_port": pmap.get("switch_port"),
1081 "service_mapping_info": pmap.get("service_mapping_info"),
1082 },
1083 }
1084
1085 # TODO
1086 # if port["modified_at"] > last_update:
1087 # sdn_need_update = True
1088 new_connected_ports.append(port["id"]) # TODO
1089 sdn_ports.append(new_port)
1090
1091 if error_ports:
1092 error_list.append(
1093 "{} interfaces have not been created as VDU is on ERROR status".format(
1094 error_ports
1095 )
1096 )
1097
1098 # connect external ports
1099 for index, additional_port in enumerate(additional_ports):
1100 additional_port_id = additional_port.get(
1101 "service_endpoint_id"
1102 ) or "external-{}".format(index)
1103 sdn_ports.append(
1104 {
1105 "service_endpoint_id": additional_port_id,
1106 "service_endpoint_encapsulation_type": additional_port.get(
1107 "service_endpoint_encapsulation_type", "dot1q"
1108 ),
1109 "service_endpoint_encapsulation_info": {
1110 "vlan": additional_port.get("vlan") or vlan_used,
1111 "mac": additional_port.get("mac_address"),
1112 "device_id": additional_port.get("device_id"),
1113 "device_interface_id": additional_port.get(
1114 "device_interface_id"
1115 ),
1116 "switch_dpid": additional_port.get("switch_dpid")
1117 or additional_port.get("switch_id"),
1118 "switch_port": additional_port.get("switch_port"),
1119 "service_mapping_info": additional_port.get(
1120 "service_mapping_info"
1121 ),
1122 },
1123 }
1124 )
1125 new_connected_ports.append(additional_port_id)
1126 sdn_info = ""
1127
1128 # if there are more ports to connect or they have been modified, call create/update
1129 if error_list:
1130 sdn_status = "ERROR"
1131 sdn_info = "; ".join(error_list)
1132 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1133 last_update = time.time()
1134
1135 if not sdn_net_id:
1136 if len(sdn_ports) < 2:
1137 sdn_status = "ACTIVE"
1138
1139 if not pending_ports:
1140 self.logger.debug(
1141 "task={} {} new-sdn-net done, less than 2 ports".format(
1142 task_id, ro_task["target_id"]
1143 )
1144 )
1145 else:
1146 net_type = params.get("type") or "ELAN"
1147 (
1148 sdn_net_id,
1149 created_items,
1150 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1151 created = True
1152 self.logger.debug(
1153 "task={} {} new-sdn-net={} created={}".format(
1154 task_id, ro_task["target_id"], sdn_net_id, created
1155 )
1156 )
1157 else:
1158 created_items = target_vim.edit_connectivity_service(
1159 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1160 )
1161 created = True
1162 self.logger.debug(
1163 "task={} {} update-sdn-net={} created={}".format(
1164 task_id, ro_task["target_id"], sdn_net_id, created
1165 )
1166 )
1167
1168 connected_ports = new_connected_ports
1169 elif sdn_net_id:
1170 wim_status_dict = target_vim.get_connectivity_service_status(
1171 sdn_net_id, conn_info=created_items
1172 )
1173 sdn_status = wim_status_dict["sdn_status"]
1174
1175 if wim_status_dict.get("sdn_info"):
1176 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1177
1178 if wim_status_dict.get("error_msg"):
1179 sdn_info = wim_status_dict.get("error_msg") or ""
1180
1181 if pending_ports:
1182 if sdn_status != "ERROR":
1183 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1184 len(ports) - pending_ports, len(ports)
1185 )
1186
1187 if sdn_status == "ACTIVE":
1188 sdn_status = "BUILD"
1189
1190 ro_vim_item_update = {
1191 "vim_id": sdn_net_id,
1192 "vim_status": sdn_status,
1193 "created": created,
1194 "created_items": created_items,
1195 "connected_ports": connected_ports,
1196 "vim_details": sdn_info,
1197 "last_update": last_update,
1198 }
1199
1200 return sdn_status, ro_vim_item_update
1201 except Exception as e:
1202 self.logger.error(
1203 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1204 exc_info=not isinstance(
1205 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1206 ),
1207 )
1208 ro_vim_item_update = {
1209 "vim_status": "VIM_ERROR",
1210 "created": created,
1211 "vim_details": str(e),
1212 }
1213
1214 return "FAILED", ro_vim_item_update
1215
def delete(self, ro_task, task_index):
    """
    Remove the connectivity service recorded at ro_task["vim_info"]["vim_id"], if any.

    :param ro_task: ro_task dictionary. Keys "tasks", "vim_info" and "target_id" are read
        ("_id" too when an error is logged)
    :param task_index: position of the task being processed inside ro_task["tasks"]
    :return: tuple (status, ro_vim_item_update). ("DONE", ...) when there was nothing to delete,
        the deletion worked or the connector answered NOT_FOUND; ("FAILED", ...) otherwise
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    sdn_vim_id = ro_task["vim_info"].get("vim_id")
    # content stored when everything goes fine
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_details": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, ro_task["vim_info"].get("created_items")
            )
    except Exception as e:
        not_found = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not not_found:
            # traceback is only logged for exceptions not raised by the connectors
            known_error = isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not known_error,
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        # the item is not there anymore; consider it deleted
        ro_vim_item_update_ok["vim_details"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_details", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
1266
1267
class NsWorker(threading.Thread):
    # Worker thread that processes ro_tasks.
    # The REFRESH_* values are offsets in seconds: _process_pending_tasks adds them to time.time()
    # to compute the next "vim_info.refresh_at" of a ro_task.
    REFRESH_BUILD = 5  # 5 seconds. Used when the new status is BUILD
    REFRESH_ACTIVE = 60  # 1 minute. Used when the new status is DONE
    REFRESH_ERROR = 600  # used for any other status
    REFRESH_IMAGE = 3600 * 10  # used for "image" and "flavor" items regardless of the status
    REFRESH_DELETE = 3600 * 10
    # maximum size of the task queue created at __init__
    QUEUE_SIZE = 100
    # NOTE(review): this attribute is rebound by the 'terminate' method defined later in this class
    terminate = False
1276
def __init__(self, worker_index, config, plugins, db):
    """
    Create the worker thread and the per-item interaction objects it dispatches to.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.worker_index = worker_index
    self.config = config
    self.plugins = plugins
    self.db = db
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)

    # every interaction class receives the same shared objects
    shared_args = (self.db, self.my_vims, self.db_vims, self.logger)
    interaction_classes = (
        ("net", VimInteractionNet),
        ("vdu", VimInteractionVdu),
        ("image", VimInteractionImage),
        ("flavor", VimInteractionFlavor),
        ("sdn_net", VimInteractionSdnNet),
        ("affinity-or-anti-affinity-group", VimInteractionAffinityGroup),
    )
    self.item2class = {
        item: interaction_class(*shared_args)
        for item, interaction_class in interaction_classes
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1322
def insert_task(self, task):
    """
    Put a task into the worker queue without blocking.

    :param task: element to enqueue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1329
def terminate(self):
    """
    Queue the "exit" marker through insert_task.

    Presumably the worker loop consumes it as a request to stop; that loop is not part of this method.
    :return: None
    """
    exit_marker = "exit"
    self.insert_task(exit_marker)
1332
def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet.

    :param task: task dictionary; its "status" is read and, when it is "SCHEDULED", overwritten
        with "SUPERSEDED"
    :return: True if the task has been superseded, False if it was in any other status
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        # The lock is released by the 'with' statement on exit. Releasing it explicitly here as well
        # made the context manager release an already unlocked lock, raising RuntimeError
        return False
1341
1342 def _process_vim_config(self, target_id, db_vim):
1343 """
1344 Process vim config, creating vim configuration files as ca_cert
1345 :param target_id: vim/sdn/wim + id
1346 :param db_vim: Vim dictionary obtained from database
1347 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1348 """
1349 if not db_vim.get("config"):
1350 return
1351
1352 file_name = ""
1353
1354 try:
1355 if db_vim["config"].get("ca_cert_content"):
1356 file_name = "{}:{}".format(target_id, self.worker_index)
1357
1358 try:
1359 mkdir(file_name)
1360 except FileExistsError:
1361 pass
1362
1363 file_name = file_name + "/ca_cert"
1364
1365 with open(file_name, "w") as f:
1366 f.write(db_vim["config"]["ca_cert_content"])
1367 del db_vim["config"]["ca_cert_content"]
1368 db_vim["config"]["ca_cert"] = file_name
1369 except Exception as e:
1370 raise NsWorkerException(
1371 "Error writing to file '{}': {}".format(file_name, e)
1372 )
1373
1374 def _load_plugin(self, name, type="vim"):
1375 # type can be vim or sdn
1376 if "rovim_dummy" not in self.plugins:
1377 self.plugins["rovim_dummy"] = VimDummyConnector
1378
1379 if "rosdn_dummy" not in self.plugins:
1380 self.plugins["rosdn_dummy"] = SdnDummyConnector
1381
1382 if name in self.plugins:
1383 return self.plugins[name]
1384
1385 try:
1386 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1387 self.plugins[name] = ep.load()
1388 except Exception as e:
1389 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1390
1391 if name and name not in self.plugins:
1392 raise NsWorkerException(
1393 "Plugin 'osm_{n}' has not been installed".format(n=name)
1394 )
1395
1396 return self.plugins[name]
1397
1398 def _unload_vim(self, target_id):
1399 """
1400 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1401 :param target_id: Contains type:_id; where type can be 'vim', ...
1402 :return: None.
1403 """
1404 try:
1405 self.db_vims.pop(target_id, None)
1406 self.my_vims.pop(target_id, None)
1407
1408 if target_id in self.vim_targets:
1409 self.vim_targets.remove(target_id)
1410
1411 self.logger.info("Unloaded {}".format(target_id))
1412 rmtree("{}:{}".format(target_id, self.worker_index))
1413 except FileNotFoundError:
1414 pass # this is raised by rmtree if folder does not exist
1415 except Exception as e:
1416 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1417
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR

    It looks at the database record for the first operation in "PROCESSING" state, locks it by
    writing "locked_at", loads the target and calls its connectivity/credentials check. The result is
    written to the database at the 'finally' clause. Only one operation is handled per call.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    # content to set / unset at the database record once finished
    update_dict = {}
    unset_dict = {}
    # prefix "_admin.operations.<index>." of the operation being processed; empty if none
    op_text = ""
    # description of the current step, used at error messages
    step = ""
    # remember if target was already loaded, to leave it as it was at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the "locked_at" value just read, so the update only applies if
            # the record has not changed in between
            # NOTE(review): "admin.current_operation" is set here while "current_operation" is unset
            # below, and every other key uses the "_admin." prefix — confirm the intended key names
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values. They are overwritten at 'finally' when error_text is not empty
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # update_dict/unset_dict are only filled after locking an operation, and error_text is
        # always assigned by then (by _load_vim result or by the except clause)
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        # a target that was not loaded before this check is unloaded again
        if not loaded:
            self._unload_vim(target_id)
1519
1520 def _reload_vim(self, target_id):
1521 if target_id in self.vim_targets:
1522 self._load_vim(target_id)
1523 else:
1524 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1525 # just remove it to force load again next time it is needed
1526 self.db_vims.pop(target_id, None)
1527
1528 def _load_vim(self, target_id):
1529 """
1530 Load or reload a vim_account, sdn_controller or wim_account.
1531 Read content from database, load the plugin if not loaded.
1532 In case of error loading the plugin, it load a failing VIM_connector
1533 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1534 :param target_id: Contains type:_id; where type can be 'vim', ...
1535 :return: None if ok, descriptive text if error
1536 """
1537 target, _, _id = target_id.partition(":")
1538 target_database = (
1539 "vim_accounts"
1540 if target == "vim"
1541 else "wim_accounts"
1542 if target == "wim"
1543 else "sdns"
1544 )
1545 plugin_name = ""
1546 vim = None
1547
1548 try:
1549 step = "Getting {}={} from db".format(target, _id)
1550 # TODO process for wim, sdnc, ...
1551 vim = self.db.get_one(target_database, {"_id": _id})
1552
1553 # if deep_get(vim, "config", "sdn-controller"):
1554 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1555 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1556
1557 step = "Decrypting password"
1558 schema_version = vim.get("schema_version")
1559 self.db.encrypt_decrypt_fields(
1560 vim,
1561 "decrypt",
1562 fields=("password", "secret"),
1563 schema_version=schema_version,
1564 salt=_id,
1565 )
1566 self._process_vim_config(target_id, vim)
1567
1568 if target == "vim":
1569 plugin_name = "rovim_" + vim["vim_type"]
1570 step = "Loading plugin '{}'".format(plugin_name)
1571 vim_module_conn = self._load_plugin(plugin_name)
1572 step = "Loading {}'".format(target_id)
1573 self.my_vims[target_id] = vim_module_conn(
1574 uuid=vim["_id"],
1575 name=vim["name"],
1576 tenant_id=vim.get("vim_tenant_id"),
1577 tenant_name=vim.get("vim_tenant_name"),
1578 url=vim["vim_url"],
1579 url_admin=None,
1580 user=vim["vim_user"],
1581 passwd=vim["vim_password"],
1582 config=vim.get("config") or {},
1583 persistent_info={},
1584 )
1585 else: # sdn
1586 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1587 step = "Loading plugin '{}'".format(plugin_name)
1588 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1589 step = "Loading {}'".format(target_id)
1590 wim = deepcopy(vim)
1591 wim_config = wim.pop("config", {}) or {}
1592 wim["uuid"] = wim["_id"]
1593 if "url" in wim and "wim_url" not in wim:
1594 wim["wim_url"] = wim["url"]
1595 elif "url" not in wim and "wim_url" in wim:
1596 wim["url"] = wim["wim_url"]
1597
1598 if wim.get("dpid"):
1599 wim_config["dpid"] = wim.pop("dpid")
1600
1601 if wim.get("switch_id"):
1602 wim_config["switch_id"] = wim.pop("switch_id")
1603
1604 # wim, wim_account, config
1605 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1606 self.db_vims[target_id] = vim
1607 self.error_status = None
1608
1609 self.logger.info(
1610 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1611 )
1612 except Exception as e:
1613 self.logger.error(
1614 "Cannot load {} plugin={}: {} {}".format(
1615 target_id, plugin_name, step, e
1616 )
1617 )
1618
1619 self.db_vims[target_id] = vim or {}
1620 self.db_vims[target_id] = FailingConnector(str(e))
1621 error_status = "{} Error: {}".format(step, e)
1622
1623 return error_status
1624 finally:
1625 if target_id not in self.vim_targets:
1626 self.vim_targets.append(target_id)
1627
1628 def _get_db_task(self):
1629 """
1630 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1631 :return: None
1632 """
1633 now = time.time()
1634
1635 if not self.time_last_task_processed:
1636 self.time_last_task_processed = now
1637
1638 try:
1639 while True:
1640 """
1641 # Log RO tasks only when loglevel is DEBUG
1642 if self.logger.getEffectiveLevel() == logging.DEBUG:
1643 self._log_ro_task(
1644 None,
1645 None,
1646 None,
1647 "TASK_WF",
1648 "task_locked_time="
1649 + str(self.task_locked_time)
1650 + " "
1651 + "time_last_task_processed="
1652 + str(self.time_last_task_processed)
1653 + " "
1654 + "now="
1655 + str(now),
1656 )
1657 """
1658 locked = self.db.set_one(
1659 "ro_tasks",
1660 q_filter={
1661 "target_id": self.vim_targets,
1662 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1663 "locked_at.lt": now - self.task_locked_time,
1664 "to_check_at.lt": self.time_last_task_processed,
1665 },
1666 update_dict={"locked_by": self.my_id, "locked_at": now},
1667 fail_on_empty=False,
1668 )
1669
1670 if locked:
1671 # read and return
1672 ro_task = self.db.get_one(
1673 "ro_tasks",
1674 q_filter={
1675 "target_id": self.vim_targets,
1676 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1677 "locked_at": now,
1678 },
1679 )
1680 return ro_task
1681
1682 if self.time_last_task_processed == now:
1683 self.time_last_task_processed = None
1684 return None
1685 else:
1686 self.time_last_task_processed = now
1687 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1688
1689 except DbException as e:
1690 self.logger.error("Database exception at _get_db_task: {}".format(e))
1691 except Exception as e:
1692 self.logger.critical(
1693 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1694 )
1695
1696 return None
1697
1698 def _get_db_all_tasks(self):
1699 """
1700 Read all content of table ro_tasks to log it
1701 :return: None
1702 """
1703 try:
1704 # Checking the content of the BD:
1705
1706 # read and return
1707 ro_task = self.db.get_list("ro_tasks")
1708 for rt in ro_task:
1709 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1710 return ro_task
1711
1712 except DbException as e:
1713 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1714 except Exception as e:
1715 self.logger.critical(
1716 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1717 )
1718
1719 return None
1720
1721 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1722 """
1723 Generate a log with the following format:
1724
1725 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1726 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1727 task_array_index;task_id;task_action;task_item;task_args
1728
1729 Example:
1730
1731 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1732 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1733 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1734 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1735 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1736 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1737 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1738 """
1739 try:
1740 line = []
1741 i = 0
1742 if ro_task is not None and isinstance(ro_task, dict):
1743 for t in ro_task["tasks"]:
1744 line.clear()
1745 line.append(mark)
1746 line.append(event)
1747 line.append(ro_task.get("_id", ""))
1748 line.append(str(ro_task.get("locked_at", "")))
1749 line.append(str(ro_task.get("modified_at", "")))
1750 line.append(str(ro_task.get("created_at", "")))
1751 line.append(str(ro_task.get("to_check_at", "")))
1752 line.append(str(ro_task.get("locked_by", "")))
1753 line.append(str(ro_task.get("target_id", "")))
1754 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1755 line.append(str(ro_task.get("vim_info", "")))
1756 line.append(str(ro_task.get("tasks", "")))
1757 if isinstance(t, dict):
1758 line.append(str(t.get("status", "")))
1759 line.append(str(t.get("action_id", "")))
1760 line.append(str(i))
1761 line.append(str(t.get("task_id", "")))
1762 line.append(str(t.get("action", "")))
1763 line.append(str(t.get("item", "")))
1764 line.append(str(t.get("find_params", "")))
1765 line.append(str(t.get("params", "")))
1766 else:
1767 line.extend([""] * 2)
1768 line.append(str(i))
1769 line.extend([""] * 5)
1770
1771 i += 1
1772 self.logger.debug(";".join(line))
1773 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1774 i = 0
1775 while True:
1776 st = "tasks.{}.status".format(i)
1777 if st not in db_ro_task_update:
1778 break
1779 line.clear()
1780 line.append(mark)
1781 line.append(event)
1782 line.append(db_ro_task_update.get("_id", ""))
1783 line.append(str(db_ro_task_update.get("locked_at", "")))
1784 line.append(str(db_ro_task_update.get("modified_at", "")))
1785 line.append("")
1786 line.append(str(db_ro_task_update.get("to_check_at", "")))
1787 line.append(str(db_ro_task_update.get("locked_by", "")))
1788 line.append("")
1789 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1790 line.append("")
1791 line.append(str(db_ro_task_update.get("vim_info", "")))
1792 line.append(str(str(db_ro_task_update).count(".status")))
1793 line.append(db_ro_task_update.get(st, ""))
1794 line.append("")
1795 line.append(str(i))
1796 line.extend([""] * 3)
1797 i += 1
1798 self.logger.debug(";".join(line))
1799
1800 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1801 line.clear()
1802 line.append(mark)
1803 line.append(event)
1804 line.append(db_ro_task_delete.get("_id", ""))
1805 line.append("")
1806 line.append(db_ro_task_delete.get("modified_at", ""))
1807 line.extend([""] * 13)
1808 self.logger.debug(";".join(line))
1809
1810 else:
1811 line.clear()
1812 line.append(mark)
1813 line.append(event)
1814 line.extend([""] * 16)
1815 self.logger.debug(";".join(line))
1816
1817 except Exception as e:
1818 self.logger.error("Error logging ro_task: {}".format(e))
1819
1820 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1821 """
1822 Determine if this task need to be done or superseded
1823 :return: None
1824 """
1825 my_task = ro_task["tasks"][task_index]
1826 task_id = my_task["task_id"]
1827 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1828 "created_items", False
1829 )
1830
1831 if my_task["status"] == "FAILED":
1832 return None, None # TODO need to be retry??
1833
1834 try:
1835 for index, task in enumerate(ro_task["tasks"]):
1836 if index == task_index or not task:
1837 continue # own task
1838
1839 if (
1840 my_task["target_record"] == task["target_record"]
1841 and task["action"] == "CREATE"
1842 ):
1843 # set to finished
1844 db_update["tasks.{}.status".format(index)] = task[
1845 "status"
1846 ] = "FINISHED"
1847 elif task["action"] == "CREATE" and task["status"] not in (
1848 "FINISHED",
1849 "SUPERSEDED",
1850 ):
1851 needed_delete = False
1852
1853 if needed_delete:
1854 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1855 else:
1856 return "SUPERSEDED", None
1857 except Exception as e:
1858 if not isinstance(e, NsWorkerException):
1859 self.logger.critical(
1860 "Unexpected exception at _delete_task task={}: {}".format(
1861 task_id, e
1862 ),
1863 exc_info=True,
1864 )
1865
1866 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1867
1868 def _create_task(self, ro_task, task_index, task_depends, db_update):
1869 """
1870 Determine if this task need to create something at VIM
1871 :return: None
1872 """
1873 my_task = ro_task["tasks"][task_index]
1874 task_id = my_task["task_id"]
1875 task_status = None
1876
1877 if my_task["status"] == "FAILED":
1878 return None, None # TODO need to be retry??
1879 elif my_task["status"] == "SCHEDULED":
1880 # check if already created by another task
1881 for index, task in enumerate(ro_task["tasks"]):
1882 if index == task_index or not task:
1883 continue # own task
1884
1885 if task["action"] == "CREATE" and task["status"] not in (
1886 "SCHEDULED",
1887 "FINISHED",
1888 "SUPERSEDED",
1889 ):
1890 return task["status"], "COPY_VIM_INFO"
1891
1892 try:
1893 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1894 ro_task, task_index, task_depends
1895 )
1896 # TODO update other CREATE tasks
1897 except Exception as e:
1898 if not isinstance(e, NsWorkerException):
1899 self.logger.error(
1900 "Error executing task={}: {}".format(task_id, e), exc_info=True
1901 )
1902
1903 task_status = "FAILED"
1904 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1905 # TODO update ro_vim_item_update
1906
1907 return task_status, ro_vim_item_update
1908 else:
1909 return None, None
1910
1911 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1912 """
1913 Look for dependency task
1914 :param task_id: Can be one of
1915 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1916 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1917 3. task.task_id: "<action_id>:number"
1918 :param ro_task:
1919 :param target_id:
1920 :return: database ro_task plus index of task
1921 """
1922 if (
1923 task_id.startswith("vim:")
1924 or task_id.startswith("sdn:")
1925 or task_id.startswith("wim:")
1926 ):
1927 target_id, _, task_id = task_id.partition(" ")
1928
1929 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1930 ro_task_dependency = self.db.get_one(
1931 "ro_tasks",
1932 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1933 fail_on_empty=False,
1934 )
1935
1936 if ro_task_dependency:
1937 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1938 if task["target_record_id"] == task_id:
1939 return ro_task_dependency, task_index
1940
1941 else:
1942 if ro_task:
1943 for task_index, task in enumerate(ro_task["tasks"]):
1944 if task and task["task_id"] == task_id:
1945 return ro_task, task_index
1946
1947 ro_task_dependency = self.db.get_one(
1948 "ro_tasks",
1949 q_filter={
1950 "tasks.ANYINDEX.task_id": task_id,
1951 "tasks.ANYINDEX.target_record.ne": None,
1952 },
1953 fail_on_empty=False,
1954 )
1955
1956 if ro_task_dependency:
1957 for task_index, task in ro_task_dependency["tasks"]:
1958 if task["task_id"] == task_id:
1959 return ro_task_dependency, task_index
1960 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1961
1962 def _process_pending_tasks(self, ro_task):
1963 ro_task_id = ro_task["_id"]
1964 now = time.time()
1965 # one day
1966 next_check_at = now + (24 * 60 * 60)
1967 db_ro_task_update = {}
1968
1969 def _update_refresh(new_status):
1970 # compute next_refresh
1971 nonlocal task
1972 nonlocal next_check_at
1973 nonlocal db_ro_task_update
1974 nonlocal ro_task
1975
1976 next_refresh = time.time()
1977
1978 if task["item"] in ("image", "flavor"):
1979 next_refresh += self.REFRESH_IMAGE
1980 elif new_status == "BUILD":
1981 next_refresh += self.REFRESH_BUILD
1982 elif new_status == "DONE":
1983 next_refresh += self.REFRESH_ACTIVE
1984 else:
1985 next_refresh += self.REFRESH_ERROR
1986
1987 next_check_at = min(next_check_at, next_refresh)
1988 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1989 ro_task["vim_info"]["refresh_at"] = next_refresh
1990
1991 try:
1992 """
1993 # Log RO tasks only when loglevel is DEBUG
1994 if self.logger.getEffectiveLevel() == logging.DEBUG:
1995 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1996 """
1997 # 0: get task_status_create
1998 lock_object = None
1999 task_status_create = None
2000 task_create = next(
2001 (
2002 t
2003 for t in ro_task["tasks"]
2004 if t
2005 and t["action"] == "CREATE"
2006 and t["status"] in ("BUILD", "DONE")
2007 ),
2008 None,
2009 )
2010
2011 if task_create:
2012 task_status_create = task_create["status"]
2013
2014 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2015 for task_action in ("DELETE", "CREATE", "EXEC"):
2016 db_vim_update = None
2017 new_status = None
2018
2019 for task_index, task in enumerate(ro_task["tasks"]):
2020 if not task:
2021 continue # task deleted
2022
2023 task_depends = {}
2024 target_update = None
2025
2026 if (
2027 (
2028 task_action in ("DELETE", "EXEC")
2029 and task["status"] not in ("SCHEDULED", "BUILD")
2030 )
2031 or task["action"] != task_action
2032 or (
2033 task_action == "CREATE"
2034 and task["status"] in ("FINISHED", "SUPERSEDED")
2035 )
2036 ):
2037 continue
2038
2039 task_path = "tasks.{}.status".format(task_index)
2040 try:
2041 db_vim_info_update = None
2042
2043 if task["status"] == "SCHEDULED":
2044 # check if tasks that this depends on have been completed
2045 dependency_not_completed = False
2046
2047 for dependency_task_id in task.get("depends_on") or ():
2048 (
2049 dependency_ro_task,
2050 dependency_task_index,
2051 ) = self._get_dependency(
2052 dependency_task_id, target_id=ro_task["target_id"]
2053 )
2054 dependency_task = dependency_ro_task["tasks"][
2055 dependency_task_index
2056 ]
2057
2058 if dependency_task["status"] == "SCHEDULED":
2059 dependency_not_completed = True
2060 next_check_at = min(
2061 next_check_at, dependency_ro_task["to_check_at"]
2062 )
2063 # must allow dependent task to be processed first
2064 # to do this set time after last_task_processed
2065 next_check_at = max(
2066 self.time_last_task_processed, next_check_at
2067 )
2068 break
2069 elif dependency_task["status"] == "FAILED":
2070 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2071 task["action"],
2072 task["item"],
2073 dependency_task["action"],
2074 dependency_task["item"],
2075 dependency_task_id,
2076 dependency_ro_task["vim_info"].get(
2077 "vim_details"
2078 ),
2079 )
2080 self.logger.error(
2081 "task={} {}".format(task["task_id"], error_text)
2082 )
2083 raise NsWorkerException(error_text)
2084
2085 task_depends[dependency_task_id] = dependency_ro_task[
2086 "vim_info"
2087 ]["vim_id"]
2088 task_depends[
2089 "TASK-{}".format(dependency_task_id)
2090 ] = dependency_ro_task["vim_info"]["vim_id"]
2091
2092 if dependency_not_completed:
2093 # TODO set at vim_info.vim_details that it is waiting
2094 continue
2095
2096 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2097 # the task of renew this locking. It will update database locket_at periodically
2098 if not lock_object:
2099 lock_object = LockRenew.add_lock_object(
2100 "ro_tasks", ro_task, self
2101 )
2102
2103 if task["action"] == "DELETE":
2104 (new_status, db_vim_info_update,) = self._delete_task(
2105 ro_task, task_index, task_depends, db_ro_task_update
2106 )
2107 new_status = (
2108 "FINISHED" if new_status == "DONE" else new_status
2109 )
2110 # ^with FINISHED instead of DONE it will not be refreshing
2111
2112 if new_status in ("FINISHED", "SUPERSEDED"):
2113 target_update = "DELETE"
2114 elif task["action"] == "EXEC":
2115 (
2116 new_status,
2117 db_vim_info_update,
2118 db_task_update,
2119 ) = self.item2class[task["item"]].exec(
2120 ro_task, task_index, task_depends
2121 )
2122 new_status = (
2123 "FINISHED" if new_status == "DONE" else new_status
2124 )
2125 # ^with FINISHED instead of DONE it will not be refreshing
2126
2127 if db_task_update:
2128 # load into database the modified db_task_update "retries" and "next_retry"
2129 if db_task_update.get("retries"):
2130 db_ro_task_update[
2131 "tasks.{}.retries".format(task_index)
2132 ] = db_task_update["retries"]
2133
2134 next_check_at = time.time() + db_task_update.get(
2135 "next_retry", 60
2136 )
2137 target_update = None
2138 elif task["action"] == "CREATE":
2139 if task["status"] == "SCHEDULED":
2140 if task_status_create:
2141 new_status = task_status_create
2142 target_update = "COPY_VIM_INFO"
2143 else:
2144 new_status, db_vim_info_update = self.item2class[
2145 task["item"]
2146 ].new(ro_task, task_index, task_depends)
2147 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2148 _update_refresh(new_status)
2149 else:
2150 if (
2151 ro_task["vim_info"]["refresh_at"]
2152 and now > ro_task["vim_info"]["refresh_at"]
2153 ):
2154 new_status, db_vim_info_update = self.item2class[
2155 task["item"]
2156 ].refresh(ro_task)
2157 _update_refresh(new_status)
2158 else:
2159 # The refresh is updated to avoid set the value of "refresh_at" to
2160 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2161 # because it can happen that in this case the task is never processed
2162 _update_refresh(task["status"])
2163
2164 except Exception as e:
2165 new_status = "FAILED"
2166 db_vim_info_update = {
2167 "vim_status": "VIM_ERROR",
2168 "vim_details": str(e),
2169 }
2170
2171 if not isinstance(
2172 e, (NsWorkerException, vimconn.VimConnException)
2173 ):
2174 self.logger.error(
2175 "Unexpected exception at _delete_task task={}: {}".format(
2176 task["task_id"], e
2177 ),
2178 exc_info=True,
2179 )
2180
2181 try:
2182 if db_vim_info_update:
2183 db_vim_update = db_vim_info_update.copy()
2184 db_ro_task_update.update(
2185 {
2186 "vim_info." + k: v
2187 for k, v in db_vim_info_update.items()
2188 }
2189 )
2190 ro_task["vim_info"].update(db_vim_info_update)
2191
2192 if new_status:
2193 if task_action == "CREATE":
2194 task_status_create = new_status
2195 db_ro_task_update[task_path] = new_status
2196
2197 if target_update or db_vim_update:
2198 if target_update == "DELETE":
2199 self._update_target(task, None)
2200 elif target_update == "COPY_VIM_INFO":
2201 self._update_target(task, ro_task["vim_info"])
2202 else:
2203 self._update_target(task, db_vim_update)
2204
2205 except Exception as e:
2206 if (
2207 isinstance(e, DbException)
2208 and e.http_code == HTTPStatus.NOT_FOUND
2209 ):
2210 # if the vnfrs or nsrs has been removed from database, this task must be removed
2211 self.logger.debug(
2212 "marking to delete task={}".format(task["task_id"])
2213 )
2214 self.tasks_to_delete.append(task)
2215 else:
2216 self.logger.error(
2217 "Unexpected exception at _update_target task={}: {}".format(
2218 task["task_id"], e
2219 ),
2220 exc_info=True,
2221 )
2222
2223 locked_at = ro_task["locked_at"]
2224
2225 if lock_object:
2226 locked_at = [
2227 lock_object["locked_at"],
2228 lock_object["locked_at"] + self.task_locked_time,
2229 ]
2230 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2231 # contain exactly locked_at + self.task_locked_time
2232 LockRenew.remove_lock_object(lock_object)
2233
2234 q_filter = {
2235 "_id": ro_task["_id"],
2236 "to_check_at": ro_task["to_check_at"],
2237 "locked_at": locked_at,
2238 }
2239 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2240 # outside this task (by ro_nbi) do not update it
2241 db_ro_task_update["locked_by"] = None
2242 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2243 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2244 db_ro_task_update["modified_at"] = now
2245 db_ro_task_update["to_check_at"] = next_check_at
2246
2247 """
2248 # Log RO tasks only when loglevel is DEBUG
2249 if self.logger.getEffectiveLevel() == logging.DEBUG:
2250 db_ro_task_update_log = db_ro_task_update.copy()
2251 db_ro_task_update_log["_id"] = q_filter["_id"]
2252 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2253 """
2254
2255 if not self.db.set_one(
2256 "ro_tasks",
2257 update_dict=db_ro_task_update,
2258 q_filter=q_filter,
2259 fail_on_empty=False,
2260 ):
2261 del db_ro_task_update["to_check_at"]
2262 del q_filter["to_check_at"]
2263 """
2264 # Log RO tasks only when loglevel is DEBUG
2265 if self.logger.getEffectiveLevel() == logging.DEBUG:
2266 self._log_ro_task(
2267 None,
2268 db_ro_task_update_log,
2269 None,
2270 "TASK_WF",
2271 "SET_TASK " + str(q_filter),
2272 )
2273 """
2274 self.db.set_one(
2275 "ro_tasks",
2276 q_filter=q_filter,
2277 update_dict=db_ro_task_update,
2278 fail_on_empty=True,
2279 )
2280 except DbException as e:
2281 self.logger.error(
2282 "ro_task={} Error updating database {}".format(ro_task_id, e)
2283 )
2284 except Exception as e:
2285 self.logger.error(
2286 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2287 )
2288
def _update_target(self, task, ro_vim_item_update):
    """
    Write the vim information obtained for a task into the database record that the task targets

    The task "target_record" has the form "<table>:<_id>:<path_vim_status>", where
        path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
        path_item: dot separated list targeting record information, e.g. "vdur.10". It is obtained
            cutting path_vim_status at its last dot, twice

    :param task: task dictionary. Only its "target_record" key is read here
    :param ro_vim_item_update: dictionary with the vim information to store. If it is empty or None,
        the item status is set to "DELETED" and the content at path_vim_status is unset
    :return: None. The changes are written with self.db.set_one
    """
    table, _, remainder = task["target_record"].partition(":")
    _id, _, path_vim_status = remainder.partition(":")
    path_item = path_vim_status[: path_vim_status.rfind(".")]
    path_item = path_item[: path_item.rfind(".")]

    # nothing to copy: mark the item as deleted and remove its vim information
    if not ro_vim_item_update:
        self.db.set_one(
            table,
            q_filter={"_id": _id},
            update_dict={path_item + ".status": "DELETED"},
            unset={path_vim_status: None},
        )
        return

    # only these keys are copied below path_vim_status
    copied_keys = ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
    update_dict = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            update_dict[path_vim_status + "." + key] = value

    if path_vim_status.startswith("vdur."):
        # for backward compatibility, vdur.name and vdur.vim-id are added apart from
        # vdur.vim_name and vdur.vim_id; vdur.status holds the general status
        legacy_keys = (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        )
        for source_key, item_suffix in legacy_keys:
            if ro_vim_item_update.get(source_key):
                update_dict[path_item + item_suffix] = ro_vim_item_update[source_key]

    interfaces = ro_vim_item_update.get("interfaces")
    if interfaces:
        for position, iface in enumerate(interfaces):
            if not iface:
                continue

            iface_path = path_item + ".interfaces" + ".{}.".format(position)
            for key, value in iface.items():
                if key in ("vlan", "compute_node", "pci"):
                    update_dict[iface_path + key] = value

            # put ip_address and mac_address with ip-address and mac-address
            ip_address = iface.get("ip_address")
            if ip_address:
                update_dict[iface_path + "ip-address"] = ip_address

            if iface.get("mac_address"):
                update_dict[iface_path + "mac-address"] = iface["mac_address"]

            # management addresses: only the first one of a ';' separated list is stored
            if iface.get("mgmt_vnf_interface") and ip_address:
                update_dict["ip-address"] = ip_address.split(";")[0]

            if iface.get("mgmt_vdu_interface") and ip_address:
                update_dict[path_item + ".ip-address"] = ip_address.split(";")[0]

    self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2363
def _process_delete_db_tasks(self):
    """
    Remove from database the tasks queued at self.tasks_to_delete, because their vnfrs or nsrs
    or both have been deleted

    For a task targeting a vnfr whose nsr is still present at database, only the tasks of that
    vnfr are removed; in any other case all the tasks of the nsr are removed.
    Errors raised by the deletion are logged and the task is discarded from the queue anyway

    :return: None. Uses and modifies self.tasks_to_delete, consuming it from the front
    """
    while self.tasks_to_delete:
        current = self.tasks_to_delete[0]
        nsr_id = current["nsr_id"]
        target_record = current["target_record"]

        # the vnfr id is only used as a filter when the nsr record still exists
        vnfrs_deleted = None
        if target_record.startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": nsr_id}, fail_on_empty=False
        ):
            vnfrs_deleted = target_record.split(":")[1]

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            message = "Error deleting task={}: {}".format(current["task_id"], e)
            self.logger.error(message)

        del self.tasks_to_delete[0]
2386
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Delete from the "ro_tasks" table the tasks that belong to a removed nsr or vnfr.
    Static method because it is called from osm_ng_ro.ns

    A ro_task is deleted when all its non-empty tasks are affected; otherwise only the affected
    tasks are set to None. Both writes are filtered by the "modified_at" that was read, so nothing
    is written if somebody has changed the ro_task meanwhile; in that case the whole list is read
    and processed again, up to 5 times.

    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None
    :raises NsWorkerException: if there are still conflicts after all the retries
    """
    retries = 5

    # target_record is "<table>:<_id>:<path>". The id component must match completely: a plain
    # startswith("vnfrs:" + id) would also catch other vnfrs whose id begins with the deleted one
    vnfr_record = vnfr_prefix = None
    if vnfrs_deleted:
        vnfr_record = "vnfrs:" + vnfrs_deleted
        vnfr_prefix = vnfr_record + ":"

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    # slot already emptied
                    continue

                if vnfrs_deleted:
                    target_record = task["target_record"]
                    affected = (
                        target_record == vnfr_record
                        or target_record.startswith(vnfr_prefix)
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr or vnfr, ro_task cannot be deleted
                    to_delete_ro_task = False

            if not to_delete_ro_task and not db_update:
                continue

            # delete or update if nobody has changed ro_task meanwhile. modified_at is used
            # to know whether it has changed
            q_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }
            if to_delete_ro_task:
                written = db.del_one(
                    "ro_tasks", q_filter=q_filter, fail_on_empty=False
                )
            else:
                db_update["modified_at"] = now
                written = db.set_one(
                    "ro_tasks",
                    q_filter=q_filter,
                    update_dict=db_update,
                    fail_on_empty=False,
                )

            if not written:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2445
def run(self):
    """
    Main loop of the thread. At every iteration:
        step 1: a command is taken from self.task_queue and attended. "terminate" ends the loop;
            "load_vim", "unload_vim", "reload_vim" and "check_vim" call the method with the same
            name prefixed by "_", passing task[1]. Any other command is ignored.
            The queue is read without blocking while self.vim_targets is not empty; otherwise the
            thread is marked as idle and blocks until a command arrives
        step 2: only reached when no command has been attended (queue empty or error at step 1).
            Tasks pending to delete are processed and one ro_task is obtained from database and
            processed. If there is no ro_task, it sleeps 5 seconds

    :return: None
    """
    # (command, method that attends it, message logged before calling it or None)
    commands = (
        ("load_vim", "_load_vim", "order to load vim {}"),
        ("unload_vim", "_unload_vim", "order to unload vim {}"),
        ("reload_vim", "_reload_vim", None),
        ("check_vim", "_check_vim", "order to check vim {}"),
    )

    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            order = task[0]
            if order == "terminate":
                break

            for command, method_name, log_text in commands:
                if order == command:
                    if log_text:
                        self.logger.info(log_text.format(task[1]))
                    getattr(self, method_name)(task[1])
                    break

            # a command has been attended (or ignored): look for the next one
            continue
        except queue.Empty:
            # no commands pending, go on with the database tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled debug aid: log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do at database, wait before asking again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")