Fix Bug 2022: RO updating VIM details with null info when VM is in error state
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45 __author__ = "Alfonso Tierno"
46 __date__ = "$28-Sep-2017 12:07:15$"
47
48
49 def deep_get(target_dict, *args, **kwargs):
50 """
51 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
52 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
53 :param target_dict: dictionary to be read
54 :param args: list of keys to read from target_dict
55 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
56 :return: The wanted value if exist, None or default otherwise
57 """
58 for key in args:
59 if not isinstance(target_dict, dict) or key not in target_dict:
60 return kwargs.get("default")
61 target_dict = target_dict[key]
62 return target_dict
63
64
65 class NsWorkerException(Exception):
66 pass
67
68
69 class FailingConnector:
70 def __init__(self, error_msg):
71 self.error_msg = error_msg
72
73 for method in dir(vimconn.VimConnector):
74 if method[0] != "_":
75 setattr(
76 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
77 )
78
79 for method in dir(sdnconn.SdnConnectorBase):
80 if method[0] != "_":
81 setattr(
82 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
83 )
84
85
86 class NsWorkerExceptionNotFound(NsWorkerException):
87 pass
88
89
90 class VimInteractionBase:
91 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
92 It implements methods that does nothing and return ok"""
93
94 def __init__(self, db, my_vims, db_vims, logger):
95 self.db = db
96 self.logger = logger
97 self.my_vims = my_vims
98 self.db_vims = db_vims
99
100 def new(self, ro_task, task_index, task_depends):
101 return "BUILD", {}
102
103 def refresh(self, ro_task):
104 """skip calling VIM to get image, flavor status. Assumes ok"""
105 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
106 return "FAILED", {}
107
108 return "DONE", {}
109
110 def delete(self, ro_task, task_index):
111 """skip calling VIM to delete image. Assumes ok"""
112 return "DONE", {}
113
114 def exec(self, ro_task, task_index, task_depends):
115 return "DONE", None, None
116
117
118 class VimInteractionNet(VimInteractionBase):
119 def new(self, ro_task, task_index, task_depends):
120 vim_net_id = None
121 task = ro_task["tasks"][task_index]
122 task_id = task["task_id"]
123 created = False
124 created_items = {}
125 target_vim = self.my_vims[ro_task["target_id"]]
126 mgmtnet = False
127 mgmtnet_defined_in_vim = False
128
129 try:
130 # FIND
131 if task.get("find_params"):
132 # if management, get configuration of VIM
133 if task["find_params"].get("filter_dict"):
134 vim_filter = task["find_params"]["filter_dict"]
135 # management network
136 elif task["find_params"].get("mgmt"):
137 mgmtnet = True
138 if deep_get(
139 self.db_vims[ro_task["target_id"]],
140 "config",
141 "management_network_id",
142 ):
143 mgmtnet_defined_in_vim = True
144 vim_filter = {
145 "id": self.db_vims[ro_task["target_id"]]["config"][
146 "management_network_id"
147 ]
148 }
149 elif deep_get(
150 self.db_vims[ro_task["target_id"]],
151 "config",
152 "management_network_name",
153 ):
154 mgmtnet_defined_in_vim = True
155 vim_filter = {
156 "name": self.db_vims[ro_task["target_id"]]["config"][
157 "management_network_name"
158 ]
159 }
160 else:
161 vim_filter = {"name": task["find_params"]["name"]}
162 else:
163 raise NsWorkerExceptionNotFound(
164 "Invalid find_params for new_net {}".format(task["find_params"])
165 )
166
167 vim_nets = target_vim.get_network_list(vim_filter)
168 if not vim_nets and not task.get("params"):
169 # If there is mgmt-network in the descriptor,
170 # there is no mapping of that network to a VIM network in the descriptor,
171 # also there is no mapping in the "--config" parameter or at VIM creation;
172 # that mgmt-network will be created.
173 if mgmtnet and not mgmtnet_defined_in_vim:
174 net_name = (
175 vim_filter.get("name")
176 if vim_filter.get("name")
177 else vim_filter.get("id")[:16]
178 )
179 vim_net_id, created_items = target_vim.new_network(
180 net_name, None
181 )
182 self.logger.debug(
183 "Created mgmt network vim_net_id: {}".format(vim_net_id)
184 )
185 created = True
186 else:
187 raise NsWorkerExceptionNotFound(
188 "Network not found with this criteria: '{}'".format(
189 task.get("find_params")
190 )
191 )
192 elif len(vim_nets) > 1:
193 raise NsWorkerException(
194 "More than one network found with this criteria: '{}'".format(
195 task["find_params"]
196 )
197 )
198
199 if vim_nets:
200 vim_net_id = vim_nets[0]["id"]
201 else:
202 # CREATE
203 params = task["params"]
204 vim_net_id, created_items = target_vim.new_network(**params)
205 created = True
206
207 ro_vim_item_update = {
208 "vim_id": vim_net_id,
209 "vim_status": "BUILD",
210 "created": created,
211 "created_items": created_items,
212 "vim_details": None,
213 "vim_message": None,
214 }
215 self.logger.debug(
216 "task={} {} new-net={} created={}".format(
217 task_id, ro_task["target_id"], vim_net_id, created
218 )
219 )
220
221 return "BUILD", ro_vim_item_update
222 except (vimconn.VimConnException, NsWorkerException) as e:
223 self.logger.error(
224 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
225 )
226 ro_vim_item_update = {
227 "vim_status": "VIM_ERROR",
228 "created": created,
229 "vim_message": str(e),
230 }
231
232 return "FAILED", ro_vim_item_update
233
234 def refresh(self, ro_task):
235 """Call VIM to get network status"""
236 ro_task_id = ro_task["_id"]
237 target_vim = self.my_vims[ro_task["target_id"]]
238 vim_id = ro_task["vim_info"]["vim_id"]
239 net_to_refresh_list = [vim_id]
240
241 try:
242 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
243 vim_info = vim_dict[vim_id]
244
245 if vim_info["status"] == "ACTIVE":
246 task_status = "DONE"
247 elif vim_info["status"] == "BUILD":
248 task_status = "BUILD"
249 else:
250 task_status = "FAILED"
251 except vimconn.VimConnException as e:
252 # Mark all tasks at VIM_ERROR status
253 self.logger.error(
254 "ro_task={} vim={} get-net={}: {}".format(
255 ro_task_id, ro_task["target_id"], vim_id, e
256 )
257 )
258 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
259 task_status = "FAILED"
260
261 ro_vim_item_update = {}
262 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
263 ro_vim_item_update["vim_status"] = vim_info["status"]
264
265 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
266 ro_vim_item_update["vim_name"] = vim_info.get("name")
267
268 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
269 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
270 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
271 elif vim_info["status"] == "DELETED":
272 ro_vim_item_update["vim_id"] = None
273 ro_vim_item_update["vim_message"] = "Deleted externally"
274 else:
275 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
276 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
277
278 if ro_vim_item_update:
279 self.logger.debug(
280 "ro_task={} {} get-net={}: status={} {}".format(
281 ro_task_id,
282 ro_task["target_id"],
283 vim_id,
284 ro_vim_item_update.get("vim_status"),
285 ro_vim_item_update.get("vim_message")
286 if ro_vim_item_update.get("vim_status") != "ACTIVE"
287 else "",
288 )
289 )
290
291 return task_status, ro_vim_item_update
292
293 def delete(self, ro_task, task_index):
294 task = ro_task["tasks"][task_index]
295 task_id = task["task_id"]
296 net_vim_id = ro_task["vim_info"]["vim_id"]
297 ro_vim_item_update_ok = {
298 "vim_status": "DELETED",
299 "created": False,
300 "vim_message": "DELETED",
301 "vim_id": None,
302 }
303
304 try:
305 if net_vim_id or ro_task["vim_info"]["created_items"]:
306 target_vim = self.my_vims[ro_task["target_id"]]
307 target_vim.delete_network(
308 net_vim_id, ro_task["vim_info"]["created_items"]
309 )
310 except vimconn.VimConnNotFoundException:
311 ro_vim_item_update_ok["vim_message"] = "already deleted"
312 except vimconn.VimConnException as e:
313 self.logger.error(
314 "ro_task={} vim={} del-net={}: {}".format(
315 ro_task["_id"], ro_task["target_id"], net_vim_id, e
316 )
317 )
318 ro_vim_item_update = {
319 "vim_status": "VIM_ERROR",
320 "vim_message": "Error while deleting: {}".format(e),
321 }
322
323 return "FAILED", ro_vim_item_update
324
325 self.logger.debug(
326 "task={} {} del-net={} {}".format(
327 task_id,
328 ro_task["target_id"],
329 net_vim_id,
330 ro_vim_item_update_ok.get("vim_message", ""),
331 )
332 )
333
334 return "DONE", ro_vim_item_update_ok
335
336
337 class VimInteractionVdu(VimInteractionBase):
338 max_retries_inject_ssh_key = 20 # 20 times
339 time_retries_inject_ssh_key = 30 # wevery 30 seconds
340
341 def new(self, ro_task, task_index, task_depends):
342 task = ro_task["tasks"][task_index]
343 task_id = task["task_id"]
344 created = False
345 created_items = {}
346 target_vim = self.my_vims[ro_task["target_id"]]
347
348 try:
349 created = True
350 params = task["params"]
351 params_copy = deepcopy(params)
352 net_list = params_copy["net_list"]
353
354 for net in net_list:
355 # change task_id into network_id
356 if "net_id" in net and net["net_id"].startswith("TASK-"):
357 network_id = task_depends[net["net_id"]]
358
359 if not network_id:
360 raise NsWorkerException(
361 "Cannot create VM because depends on a network not created or found "
362 "for {}".format(net["net_id"])
363 )
364
365 net["net_id"] = network_id
366
367 if params_copy["image_id"].startswith("TASK-"):
368 params_copy["image_id"] = task_depends[params_copy["image_id"]]
369
370 if params_copy["flavor_id"].startswith("TASK-"):
371 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
372
373 affinity_group_list = params_copy["affinity_group_list"]
374 for affinity_group in affinity_group_list:
375 # change task_id into affinity_group_id
376 if "affinity_group_id" in affinity_group and affinity_group[
377 "affinity_group_id"
378 ].startswith("TASK-"):
379 affinity_group_id = task_depends[
380 affinity_group["affinity_group_id"]
381 ]
382
383 if not affinity_group_id:
384 raise NsWorkerException(
385 "found for {}".format(affinity_group["affinity_group_id"])
386 )
387
388 affinity_group["affinity_group_id"] = affinity_group_id
389
390 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
391 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
392
393 ro_vim_item_update = {
394 "vim_id": vim_vm_id,
395 "vim_status": "BUILD",
396 "created": created,
397 "created_items": created_items,
398 "vim_details": None,
399 "vim_message": None,
400 "interfaces_vim_ids": interfaces,
401 "interfaces": [],
402 }
403 self.logger.debug(
404 "task={} {} new-vm={} created={}".format(
405 task_id, ro_task["target_id"], vim_vm_id, created
406 )
407 )
408
409 return "BUILD", ro_vim_item_update
410 except (vimconn.VimConnException, NsWorkerException) as e:
411 self.logger.error(
412 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
413 )
414 ro_vim_item_update = {
415 "vim_status": "VIM_ERROR",
416 "created": created,
417 "vim_message": str(e),
418 }
419
420 return "FAILED", ro_vim_item_update
421
422 def delete(self, ro_task, task_index):
423 task = ro_task["tasks"][task_index]
424 task_id = task["task_id"]
425 vm_vim_id = ro_task["vim_info"]["vim_id"]
426 ro_vim_item_update_ok = {
427 "vim_status": "DELETED",
428 "created": False,
429 "vim_message": "DELETED",
430 "vim_id": None,
431 }
432
433 try:
434 self.logger.debug(
435 "delete_vminstance: vm_vim_id={} created_items={}".format(
436 vm_vim_id, ro_task["vim_info"]["created_items"]
437 )
438 )
439 if vm_vim_id or ro_task["vim_info"]["created_items"]:
440 target_vim = self.my_vims[ro_task["target_id"]]
441 target_vim.delete_vminstance(
442 vm_vim_id,
443 ro_task["vim_info"]["created_items"],
444 ro_task["vim_info"].get("volumes_to_hold", []),
445 )
446 except vimconn.VimConnNotFoundException:
447 ro_vim_item_update_ok["vim_message"] = "already deleted"
448 except vimconn.VimConnException as e:
449 self.logger.error(
450 "ro_task={} vim={} del-vm={}: {}".format(
451 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
452 )
453 )
454 ro_vim_item_update = {
455 "vim_status": "VIM_ERROR",
456 "vim_message": "Error while deleting: {}".format(e),
457 }
458
459 return "FAILED", ro_vim_item_update
460
461 self.logger.debug(
462 "task={} {} del-vm={} {}".format(
463 task_id,
464 ro_task["target_id"],
465 vm_vim_id,
466 ro_vim_item_update_ok.get("vim_message", ""),
467 )
468 )
469
470 return "DONE", ro_vim_item_update_ok
471
472 def refresh(self, ro_task):
473 """Call VIM to get vm status"""
474 ro_task_id = ro_task["_id"]
475 target_vim = self.my_vims[ro_task["target_id"]]
476 vim_id = ro_task["vim_info"]["vim_id"]
477
478 if not vim_id:
479 return None, None
480
481 vm_to_refresh_list = [vim_id]
482 try:
483 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
484 vim_info = vim_dict[vim_id]
485
486 if vim_info["status"] == "ACTIVE":
487 task_status = "DONE"
488 elif vim_info["status"] == "BUILD":
489 task_status = "BUILD"
490 else:
491 task_status = "FAILED"
492
493 # try to load and parse vim_information
494 try:
495 vim_info_info = yaml.safe_load(vim_info["vim_info"])
496 if vim_info_info.get("name"):
497 vim_info["name"] = vim_info_info["name"]
498 except Exception:
499 pass
500 except vimconn.VimConnException as e:
501 # Mark all tasks at VIM_ERROR status
502 self.logger.error(
503 "ro_task={} vim={} get-vm={}: {}".format(
504 ro_task_id, ro_task["target_id"], vim_id, e
505 )
506 )
507 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
508 task_status = "FAILED"
509
510 ro_vim_item_update = {}
511
512 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
513 vim_interfaces = []
514 if vim_info.get("interfaces"):
515 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
516 iface = next(
517 (
518 iface
519 for iface in vim_info["interfaces"]
520 if vim_iface_id == iface["vim_interface_id"]
521 ),
522 None,
523 )
524 # if iface:
525 # iface.pop("vim_info", None)
526 vim_interfaces.append(iface)
527
528 task_create = next(
529 t
530 for t in ro_task["tasks"]
531 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
532 )
533 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
534 vim_interfaces[task_create["mgmt_vnf_interface"]][
535 "mgmt_vnf_interface"
536 ] = True
537
538 mgmt_vdu_iface = task_create.get(
539 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
540 )
541 if vim_interfaces:
542 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
543
544 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
545 ro_vim_item_update["interfaces"] = vim_interfaces
546
547 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
548 ro_vim_item_update["vim_status"] = vim_info["status"]
549
550 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
551 ro_vim_item_update["vim_name"] = vim_info.get("name")
552
553 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
554 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
555 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
556 elif vim_info["status"] == "DELETED":
557 ro_vim_item_update["vim_id"] = None
558 ro_vim_item_update["vim_message"] = "Deleted externally"
559 else:
560 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
561 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
562
563 if ro_vim_item_update:
564 self.logger.debug(
565 "ro_task={} {} get-vm={}: status={} {}".format(
566 ro_task_id,
567 ro_task["target_id"],
568 vim_id,
569 ro_vim_item_update.get("vim_status"),
570 ro_vim_item_update.get("vim_message")
571 if ro_vim_item_update.get("vim_status") != "ACTIVE"
572 else "",
573 )
574 )
575
576 return task_status, ro_vim_item_update
577
578 def exec(self, ro_task, task_index, task_depends):
579 task = ro_task["tasks"][task_index]
580 task_id = task["task_id"]
581 target_vim = self.my_vims[ro_task["target_id"]]
582 db_task_update = {"retries": 0}
583 retries = task.get("retries", 0)
584
585 try:
586 params = task["params"]
587 params_copy = deepcopy(params)
588 params_copy["ro_key"] = self.db.decrypt(
589 params_copy.pop("private_key"),
590 params_copy.pop("schema_version"),
591 params_copy.pop("salt"),
592 )
593 params_copy["ip_addr"] = params_copy.pop("ip_address")
594 target_vim.inject_user_key(**params_copy)
595 self.logger.debug(
596 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
597 )
598
599 return (
600 "DONE",
601 None,
602 db_task_update,
603 ) # params_copy["key"]
604 except (vimconn.VimConnException, NsWorkerException) as e:
605 retries += 1
606
607 if retries < self.max_retries_inject_ssh_key:
608 return (
609 "BUILD",
610 None,
611 {
612 "retries": retries,
613 "next_retry": self.time_retries_inject_ssh_key,
614 },
615 )
616
617 self.logger.error(
618 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
619 )
620 ro_vim_item_update = {"vim_message": str(e)}
621
622 return "FAILED", ro_vim_item_update, db_task_update
623
624
625 class VimInteractionImage(VimInteractionBase):
626 def new(self, ro_task, task_index, task_depends):
627 task = ro_task["tasks"][task_index]
628 task_id = task["task_id"]
629 created = False
630 created_items = {}
631 target_vim = self.my_vims[ro_task["target_id"]]
632
633 try:
634 # FIND
635 if task.get("find_params"):
636 vim_images = target_vim.get_image_list(**task["find_params"])
637
638 if not vim_images:
639 raise NsWorkerExceptionNotFound(
640 "Image not found with this criteria: '{}'".format(
641 task["find_params"]
642 )
643 )
644 elif len(vim_images) > 1:
645 raise NsWorkerException(
646 "More than one image found with this criteria: '{}'".format(
647 task["find_params"]
648 )
649 )
650 else:
651 vim_image_id = vim_images[0]["id"]
652
653 ro_vim_item_update = {
654 "vim_id": vim_image_id,
655 "vim_status": "DONE",
656 "created": created,
657 "created_items": created_items,
658 "vim_details": None,
659 "vim_message": None,
660 }
661 self.logger.debug(
662 "task={} {} new-image={} created={}".format(
663 task_id, ro_task["target_id"], vim_image_id, created
664 )
665 )
666
667 return "DONE", ro_vim_item_update
668 except (NsWorkerException, vimconn.VimConnException) as e:
669 self.logger.error(
670 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
671 )
672 ro_vim_item_update = {
673 "vim_status": "VIM_ERROR",
674 "created": created,
675 "vim_message": str(e),
676 }
677
678 return "FAILED", ro_vim_item_update
679
680
681 class VimInteractionFlavor(VimInteractionBase):
682 def delete(self, ro_task, task_index):
683 task = ro_task["tasks"][task_index]
684 task_id = task["task_id"]
685 flavor_vim_id = ro_task["vim_info"]["vim_id"]
686 ro_vim_item_update_ok = {
687 "vim_status": "DELETED",
688 "created": False,
689 "vim_message": "DELETED",
690 "vim_id": None,
691 }
692
693 try:
694 if flavor_vim_id:
695 target_vim = self.my_vims[ro_task["target_id"]]
696 target_vim.delete_flavor(flavor_vim_id)
697 except vimconn.VimConnNotFoundException:
698 ro_vim_item_update_ok["vim_message"] = "already deleted"
699 except vimconn.VimConnException as e:
700 self.logger.error(
701 "ro_task={} vim={} del-flavor={}: {}".format(
702 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
703 )
704 )
705 ro_vim_item_update = {
706 "vim_status": "VIM_ERROR",
707 "vim_message": "Error while deleting: {}".format(e),
708 }
709
710 return "FAILED", ro_vim_item_update
711
712 self.logger.debug(
713 "task={} {} del-flavor={} {}".format(
714 task_id,
715 ro_task["target_id"],
716 flavor_vim_id,
717 ro_vim_item_update_ok.get("vim_message", ""),
718 )
719 )
720
721 return "DONE", ro_vim_item_update_ok
722
723 def new(self, ro_task, task_index, task_depends):
724 task = ro_task["tasks"][task_index]
725 task_id = task["task_id"]
726 created = False
727 created_items = {}
728 target_vim = self.my_vims[ro_task["target_id"]]
729
730 try:
731 # FIND
732 vim_flavor_id = None
733
734 if task.get("find_params"):
735 try:
736 flavor_data = task["find_params"]["flavor_data"]
737 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
738 except vimconn.VimConnNotFoundException:
739 pass
740
741 if not vim_flavor_id and task.get("params"):
742 # CREATE
743 flavor_data = task["params"]["flavor_data"]
744 vim_flavor_id = target_vim.new_flavor(flavor_data)
745 created = True
746
747 ro_vim_item_update = {
748 "vim_id": vim_flavor_id,
749 "vim_status": "DONE",
750 "created": created,
751 "created_items": created_items,
752 "vim_details": None,
753 "vim_message": None,
754 }
755 self.logger.debug(
756 "task={} {} new-flavor={} created={}".format(
757 task_id, ro_task["target_id"], vim_flavor_id, created
758 )
759 )
760
761 return "DONE", ro_vim_item_update
762 except (vimconn.VimConnException, NsWorkerException) as e:
763 self.logger.error(
764 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
765 )
766 ro_vim_item_update = {
767 "vim_status": "VIM_ERROR",
768 "created": created,
769 "vim_message": str(e),
770 }
771
772 return "FAILED", ro_vim_item_update
773
774
775 class VimInteractionAffinityGroup(VimInteractionBase):
776 def delete(self, ro_task, task_index):
777 task = ro_task["tasks"][task_index]
778 task_id = task["task_id"]
779 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
780 ro_vim_item_update_ok = {
781 "vim_status": "DELETED",
782 "created": False,
783 "vim_message": "DELETED",
784 "vim_id": None,
785 }
786
787 try:
788 if affinity_group_vim_id:
789 target_vim = self.my_vims[ro_task["target_id"]]
790 target_vim.delete_affinity_group(affinity_group_vim_id)
791 except vimconn.VimConnNotFoundException:
792 ro_vim_item_update_ok["vim_message"] = "already deleted"
793 except vimconn.VimConnException as e:
794 self.logger.error(
795 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
796 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
797 )
798 )
799 ro_vim_item_update = {
800 "vim_status": "VIM_ERROR",
801 "vim_message": "Error while deleting: {}".format(e),
802 }
803
804 return "FAILED", ro_vim_item_update
805
806 self.logger.debug(
807 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
808 task_id,
809 ro_task["target_id"],
810 affinity_group_vim_id,
811 ro_vim_item_update_ok.get("vim_message", ""),
812 )
813 )
814
815 return "DONE", ro_vim_item_update_ok
816
817 def new(self, ro_task, task_index, task_depends):
818 task = ro_task["tasks"][task_index]
819 task_id = task["task_id"]
820 created = False
821 created_items = {}
822 target_vim = self.my_vims[ro_task["target_id"]]
823
824 try:
825 affinity_group_vim_id = None
826 affinity_group_data = None
827
828 if task.get("params"):
829 affinity_group_data = task["params"].get("affinity_group_data")
830
831 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
832 try:
833 param_affinity_group_id = task["params"]["affinity_group_data"].get(
834 "vim-affinity-group-id"
835 )
836 affinity_group_vim_id = target_vim.get_affinity_group(
837 param_affinity_group_id
838 ).get("id")
839 except vimconn.VimConnNotFoundException:
840 self.logger.error(
841 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
842 "could not be found at VIM. Creating a new one.".format(
843 task_id, ro_task["target_id"], param_affinity_group_id
844 )
845 )
846
847 if not affinity_group_vim_id and affinity_group_data:
848 affinity_group_vim_id = target_vim.new_affinity_group(
849 affinity_group_data
850 )
851 created = True
852
853 ro_vim_item_update = {
854 "vim_id": affinity_group_vim_id,
855 "vim_status": "DONE",
856 "created": created,
857 "created_items": created_items,
858 "vim_details": None,
859 "vim_message": None,
860 }
861 self.logger.debug(
862 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
863 task_id, ro_task["target_id"], affinity_group_vim_id, created
864 )
865 )
866
867 return "DONE", ro_vim_item_update
868 except (vimconn.VimConnException, NsWorkerException) as e:
869 self.logger.error(
870 "task={} vim={} new-affinity-or-anti-affinity-group:"
871 " {}".format(task_id, ro_task["target_id"], e)
872 )
873 ro_vim_item_update = {
874 "vim_status": "VIM_ERROR",
875 "created": created,
876 "vim_message": str(e),
877 }
878
879 return "FAILED", ro_vim_item_update
880
881
882 class VimInteractionSdnNet(VimInteractionBase):
883 @staticmethod
884 def _match_pci(port_pci, mapping):
885 """
886 Check if port_pci matches with mapping
887 mapping can have brackets to indicate that several chars are accepted. e.g
888 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
889 :param port_pci: text
890 :param mapping: text, can contain brackets to indicate several chars are available
891 :return: True if matches, False otherwise
892 """
893 if not port_pci or not mapping:
894 return False
895 if port_pci == mapping:
896 return True
897
898 mapping_index = 0
899 pci_index = 0
900 while True:
901 bracket_start = mapping.find("[", mapping_index)
902
903 if bracket_start == -1:
904 break
905
906 bracket_end = mapping.find("]", bracket_start)
907 if bracket_end == -1:
908 break
909
910 length = bracket_start - mapping_index
911 if (
912 length
913 and port_pci[pci_index : pci_index + length]
914 != mapping[mapping_index:bracket_start]
915 ):
916 return False
917
918 if (
919 port_pci[pci_index + length]
920 not in mapping[bracket_start + 1 : bracket_end]
921 ):
922 return False
923
924 pci_index += length + 1
925 mapping_index = bracket_end + 1
926
927 if port_pci[pci_index:] != mapping[mapping_index:]:
928 return False
929
930 return True
931
932 def _get_interfaces(self, vlds_to_connect, vim_account_id):
933 """
934 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
935 :param vim_account_id:
936 :return:
937 """
938 interfaces = []
939
940 for vld in vlds_to_connect:
941 table, _, db_id = vld.partition(":")
942 db_id, _, vld = db_id.partition(":")
943 _, _, vld_id = vld.partition(".")
944
945 if table == "vnfrs":
946 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
947 iface_key = "vnf-vld-id"
948 else: # table == "nsrs"
949 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
950 iface_key = "ns-vld-id"
951
952 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
953
954 for db_vnfr in db_vnfrs:
955 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
956 for iface_index, interface in enumerate(vdur["interfaces"]):
957 if interface.get(iface_key) == vld_id and interface.get(
958 "type"
959 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
960 # only SR-IOV o PT
961 interface_ = interface.copy()
962 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
963 db_vnfr["_id"], vdu_index, iface_index
964 )
965
966 if vdur.get("status") == "ERROR":
967 interface_["status"] = "ERROR"
968
969 interfaces.append(interface_)
970
971 return interfaces
972
973 def refresh(self, ro_task):
974 # look for task create
975 task_create_index, _ = next(
976 i_t
977 for i_t in enumerate(ro_task["tasks"])
978 if i_t[1]
979 and i_t[1]["action"] == "CREATE"
980 and i_t[1]["status"] != "FINISHED"
981 )
982
983 return self.new(ro_task, task_create_index, None)
984
985 def new(self, ro_task, task_index, task_depends):
986
987 task = ro_task["tasks"][task_index]
988 task_id = task["task_id"]
989 target_vim = self.my_vims[ro_task["target_id"]]
990
991 sdn_net_id = ro_task["vim_info"]["vim_id"]
992
993 created_items = ro_task["vim_info"].get("created_items")
994 connected_ports = ro_task["vim_info"].get("connected_ports", [])
995 new_connected_ports = []
996 last_update = ro_task["vim_info"].get("last_update", 0)
997 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
998 error_list = []
999 created = ro_task["vim_info"].get("created", False)
1000
1001 try:
1002 # CREATE
1003 params = task["params"]
1004 vlds_to_connect = params["vlds"]
1005 associated_vim = params["target_vim"]
1006 # external additional ports
1007 additional_ports = params.get("sdn-ports") or ()
1008 _, _, vim_account_id = associated_vim.partition(":")
1009
1010 if associated_vim:
1011 # get associated VIM
1012 if associated_vim not in self.db_vims:
1013 self.db_vims[associated_vim] = self.db.get_one(
1014 "vim_accounts", {"_id": vim_account_id}
1015 )
1016
1017 db_vim = self.db_vims[associated_vim]
1018
1019 # look for ports to connect
1020 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1021 # print(ports)
1022
1023 sdn_ports = []
1024 pending_ports = error_ports = 0
1025 vlan_used = None
1026 sdn_need_update = False
1027
1028 for port in ports:
1029 vlan_used = port.get("vlan") or vlan_used
1030
1031 # TODO. Do not connect if already done
1032 if not port.get("compute_node") or not port.get("pci"):
1033 if port.get("status") == "ERROR":
1034 error_ports += 1
1035 else:
1036 pending_ports += 1
1037 continue
1038
1039 pmap = None
1040 compute_node_mappings = next(
1041 (
1042 c
1043 for c in db_vim["config"].get("sdn-port-mapping", ())
1044 if c and c["compute_node"] == port["compute_node"]
1045 ),
1046 None,
1047 )
1048
1049 if compute_node_mappings:
1050 # process port_mapping pci of type 0000:af:1[01].[1357]
1051 pmap = next(
1052 (
1053 p
1054 for p in compute_node_mappings["ports"]
1055 if self._match_pci(port["pci"], p.get("pci"))
1056 ),
1057 None,
1058 )
1059
1060 if not pmap:
1061 if not db_vim["config"].get("mapping_not_needed"):
1062 error_list.append(
1063 "Port mapping not found for compute_node={} pci={}".format(
1064 port["compute_node"], port["pci"]
1065 )
1066 )
1067 continue
1068
1069 pmap = {}
1070
1071 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1072 new_port = {
1073 "service_endpoint_id": pmap.get("service_endpoint_id")
1074 or service_endpoint_id,
1075 "service_endpoint_encapsulation_type": "dot1q"
1076 if port["type"] == "SR-IOV"
1077 else None,
1078 "service_endpoint_encapsulation_info": {
1079 "vlan": port.get("vlan"),
1080 "mac": port.get("mac-address"),
1081 "device_id": pmap.get("device_id") or port["compute_node"],
1082 "device_interface_id": pmap.get("device_interface_id")
1083 or port["pci"],
1084 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1085 "switch_port": pmap.get("switch_port"),
1086 "service_mapping_info": pmap.get("service_mapping_info"),
1087 },
1088 }
1089
1090 # TODO
1091 # if port["modified_at"] > last_update:
1092 # sdn_need_update = True
1093 new_connected_ports.append(port["id"]) # TODO
1094 sdn_ports.append(new_port)
1095
1096 if error_ports:
1097 error_list.append(
1098 "{} interfaces have not been created as VDU is on ERROR status".format(
1099 error_ports
1100 )
1101 )
1102
1103 # connect external ports
1104 for index, additional_port in enumerate(additional_ports):
1105 additional_port_id = additional_port.get(
1106 "service_endpoint_id"
1107 ) or "external-{}".format(index)
1108 sdn_ports.append(
1109 {
1110 "service_endpoint_id": additional_port_id,
1111 "service_endpoint_encapsulation_type": additional_port.get(
1112 "service_endpoint_encapsulation_type", "dot1q"
1113 ),
1114 "service_endpoint_encapsulation_info": {
1115 "vlan": additional_port.get("vlan") or vlan_used,
1116 "mac": additional_port.get("mac_address"),
1117 "device_id": additional_port.get("device_id"),
1118 "device_interface_id": additional_port.get(
1119 "device_interface_id"
1120 ),
1121 "switch_dpid": additional_port.get("switch_dpid")
1122 or additional_port.get("switch_id"),
1123 "switch_port": additional_port.get("switch_port"),
1124 "service_mapping_info": additional_port.get(
1125 "service_mapping_info"
1126 ),
1127 },
1128 }
1129 )
1130 new_connected_ports.append(additional_port_id)
1131 sdn_info = ""
1132
1133 # if there are more ports to connect or they have been modified, call create/update
1134 if error_list:
1135 sdn_status = "ERROR"
1136 sdn_info = "; ".join(error_list)
1137 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1138 last_update = time.time()
1139
1140 if not sdn_net_id:
1141 if len(sdn_ports) < 2:
1142 sdn_status = "ACTIVE"
1143
1144 if not pending_ports:
1145 self.logger.debug(
1146 "task={} {} new-sdn-net done, less than 2 ports".format(
1147 task_id, ro_task["target_id"]
1148 )
1149 )
1150 else:
1151 net_type = params.get("type") or "ELAN"
1152 (
1153 sdn_net_id,
1154 created_items,
1155 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1156 created = True
1157 self.logger.debug(
1158 "task={} {} new-sdn-net={} created={}".format(
1159 task_id, ro_task["target_id"], sdn_net_id, created
1160 )
1161 )
1162 else:
1163 created_items = target_vim.edit_connectivity_service(
1164 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1165 )
1166 created = True
1167 self.logger.debug(
1168 "task={} {} update-sdn-net={} created={}".format(
1169 task_id, ro_task["target_id"], sdn_net_id, created
1170 )
1171 )
1172
1173 connected_ports = new_connected_ports
1174 elif sdn_net_id:
1175 wim_status_dict = target_vim.get_connectivity_service_status(
1176 sdn_net_id, conn_info=created_items
1177 )
1178 sdn_status = wim_status_dict["sdn_status"]
1179
1180 if wim_status_dict.get("sdn_info"):
1181 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1182
1183 if wim_status_dict.get("error_msg"):
1184 sdn_info = wim_status_dict.get("error_msg") or ""
1185
1186 if pending_ports:
1187 if sdn_status != "ERROR":
1188 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1189 len(ports) - pending_ports, len(ports)
1190 )
1191
1192 if sdn_status == "ACTIVE":
1193 sdn_status = "BUILD"
1194
1195 ro_vim_item_update = {
1196 "vim_id": sdn_net_id,
1197 "vim_status": sdn_status,
1198 "created": created,
1199 "created_items": created_items,
1200 "connected_ports": connected_ports,
1201 "vim_details": sdn_info,
1202 "vim_message": None,
1203 "last_update": last_update,
1204 }
1205
1206 return sdn_status, ro_vim_item_update
1207 except Exception as e:
1208 self.logger.error(
1209 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1210 exc_info=not isinstance(
1211 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1212 ),
1213 )
1214 ro_vim_item_update = {
1215 "vim_status": "VIM_ERROR",
1216 "created": created,
1217 "vim_message": str(e),
1218 }
1219
1220 return "FAILED", ro_vim_item_update
1221
1222 def delete(self, ro_task, task_index):
1223 task = ro_task["tasks"][task_index]
1224 task_id = task["task_id"]
1225 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1226 ro_vim_item_update_ok = {
1227 "vim_status": "DELETED",
1228 "created": False,
1229 "vim_message": "DELETED",
1230 "vim_id": None,
1231 }
1232
1233 try:
1234 if sdn_vim_id:
1235 target_vim = self.my_vims[ro_task["target_id"]]
1236 target_vim.delete_connectivity_service(
1237 sdn_vim_id, ro_task["vim_info"].get("created_items")
1238 )
1239
1240 except Exception as e:
1241 if (
1242 isinstance(e, sdnconn.SdnConnectorError)
1243 and e.http_code == HTTPStatus.NOT_FOUND.value
1244 ):
1245 ro_vim_item_update_ok["vim_message"] = "already deleted"
1246 else:
1247 self.logger.error(
1248 "ro_task={} vim={} del-sdn-net={}: {}".format(
1249 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1250 ),
1251 exc_info=not isinstance(
1252 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1253 ),
1254 )
1255 ro_vim_item_update = {
1256 "vim_status": "VIM_ERROR",
1257 "vim_message": "Error while deleting: {}".format(e),
1258 }
1259
1260 return "FAILED", ro_vim_item_update
1261
1262 self.logger.debug(
1263 "task={} {} del-sdn-net={} {}".format(
1264 task_id,
1265 ro_task["target_id"],
1266 sdn_vim_id,
1267 ro_vim_item_update_ok.get("vim_message", ""),
1268 )
1269 )
1270
1271 return "DONE", ro_vim_item_update_ok
1272
1273
1274 class NsWorker(threading.Thread):
1275 REFRESH_BUILD = 5 # 5 seconds
1276 REFRESH_ACTIVE = 60 # 1 minute
1277 REFRESH_ERROR = 600
1278 REFRESH_IMAGE = 3600 * 10
1279 REFRESH_DELETE = 3600 * 10
1280 QUEUE_SIZE = 100
1281 terminate = False
1282
1283 def __init__(self, worker_index, config, plugins, db):
1284 """
1285
1286 :param worker_index: thread index
1287 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1288 :param plugins: global shared dict with the loaded plugins
1289 :param db: database class instance to use
1290 """
1291 threading.Thread.__init__(self)
1292 self.config = config
1293 self.plugins = plugins
1294 self.plugin_name = "unknown"
1295 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1296 self.worker_index = worker_index
1297 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1298 # targetvim: vimplugin class
1299 self.my_vims = {}
1300 # targetvim: vim information from database
1301 self.db_vims = {}
1302 # targetvim list
1303 self.vim_targets = []
1304 self.my_id = config["process_id"] + ":" + str(worker_index)
1305 self.db = db
1306 self.item2class = {
1307 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1308 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1309 "image": VimInteractionImage(
1310 self.db, self.my_vims, self.db_vims, self.logger
1311 ),
1312 "flavor": VimInteractionFlavor(
1313 self.db, self.my_vims, self.db_vims, self.logger
1314 ),
1315 "sdn_net": VimInteractionSdnNet(
1316 self.db, self.my_vims, self.db_vims, self.logger
1317 ),
1318 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1319 self.db, self.my_vims, self.db_vims, self.logger
1320 ),
1321 }
1322 self.time_last_task_processed = None
1323 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1324 self.tasks_to_delete = []
1325 # it is idle when there are not vim_targets associated
1326 self.idle = True
1327 self.task_locked_time = config["global"]["task_locked_time"]
1328
1329 def insert_task(self, task):
1330 try:
1331 self.task_queue.put(task, False)
1332 return None
1333 except queue.Full:
1334 raise NsWorkerException("timeout inserting a task")
1335
1336 def terminate(self):
1337 self.insert_task("exit")
1338
1339 def del_task(self, task):
1340 with self.task_lock:
1341 if task["status"] == "SCHEDULED":
1342 task["status"] = "SUPERSEDED"
1343 return True
1344 else: # task["status"] == "processing"
1345 self.task_lock.release()
1346 return False
1347
1348 def _process_vim_config(self, target_id, db_vim):
1349 """
1350 Process vim config, creating vim configuration files as ca_cert
1351 :param target_id: vim/sdn/wim + id
1352 :param db_vim: Vim dictionary obtained from database
1353 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1354 """
1355 if not db_vim.get("config"):
1356 return
1357
1358 file_name = ""
1359
1360 try:
1361 if db_vim["config"].get("ca_cert_content"):
1362 file_name = "{}:{}".format(target_id, self.worker_index)
1363
1364 try:
1365 mkdir(file_name)
1366 except FileExistsError:
1367 pass
1368
1369 file_name = file_name + "/ca_cert"
1370
1371 with open(file_name, "w") as f:
1372 f.write(db_vim["config"]["ca_cert_content"])
1373 del db_vim["config"]["ca_cert_content"]
1374 db_vim["config"]["ca_cert"] = file_name
1375 except Exception as e:
1376 raise NsWorkerException(
1377 "Error writing to file '{}': {}".format(file_name, e)
1378 )
1379
1380 def _load_plugin(self, name, type="vim"):
1381 # type can be vim or sdn
1382 if "rovim_dummy" not in self.plugins:
1383 self.plugins["rovim_dummy"] = VimDummyConnector
1384
1385 if "rosdn_dummy" not in self.plugins:
1386 self.plugins["rosdn_dummy"] = SdnDummyConnector
1387
1388 if name in self.plugins:
1389 return self.plugins[name]
1390
1391 try:
1392 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1393 self.plugins[name] = ep.load()
1394 except Exception as e:
1395 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1396
1397 if name and name not in self.plugins:
1398 raise NsWorkerException(
1399 "Plugin 'osm_{n}' has not been installed".format(n=name)
1400 )
1401
1402 return self.plugins[name]
1403
1404 def _unload_vim(self, target_id):
1405 """
1406 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1407 :param target_id: Contains type:_id; where type can be 'vim', ...
1408 :return: None.
1409 """
1410 try:
1411 self.db_vims.pop(target_id, None)
1412 self.my_vims.pop(target_id, None)
1413
1414 if target_id in self.vim_targets:
1415 self.vim_targets.remove(target_id)
1416
1417 self.logger.info("Unloaded {}".format(target_id))
1418 rmtree("{}:{}".format(target_id, self.worker_index))
1419 except FileNotFoundError:
1420 pass # this is raised by rmtree if folder does not exist
1421 except Exception as e:
1422 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1423
1424 def _check_vim(self, target_id):
1425 """
1426 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1427 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1428 :return: None.
1429 """
1430 target, _, _id = target_id.partition(":")
1431 now = time.time()
1432 update_dict = {}
1433 unset_dict = {}
1434 op_text = ""
1435 step = ""
1436 loaded = target_id in self.vim_targets
1437 target_database = (
1438 "vim_accounts"
1439 if target == "vim"
1440 else "wim_accounts"
1441 if target == "wim"
1442 else "sdns"
1443 )
1444
1445 try:
1446 step = "Getting {} from db".format(target_id)
1447 db_vim = self.db.get_one(target_database, {"_id": _id})
1448
1449 for op_index, operation in enumerate(
1450 db_vim["_admin"].get("operations", ())
1451 ):
1452 if operation["operationState"] != "PROCESSING":
1453 continue
1454
1455 locked_at = operation.get("locked_at")
1456
1457 if locked_at is not None and locked_at >= now - self.task_locked_time:
1458 # some other thread is doing this operation
1459 return
1460
1461 # lock
1462 op_text = "_admin.operations.{}.".format(op_index)
1463
1464 if not self.db.set_one(
1465 target_database,
1466 q_filter={
1467 "_id": _id,
1468 op_text + "operationState": "PROCESSING",
1469 op_text + "locked_at": locked_at,
1470 },
1471 update_dict={
1472 op_text + "locked_at": now,
1473 "admin.current_operation": op_index,
1474 },
1475 fail_on_empty=False,
1476 ):
1477 return
1478
1479 unset_dict[op_text + "locked_at"] = None
1480 unset_dict["current_operation"] = None
1481 step = "Loading " + target_id
1482 error_text = self._load_vim(target_id)
1483
1484 if not error_text:
1485 step = "Checking connectivity"
1486
1487 if target == "vim":
1488 self.my_vims[target_id].check_vim_connectivity()
1489 else:
1490 self.my_vims[target_id].check_credentials()
1491
1492 update_dict["_admin.operationalState"] = "ENABLED"
1493 update_dict["_admin.detailed-status"] = ""
1494 unset_dict[op_text + "detailed-status"] = None
1495 update_dict[op_text + "operationState"] = "COMPLETED"
1496
1497 return
1498
1499 except Exception as e:
1500 error_text = "{}: {}".format(step, e)
1501 self.logger.error("{} for {}: {}".format(step, target_id, e))
1502
1503 finally:
1504 if update_dict or unset_dict:
1505 if error_text:
1506 update_dict[op_text + "operationState"] = "FAILED"
1507 update_dict[op_text + "detailed-status"] = error_text
1508 unset_dict.pop(op_text + "detailed-status", None)
1509 update_dict["_admin.operationalState"] = "ERROR"
1510 update_dict["_admin.detailed-status"] = error_text
1511
1512 if op_text:
1513 update_dict[op_text + "statusEnteredTime"] = now
1514
1515 self.db.set_one(
1516 target_database,
1517 q_filter={"_id": _id},
1518 update_dict=update_dict,
1519 unset=unset_dict,
1520 fail_on_empty=False,
1521 )
1522
1523 if not loaded:
1524 self._unload_vim(target_id)
1525
1526 def _reload_vim(self, target_id):
1527 if target_id in self.vim_targets:
1528 self._load_vim(target_id)
1529 else:
1530 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1531 # just remove it to force load again next time it is needed
1532 self.db_vims.pop(target_id, None)
1533
1534 def _load_vim(self, target_id):
1535 """
1536 Load or reload a vim_account, sdn_controller or wim_account.
1537 Read content from database, load the plugin if not loaded.
1538 In case of error loading the plugin, it load a failing VIM_connector
1539 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1540 :param target_id: Contains type:_id; where type can be 'vim', ...
1541 :return: None if ok, descriptive text if error
1542 """
1543 target, _, _id = target_id.partition(":")
1544 target_database = (
1545 "vim_accounts"
1546 if target == "vim"
1547 else "wim_accounts"
1548 if target == "wim"
1549 else "sdns"
1550 )
1551 plugin_name = ""
1552 vim = None
1553
1554 try:
1555 step = "Getting {}={} from db".format(target, _id)
1556 # TODO process for wim, sdnc, ...
1557 vim = self.db.get_one(target_database, {"_id": _id})
1558
1559 # if deep_get(vim, "config", "sdn-controller"):
1560 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1561 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1562
1563 step = "Decrypting password"
1564 schema_version = vim.get("schema_version")
1565 self.db.encrypt_decrypt_fields(
1566 vim,
1567 "decrypt",
1568 fields=("password", "secret"),
1569 schema_version=schema_version,
1570 salt=_id,
1571 )
1572 self._process_vim_config(target_id, vim)
1573
1574 if target == "vim":
1575 plugin_name = "rovim_" + vim["vim_type"]
1576 step = "Loading plugin '{}'".format(plugin_name)
1577 vim_module_conn = self._load_plugin(plugin_name)
1578 step = "Loading {}'".format(target_id)
1579 self.my_vims[target_id] = vim_module_conn(
1580 uuid=vim["_id"],
1581 name=vim["name"],
1582 tenant_id=vim.get("vim_tenant_id"),
1583 tenant_name=vim.get("vim_tenant_name"),
1584 url=vim["vim_url"],
1585 url_admin=None,
1586 user=vim["vim_user"],
1587 passwd=vim["vim_password"],
1588 config=vim.get("config") or {},
1589 persistent_info={},
1590 )
1591 else: # sdn
1592 plugin_name = "rosdn_" + vim["type"]
1593 step = "Loading plugin '{}'".format(plugin_name)
1594 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1595 step = "Loading {}'".format(target_id)
1596 wim = deepcopy(vim)
1597 wim_config = wim.pop("config", {}) or {}
1598 wim["uuid"] = wim["_id"]
1599 wim["wim_url"] = wim["url"]
1600
1601 if wim.get("dpid"):
1602 wim_config["dpid"] = wim.pop("dpid")
1603
1604 if wim.get("switch_id"):
1605 wim_config["switch_id"] = wim.pop("switch_id")
1606
1607 # wim, wim_account, config
1608 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1609 self.db_vims[target_id] = vim
1610 self.error_status = None
1611
1612 self.logger.info(
1613 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1614 )
1615 except Exception as e:
1616 self.logger.error(
1617 "Cannot load {} plugin={}: {} {}".format(
1618 target_id, plugin_name, step, e
1619 )
1620 )
1621
1622 self.db_vims[target_id] = vim or {}
1623 self.db_vims[target_id] = FailingConnector(str(e))
1624 error_status = "{} Error: {}".format(step, e)
1625
1626 return error_status
1627 finally:
1628 if target_id not in self.vim_targets:
1629 self.vim_targets.append(target_id)
1630
1631 def _get_db_task(self):
1632 """
1633 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1634 :return: None
1635 """
1636 now = time.time()
1637
1638 if not self.time_last_task_processed:
1639 self.time_last_task_processed = now
1640
1641 try:
1642 while True:
1643 """
1644 # Log RO tasks only when loglevel is DEBUG
1645 if self.logger.getEffectiveLevel() == logging.DEBUG:
1646 self._log_ro_task(
1647 None,
1648 None,
1649 None,
1650 "TASK_WF",
1651 "task_locked_time="
1652 + str(self.task_locked_time)
1653 + " "
1654 + "time_last_task_processed="
1655 + str(self.time_last_task_processed)
1656 + " "
1657 + "now="
1658 + str(now),
1659 )
1660 """
1661 locked = self.db.set_one(
1662 "ro_tasks",
1663 q_filter={
1664 "target_id": self.vim_targets,
1665 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1666 "locked_at.lt": now - self.task_locked_time,
1667 "to_check_at.lt": self.time_last_task_processed,
1668 },
1669 update_dict={"locked_by": self.my_id, "locked_at": now},
1670 fail_on_empty=False,
1671 )
1672
1673 if locked:
1674 # read and return
1675 ro_task = self.db.get_one(
1676 "ro_tasks",
1677 q_filter={
1678 "target_id": self.vim_targets,
1679 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1680 "locked_at": now,
1681 },
1682 )
1683 return ro_task
1684
1685 if self.time_last_task_processed == now:
1686 self.time_last_task_processed = None
1687 return None
1688 else:
1689 self.time_last_task_processed = now
1690 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1691
1692 except DbException as e:
1693 self.logger.error("Database exception at _get_db_task: {}".format(e))
1694 except Exception as e:
1695 self.logger.critical(
1696 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1697 )
1698
1699 return None
1700
1701 def _get_db_all_tasks(self):
1702 """
1703 Read all content of table ro_tasks to log it
1704 :return: None
1705 """
1706 try:
1707 # Checking the content of the BD:
1708
1709 # read and return
1710 ro_task = self.db.get_list("ro_tasks")
1711 for rt in ro_task:
1712 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1713 return ro_task
1714
1715 except DbException as e:
1716 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1717 except Exception as e:
1718 self.logger.critical(
1719 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1720 )
1721
1722 return None
1723
1724 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1725 """
1726 Generate a log with the following format:
1727
1728 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1729 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1730 task_array_index;task_id;task_action;task_item;task_args
1731
1732 Example:
1733
1734 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1735 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1736 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1737 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1738 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1739 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1740 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1741 """
1742 try:
1743 line = []
1744 i = 0
1745 if ro_task is not None and isinstance(ro_task, dict):
1746 for t in ro_task["tasks"]:
1747 line.clear()
1748 line.append(mark)
1749 line.append(event)
1750 line.append(ro_task.get("_id", ""))
1751 line.append(str(ro_task.get("locked_at", "")))
1752 line.append(str(ro_task.get("modified_at", "")))
1753 line.append(str(ro_task.get("created_at", "")))
1754 line.append(str(ro_task.get("to_check_at", "")))
1755 line.append(str(ro_task.get("locked_by", "")))
1756 line.append(str(ro_task.get("target_id", "")))
1757 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1758 line.append(str(ro_task.get("vim_info", "")))
1759 line.append(str(ro_task.get("tasks", "")))
1760 if isinstance(t, dict):
1761 line.append(str(t.get("status", "")))
1762 line.append(str(t.get("action_id", "")))
1763 line.append(str(i))
1764 line.append(str(t.get("task_id", "")))
1765 line.append(str(t.get("action", "")))
1766 line.append(str(t.get("item", "")))
1767 line.append(str(t.get("find_params", "")))
1768 line.append(str(t.get("params", "")))
1769 else:
1770 line.extend([""] * 2)
1771 line.append(str(i))
1772 line.extend([""] * 5)
1773
1774 i += 1
1775 self.logger.debug(";".join(line))
1776 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1777 i = 0
1778 while True:
1779 st = "tasks.{}.status".format(i)
1780 if st not in db_ro_task_update:
1781 break
1782 line.clear()
1783 line.append(mark)
1784 line.append(event)
1785 line.append(db_ro_task_update.get("_id", ""))
1786 line.append(str(db_ro_task_update.get("locked_at", "")))
1787 line.append(str(db_ro_task_update.get("modified_at", "")))
1788 line.append("")
1789 line.append(str(db_ro_task_update.get("to_check_at", "")))
1790 line.append(str(db_ro_task_update.get("locked_by", "")))
1791 line.append("")
1792 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1793 line.append("")
1794 line.append(str(db_ro_task_update.get("vim_info", "")))
1795 line.append(str(str(db_ro_task_update).count(".status")))
1796 line.append(db_ro_task_update.get(st, ""))
1797 line.append("")
1798 line.append(str(i))
1799 line.extend([""] * 3)
1800 i += 1
1801 self.logger.debug(";".join(line))
1802
1803 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1804 line.clear()
1805 line.append(mark)
1806 line.append(event)
1807 line.append(db_ro_task_delete.get("_id", ""))
1808 line.append("")
1809 line.append(db_ro_task_delete.get("modified_at", ""))
1810 line.extend([""] * 13)
1811 self.logger.debug(";".join(line))
1812
1813 else:
1814 line.clear()
1815 line.append(mark)
1816 line.append(event)
1817 line.extend([""] * 16)
1818 self.logger.debug(";".join(line))
1819
1820 except Exception as e:
1821 self.logger.error("Error logging ro_task: {}".format(e))
1822
1823 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1824 """
1825 Determine if this task need to be done or superseded
1826 :return: None
1827 """
1828 my_task = ro_task["tasks"][task_index]
1829 task_id = my_task["task_id"]
1830 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1831 "created_items", False
1832 )
1833
1834 self.logger.warning("Needed delete: {}".format(needed_delete))
1835 if my_task["status"] == "FAILED":
1836 return None, None # TODO need to be retry??
1837
1838 try:
1839 for index, task in enumerate(ro_task["tasks"]):
1840 if index == task_index or not task:
1841 continue # own task
1842
1843 if (
1844 my_task["target_record"] == task["target_record"]
1845 and task["action"] == "CREATE"
1846 ):
1847 # set to finished
1848 db_update["tasks.{}.status".format(index)] = task[
1849 "status"
1850 ] = "FINISHED"
1851 elif task["action"] == "CREATE" and task["status"] not in (
1852 "FINISHED",
1853 "SUPERSEDED",
1854 ):
1855 needed_delete = False
1856
1857 if needed_delete:
1858 self.logger.warning(
1859 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
1860 )
1861 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1862 else:
1863 return "SUPERSEDED", None
1864 except Exception as e:
1865 if not isinstance(e, NsWorkerException):
1866 self.logger.critical(
1867 "Unexpected exception at _delete_task task={}: {}".format(
1868 task_id, e
1869 ),
1870 exc_info=True,
1871 )
1872
1873 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
1874
1875 def _create_task(self, ro_task, task_index, task_depends, db_update):
1876 """
1877 Determine if this task need to create something at VIM
1878 :return: None
1879 """
1880 my_task = ro_task["tasks"][task_index]
1881 task_id = my_task["task_id"]
1882 task_status = None
1883
1884 if my_task["status"] == "FAILED":
1885 return None, None # TODO need to be retry??
1886 elif my_task["status"] == "SCHEDULED":
1887 # check if already created by another task
1888 for index, task in enumerate(ro_task["tasks"]):
1889 if index == task_index or not task:
1890 continue # own task
1891
1892 if task["action"] == "CREATE" and task["status"] not in (
1893 "SCHEDULED",
1894 "FINISHED",
1895 "SUPERSEDED",
1896 ):
1897 return task["status"], "COPY_VIM_INFO"
1898
1899 try:
1900 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1901 ro_task, task_index, task_depends
1902 )
1903 # TODO update other CREATE tasks
1904 except Exception as e:
1905 if not isinstance(e, NsWorkerException):
1906 self.logger.error(
1907 "Error executing task={}: {}".format(task_id, e), exc_info=True
1908 )
1909
1910 task_status = "FAILED"
1911 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
1912 # TODO update ro_vim_item_update
1913
1914 return task_status, ro_vim_item_update
1915 else:
1916 return None, None
1917
1918 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1919 """
1920 Look for dependency task
1921 :param task_id: Can be one of
1922 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1923 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1924 3. task.task_id: "<action_id>:number"
1925 :param ro_task:
1926 :param target_id:
1927 :return: database ro_task plus index of task
1928 """
1929 if (
1930 task_id.startswith("vim:")
1931 or task_id.startswith("sdn:")
1932 or task_id.startswith("wim:")
1933 ):
1934 target_id, _, task_id = task_id.partition(" ")
1935
1936 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1937 ro_task_dependency = self.db.get_one(
1938 "ro_tasks",
1939 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1940 fail_on_empty=False,
1941 )
1942
1943 if ro_task_dependency:
1944 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1945 if task["target_record_id"] == task_id:
1946 return ro_task_dependency, task_index
1947
1948 else:
1949 if ro_task:
1950 for task_index, task in enumerate(ro_task["tasks"]):
1951 if task and task["task_id"] == task_id:
1952 return ro_task, task_index
1953
1954 ro_task_dependency = self.db.get_one(
1955 "ro_tasks",
1956 q_filter={
1957 "tasks.ANYINDEX.task_id": task_id,
1958 "tasks.ANYINDEX.target_record.ne": None,
1959 },
1960 fail_on_empty=False,
1961 )
1962
1963 self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
1964 if ro_task_dependency:
1965 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1966 if task["task_id"] == task_id:
1967 return ro_task_dependency, task_index
1968 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1969
1970 def _process_pending_tasks(self, ro_task):
1971 ro_task_id = ro_task["_id"]
1972 now = time.time()
1973 # one day
1974 next_check_at = now + (24 * 60 * 60)
1975 db_ro_task_update = {}
1976
1977 def _update_refresh(new_status):
1978 # compute next_refresh
1979 nonlocal task
1980 nonlocal next_check_at
1981 nonlocal db_ro_task_update
1982 nonlocal ro_task
1983
1984 next_refresh = time.time()
1985
1986 if task["item"] in ("image", "flavor"):
1987 next_refresh += self.REFRESH_IMAGE
1988 elif new_status == "BUILD":
1989 next_refresh += self.REFRESH_BUILD
1990 elif new_status == "DONE":
1991 next_refresh += self.REFRESH_ACTIVE
1992 else:
1993 next_refresh += self.REFRESH_ERROR
1994
1995 next_check_at = min(next_check_at, next_refresh)
1996 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1997 ro_task["vim_info"]["refresh_at"] = next_refresh
1998
1999 try:
2000 """
2001 # Log RO tasks only when loglevel is DEBUG
2002 if self.logger.getEffectiveLevel() == logging.DEBUG:
2003 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2004 """
2005 # 0: get task_status_create
2006 lock_object = None
2007 task_status_create = None
2008 task_create = next(
2009 (
2010 t
2011 for t in ro_task["tasks"]
2012 if t
2013 and t["action"] == "CREATE"
2014 and t["status"] in ("BUILD", "DONE")
2015 ),
2016 None,
2017 )
2018
2019 if task_create:
2020 task_status_create = task_create["status"]
2021
2022 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2023 for task_action in ("DELETE", "CREATE", "EXEC"):
2024 db_vim_update = None
2025 new_status = None
2026
2027 for task_index, task in enumerate(ro_task["tasks"]):
2028 if not task:
2029 continue # task deleted
2030
2031 task_depends = {}
2032 target_update = None
2033
2034 if (
2035 (
2036 task_action in ("DELETE", "EXEC")
2037 and task["status"] not in ("SCHEDULED", "BUILD")
2038 )
2039 or task["action"] != task_action
2040 or (
2041 task_action == "CREATE"
2042 and task["status"] in ("FINISHED", "SUPERSEDED")
2043 )
2044 ):
2045 continue
2046
2047 task_path = "tasks.{}.status".format(task_index)
2048 try:
2049 db_vim_info_update = None
2050
2051 if task["status"] == "SCHEDULED":
2052 # check if tasks that this depends on have been completed
2053 dependency_not_completed = False
2054
2055 for dependency_task_id in task.get("depends_on") or ():
2056 (
2057 dependency_ro_task,
2058 dependency_task_index,
2059 ) = self._get_dependency(
2060 dependency_task_id, target_id=ro_task["target_id"]
2061 )
2062 dependency_task = dependency_ro_task["tasks"][
2063 dependency_task_index
2064 ]
2065 self.logger.warning(
2066 "dependency_ro_task={} dependency_task_index={}".format(
2067 dependency_ro_task, dependency_task_index
2068 )
2069 )
2070
2071 if dependency_task["status"] == "SCHEDULED":
2072 dependency_not_completed = True
2073 next_check_at = min(
2074 next_check_at, dependency_ro_task["to_check_at"]
2075 )
2076 # must allow dependent task to be processed first
2077 # to do this set time after last_task_processed
2078 next_check_at = max(
2079 self.time_last_task_processed, next_check_at
2080 )
2081 break
2082 elif dependency_task["status"] == "FAILED":
2083 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2084 task["action"],
2085 task["item"],
2086 dependency_task["action"],
2087 dependency_task["item"],
2088 dependency_task_id,
2089 dependency_ro_task["vim_info"].get(
2090 "vim_message"
2091 ),
2092 )
2093 self.logger.error(
2094 "task={} {}".format(task["task_id"], error_text)
2095 )
2096 raise NsWorkerException(error_text)
2097
2098 task_depends[dependency_task_id] = dependency_ro_task[
2099 "vim_info"
2100 ]["vim_id"]
2101 task_depends[
2102 "TASK-{}".format(dependency_task_id)
2103 ] = dependency_ro_task["vim_info"]["vim_id"]
2104
2105 if dependency_not_completed:
2106 self.logger.warning(
2107 "DEPENDENCY NOT COMPLETED {}".format(
2108 dependency_ro_task["vim_info"]["vim_id"]
2109 )
2110 )
2111 # TODO set at vim_info.vim_details that it is waiting
2112 continue
2113
2114 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2115 # the task of renew this locking. It will update database locket_at periodically
2116 if not lock_object:
2117 lock_object = LockRenew.add_lock_object(
2118 "ro_tasks", ro_task, self
2119 )
2120
2121 if task["action"] == "DELETE":
2122 (new_status, db_vim_info_update,) = self._delete_task(
2123 ro_task, task_index, task_depends, db_ro_task_update
2124 )
2125 new_status = (
2126 "FINISHED" if new_status == "DONE" else new_status
2127 )
2128 # ^with FINISHED instead of DONE it will not be refreshing
2129
2130 if new_status in ("FINISHED", "SUPERSEDED"):
2131 target_update = "DELETE"
2132 elif task["action"] == "EXEC":
2133 (
2134 new_status,
2135 db_vim_info_update,
2136 db_task_update,
2137 ) = self.item2class[task["item"]].exec(
2138 ro_task, task_index, task_depends
2139 )
2140 new_status = (
2141 "FINISHED" if new_status == "DONE" else new_status
2142 )
2143 # ^with FINISHED instead of DONE it will not be refreshing
2144
2145 if db_task_update:
2146 # load into database the modified db_task_update "retries" and "next_retry"
2147 if db_task_update.get("retries"):
2148 db_ro_task_update[
2149 "tasks.{}.retries".format(task_index)
2150 ] = db_task_update["retries"]
2151
2152 next_check_at = time.time() + db_task_update.get(
2153 "next_retry", 60
2154 )
2155 target_update = None
2156 elif task["action"] == "CREATE":
2157 if task["status"] == "SCHEDULED":
2158 if task_status_create:
2159 new_status = task_status_create
2160 target_update = "COPY_VIM_INFO"
2161 else:
2162 new_status, db_vim_info_update = self.item2class[
2163 task["item"]
2164 ].new(ro_task, task_index, task_depends)
2165 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2166 _update_refresh(new_status)
2167 else:
2168 if (
2169 ro_task["vim_info"]["refresh_at"]
2170 and now > ro_task["vim_info"]["refresh_at"]
2171 ):
2172 new_status, db_vim_info_update = self.item2class[
2173 task["item"]
2174 ].refresh(ro_task)
2175 _update_refresh(new_status)
2176 else:
2177 # The refresh is updated to avoid set the value of "refresh_at" to
2178 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2179 # because it can happen that in this case the task is never processed
2180 _update_refresh(task["status"])
2181
2182 except Exception as e:
2183 new_status = "FAILED"
2184 db_vim_info_update = {
2185 "vim_status": "VIM_ERROR",
2186 "vim_message": str(e),
2187 }
2188
2189 if not isinstance(
2190 e, (NsWorkerException, vimconn.VimConnException)
2191 ):
2192 self.logger.error(
2193 "Unexpected exception at _delete_task task={}: {}".format(
2194 task["task_id"], e
2195 ),
2196 exc_info=True,
2197 )
2198
2199 try:
2200 if db_vim_info_update:
2201 db_vim_update = db_vim_info_update.copy()
2202 db_ro_task_update.update(
2203 {
2204 "vim_info." + k: v
2205 for k, v in db_vim_info_update.items()
2206 }
2207 )
2208 ro_task["vim_info"].update(db_vim_info_update)
2209
2210 if new_status:
2211 if task_action == "CREATE":
2212 task_status_create = new_status
2213 db_ro_task_update[task_path] = new_status
2214
2215 if target_update or db_vim_update:
2216 if target_update == "DELETE":
2217 self._update_target(task, None)
2218 elif target_update == "COPY_VIM_INFO":
2219 self._update_target(task, ro_task["vim_info"])
2220 else:
2221 self._update_target(task, db_vim_update)
2222
2223 except Exception as e:
2224 if (
2225 isinstance(e, DbException)
2226 and e.http_code == HTTPStatus.NOT_FOUND
2227 ):
2228 # if the vnfrs or nsrs has been removed from database, this task must be removed
2229 self.logger.debug(
2230 "marking to delete task={}".format(task["task_id"])
2231 )
2232 self.tasks_to_delete.append(task)
2233 else:
2234 self.logger.error(
2235 "Unexpected exception at _update_target task={}: {}".format(
2236 task["task_id"], e
2237 ),
2238 exc_info=True,
2239 )
2240
2241 locked_at = ro_task["locked_at"]
2242
2243 if lock_object:
2244 locked_at = [
2245 lock_object["locked_at"],
2246 lock_object["locked_at"] + self.task_locked_time,
2247 ]
2248 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2249 # contain exactly locked_at + self.task_locked_time
2250 LockRenew.remove_lock_object(lock_object)
2251
2252 q_filter = {
2253 "_id": ro_task["_id"],
2254 "to_check_at": ro_task["to_check_at"],
2255 "locked_at": locked_at,
2256 }
2257 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2258 # outside this task (by ro_nbi) do not update it
2259 db_ro_task_update["locked_by"] = None
2260 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2261 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2262 db_ro_task_update["modified_at"] = now
2263 db_ro_task_update["to_check_at"] = next_check_at
2264
2265 """
2266 # Log RO tasks only when loglevel is DEBUG
2267 if self.logger.getEffectiveLevel() == logging.DEBUG:
2268 db_ro_task_update_log = db_ro_task_update.copy()
2269 db_ro_task_update_log["_id"] = q_filter["_id"]
2270 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2271 """
2272
2273 if not self.db.set_one(
2274 "ro_tasks",
2275 update_dict=db_ro_task_update,
2276 q_filter=q_filter,
2277 fail_on_empty=False,
2278 ):
2279 del db_ro_task_update["to_check_at"]
2280 del q_filter["to_check_at"]
2281 """
2282 # Log RO tasks only when loglevel is DEBUG
2283 if self.logger.getEffectiveLevel() == logging.DEBUG:
2284 self._log_ro_task(
2285 None,
2286 db_ro_task_update_log,
2287 None,
2288 "TASK_WF",
2289 "SET_TASK " + str(q_filter),
2290 )
2291 """
2292 self.db.set_one(
2293 "ro_tasks",
2294 q_filter=q_filter,
2295 update_dict=db_ro_task_update,
2296 fail_on_empty=True,
2297 )
2298 except DbException as e:
2299 self.logger.error(
2300 "ro_task={} Error updating database {}".format(ro_task_id, e)
2301 )
2302 except Exception as e:
2303 self.logger.error(
2304 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2305 )
2306
2307 def _update_target(self, task, ro_vim_item_update):
2308 table, _, temp = task["target_record"].partition(":")
2309 _id, _, path_vim_status = temp.partition(":")
2310 path_item = path_vim_status[: path_vim_status.rfind(".")]
2311 path_item = path_item[: path_item.rfind(".")]
2312 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2313 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2314
2315 if ro_vim_item_update:
2316 update_dict = {
2317 path_vim_status + "." + k: v
2318 for k, v in ro_vim_item_update.items()
2319 if k
2320 in (
2321 "vim_id",
2322 "vim_details",
2323 "vim_message",
2324 "vim_name",
2325 "vim_status",
2326 "interfaces",
2327 )
2328 }
2329
2330 if path_vim_status.startswith("vdur."):
2331 # for backward compatibility, add vdur.name apart from vdur.vim_name
2332 if ro_vim_item_update.get("vim_name"):
2333 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2334
2335 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2336 if ro_vim_item_update.get("vim_id"):
2337 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2338
2339 # update general status
2340 if ro_vim_item_update.get("vim_status"):
2341 update_dict[path_item + ".status"] = ro_vim_item_update[
2342 "vim_status"
2343 ]
2344
2345 if ro_vim_item_update.get("interfaces"):
2346 path_interfaces = path_item + ".interfaces"
2347
2348 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2349 if iface:
2350 update_dict.update(
2351 {
2352 path_interfaces + ".{}.".format(i) + k: v
2353 for k, v in iface.items()
2354 if k in ("vlan", "compute_node", "pci")
2355 }
2356 )
2357
2358 # put ip_address and mac_address with ip-address and mac-address
2359 if iface.get("ip_address"):
2360 update_dict[
2361 path_interfaces + ".{}.".format(i) + "ip-address"
2362 ] = iface["ip_address"]
2363
2364 if iface.get("mac_address"):
2365 update_dict[
2366 path_interfaces + ".{}.".format(i) + "mac-address"
2367 ] = iface["mac_address"]
2368
2369 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2370 update_dict["ip-address"] = iface.get("ip_address").split(
2371 ";"
2372 )[0]
2373
2374 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2375 update_dict[path_item + ".ip-address"] = iface.get(
2376 "ip_address"
2377 ).split(";")[0]
2378
2379 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2380 else:
2381 update_dict = {path_item + ".status": "DELETED"}
2382 self.db.set_one(
2383 table,
2384 q_filter={"_id": _id},
2385 update_dict=update_dict,
2386 unset={path_vim_status: None},
2387 )
2388
2389 def _process_delete_db_tasks(self):
2390 """
2391 Delete task from database because vnfrs or nsrs or both have been deleted
2392 :return: None. Uses and modify self.tasks_to_delete
2393 """
2394 while self.tasks_to_delete:
2395 task = self.tasks_to_delete[0]
2396 vnfrs_deleted = None
2397 nsr_id = task["nsr_id"]
2398
2399 if task["target_record"].startswith("vnfrs:"):
2400 # check if nsrs is present
2401 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2402 vnfrs_deleted = task["target_record"].split(":")[1]
2403
2404 try:
2405 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2406 except Exception as e:
2407 self.logger.error(
2408 "Error deleting task={}: {}".format(task["task_id"], e)
2409 )
2410 self.tasks_to_delete.pop(0)
2411
2412 @staticmethod
2413 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2414 """
2415 Static method because it is called from osm_ng_ro.ns
2416 :param db: instance of database to use
2417 :param nsr_id: affected nsrs id
2418 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2419 :return: None, exception is fails
2420 """
2421 retries = 5
2422 for retry in range(retries):
2423 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2424 now = time.time()
2425 conflict = False
2426
2427 for ro_task in ro_tasks:
2428 db_update = {}
2429 to_delete_ro_task = True
2430
2431 for index, task in enumerate(ro_task["tasks"]):
2432 if not task:
2433 pass
2434 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2435 vnfrs_deleted
2436 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2437 ):
2438 db_update["tasks.{}".format(index)] = None
2439 else:
2440 # used by other nsr, ro_task cannot be deleted
2441 to_delete_ro_task = False
2442
2443 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2444 if to_delete_ro_task:
2445 if not db.del_one(
2446 "ro_tasks",
2447 q_filter={
2448 "_id": ro_task["_id"],
2449 "modified_at": ro_task["modified_at"],
2450 },
2451 fail_on_empty=False,
2452 ):
2453 conflict = True
2454 elif db_update:
2455 db_update["modified_at"] = now
2456 if not db.set_one(
2457 "ro_tasks",
2458 q_filter={
2459 "_id": ro_task["_id"],
2460 "modified_at": ro_task["modified_at"],
2461 },
2462 update_dict=db_update,
2463 fail_on_empty=False,
2464 ):
2465 conflict = True
2466 if not conflict:
2467 return
2468 else:
2469 raise NsWorkerException("Exceeded {} retries".format(retries))
2470
2471 def run(self):
2472 # load database
2473 self.logger.info("Starting")
2474 while True:
2475 # step 1: get commands from queue
2476 try:
2477 if self.vim_targets:
2478 task = self.task_queue.get(block=False)
2479 else:
2480 if not self.idle:
2481 self.logger.debug("enters in idle state")
2482 self.idle = True
2483 task = self.task_queue.get(block=True)
2484 self.idle = False
2485
2486 if task[0] == "terminate":
2487 break
2488 elif task[0] == "load_vim":
2489 self.logger.info("order to load vim {}".format(task[1]))
2490 self._load_vim(task[1])
2491 elif task[0] == "unload_vim":
2492 self.logger.info("order to unload vim {}".format(task[1]))
2493 self._unload_vim(task[1])
2494 elif task[0] == "reload_vim":
2495 self._reload_vim(task[1])
2496 elif task[0] == "check_vim":
2497 self.logger.info("order to check vim {}".format(task[1]))
2498 self._check_vim(task[1])
2499 continue
2500 except Exception as e:
2501 if isinstance(e, queue.Empty):
2502 pass
2503 else:
2504 self.logger.critical(
2505 "Error processing task: {}".format(e), exc_info=True
2506 )
2507
2508 # step 2: process pending_tasks, delete not needed tasks
2509 try:
2510 if self.tasks_to_delete:
2511 self._process_delete_db_tasks()
2512 busy = False
2513 """
2514 # Log RO tasks only when loglevel is DEBUG
2515 if self.logger.getEffectiveLevel() == logging.DEBUG:
2516 _ = self._get_db_all_tasks()
2517 """
2518 ro_task = self._get_db_task()
2519 if ro_task:
2520 self.logger.warning("Task to process: {}".format(ro_task))
2521 time.sleep(1)
2522 self._process_pending_tasks(ro_task)
2523 busy = True
2524 if not busy:
2525 time.sleep(5)
2526 except Exception as e:
2527 self.logger.critical(
2528 "Unexpected exception at run: " + str(e), exc_info=True
2529 )
2530
2531 self.logger.info("Finishing")