Fix Bug 2086 Updating VNF status configurable
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in the ro_tasks table.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
def deep_get(target_dict, *args, **kwargs):
    """
    Read a value from a nested dictionary following a path of keys.
    Example target_dict={a: {b: 5}}; args=(a, b) returns 5; both args=(a, b, c) and args=(f, h) return the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, one per nesting level
    :param kwargs: only 'default' is read: value to return when the path cannot be followed (None if not provided)
    :return: the value found at the end of the path; the default (or None) when a key is missing or when an
        intermediate value is not a dictionary
    """
    fallback = kwargs.get("default")
    current = target_dict

    for key in args:
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return fallback

    return current
66
67
class NsWorkerException(Exception):
    """Error raised by the VIM interaction classes of this module while processing a task"""
70
71
class FailingConnector:
    """Connector replacement where every public VIM/SDN connector method raises an error.

    For each public attribute name of vimconn.VimConnector and of sdnconn.SdnConnectorBase, the instance gets a
    Mock that raises the corresponding connector exception, carrying error_msg, when it is called.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text stored at self.error_msg and used as the message of the raised exceptions
        """
        self.error_msg = error_msg

        # (connector base class, exception raised by its mocked methods). SDN names are set last, so they
        # take precedence if a name is present at both classes
        failing_sources = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, exception_class in failing_sources:
            for name in dir(connector_class):
                if name.startswith("_"):
                    # private and dunder attributes are left untouched
                    continue

                setattr(self, name, Mock(side_effect=exception_class(error_msg)))
87
88
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised in this module e.g. when no VIM element matches the task find_params"""
91
92
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    Its methods do not call the VIM; they just return a fixed status"""

    def __init__(self, db, my_vims, db_vims, logger):
        """Store the references received; nothing else is done here"""
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Does nothing. Returns status "BUILD" and an empty update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling VIM to get image, flavor status. Assumes ok unless the stored vim_status is VIM_ERROR"""
        failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete image. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Does nothing. Returns status "DONE" and no updates"""
        return "DONE", None, None
119
120
class VimInteractionNet(VimInteractionBase):
    """Find/create, refresh and delete networks at the target VIM"""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for an existing VIM network using the task 'find_params' or, when the task has no 'find_params',
        create a new one using the task 'params'
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", update with the vim_id) on success; ("FAILED", update with the error text at
            vim_details) when a VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: first decide which filter is used to look for the network
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration (id first, then name) is preferred over the
                    # name provided at find_params
                    mgmtnet = True
                    db_vim = self.db_vims[target_id]
                    mgmt_net_id = deep_get(db_vim, "config", "management_network_id")
                    mgmt_net_name = deep_get(
                        db_vim, "config", "management_network_name"
                    )

                    if mgmt_net_id:
                        mgmtnet_defined_in_vim = True
                        vim_filter = {"id": mgmt_net_id}
                    elif mgmt_net_name:
                        mgmtnet_defined_in_vim = True
                        vim_filter = {"name": mgmt_net_name}
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    if not mgmtnet or mgmtnet_defined_in_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                find_params
                            )
                        )

                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

    def refresh(self, ro_task):
        """
        Call VIM to get network status
        :param ro_task: ro_task content; "_id", "target_id" and "vim_info" are read
        :return: (task status, dictionary with only the vim_info fields that have changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored_info = ro_task["vim_info"]
        vim_id = stored_info["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            vim_status = vim_info["status"]

            if vim_status == "ACTIVE":
                task_status = "DONE"
            elif vim_status == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        vim_status = vim_info["status"]
        ro_vim_item_update = {}

        # only the differences against the stored vim_info are reported
        if stored_info["vim_status"] != vim_status:
            ro_vim_item_update["vim_status"] = vim_status

        if stored_info["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_status in ("ERROR", "VIM_ERROR"):
            if stored_info["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_status == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        elif stored_info["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            # details are not logged when the new status is ACTIVE
            logged_details = (
                "" if new_status == "ACTIVE" else ro_vim_item_update.get("vim_details")
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    new_status,
                    logged_details,
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network at VIM. A network already deleted at VIM is considered ok
        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", update marking the item as DELETED) or ("FAILED", update with the error) when the
            VIM raises a VimConnException other than not found
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # VIM is called only if there is something to delete
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
337
338
class VimInteractionVdu(VimInteractionBase):
    """Create, delete and refresh VMs at the target VIM, and inject ssh keys into them"""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at VIM using the task 'params', after replacing the "TASK-..." references (networks, image,
        flavor and affinity groups) by the VIM identifiers provided at task_depends
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary with the VIM identifier of every "TASK-..." this task depends on
        :return: ("BUILD", update with the vim_id and interfaces_vim_ids) on success; ("FAILED", update with the
            error text at vim_details) when a VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # NOTE(review): set before calling the VIM, so a failure below is also reported with
            # created=True. Confirm this is intended
            created = True
            params = task["params"]
            # work over a copy, because the references are replaced in place
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # complete message, equivalent to the one used above for networks
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created or "
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" is read from every net_list entry after the VM creation
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM at VIM. A VM already deleted at VIM is considered ok
        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", update marking the item as DELETED) or ("FAILED", update with the error) when the
            VIM raises a VimConnException other than not found
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # VIM is called only if there is something to delete
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status
        :param ro_task: ro_task content; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when the ro_task has no vim_id; otherwise (task status, dictionary with only the
            vim_info fields that have changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; this is best effort: on any failure the name
            # is simply not obtained
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of the interfaces stored at creation time
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                # NOTE(review): None is appended when the VIM does not report this interface
                vim_interfaces.append(iface)

        # the management flags are taken from the not finished CREATE task
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        # only the differences against the stored vim_info are reported
        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_details")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject an ssh key into the VM. The private key received at the task 'params' is decrypted with
        self.db.decrypt and the result is passed to the VIM connector inject_user_key
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task status, ro_vim_item_update or None, db_task_update). On error "BUILD" is returned
            together with the retry counters until max_retries_inject_ssh_key attempts are reached; then
            "FAILED" is returned
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        # the retries counter is reset at the final result, either DONE or FAILED
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_details": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
618
619
class VimInteractionImage(VimInteractionBase):
    """Look for images at the target VIM"""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for the image at VIM using the task 'find_params'. Exactly one image must match
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update with the vim_id of the image found; vim_id is None when the task has no
            find_params) or ("FAILED", update with the error text at vim_details) when none or several images
            match, or when the VIM raises a VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        # defined in advance, so that a task without find_params does not hit an unbound variable below
        vim_image_id = None

        try:
            # FIND
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
673
674
class VimInteractionFlavor(VimInteractionBase):
    """Find/create and delete flavors at the target VIM"""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at VIM. A flavor already deleted at VIM is considered ok
        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", update marking the item as DELETED) or ("FAILED", update with the error) when the
            VIM raises a VimConnException other than not found
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # VIM is called only if there is a flavor to delete
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor at VIM matching the task 'find_params'; if none is obtained and the task has
        'params', a new flavor is created
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update with the vim_id) or ("FAILED", update with the error text at vim_details)
            when a VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        created = False
        created_items = {}

        try:
            vim_flavor_id = None
            find_params = task.get("find_params")

            # FIND
            if find_params:
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        find_params["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException:
                    # not found is not an error here: the flavor may be created below
                    pass

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, target_id, vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
766
767
class VimInteractionAffinityGroup(VimInteractionBase):
    """Find/create and delete affinity or anti-affinity groups at the target VIM"""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group at VIM. A group already deleted at VIM is considered ok
        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", update marking the item as DELETED) or ("FAILED", update with the error) when the
            VIM raises a VimConnException other than not found
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # VIM is called only if there is a group to delete
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain the affinity group for the task: if 'affinity_group_data' at the task 'params' provides a
        "vim-affinity-group-id", it is looked for at VIM; if no group is obtained, a new one is created from
        'affinity_group_data'
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update with the vim_id) or ("FAILED", update with the error text at vim_details)
            when a VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                param_affinity_group_id = affinity_group_data.get(
                    "vim-affinity-group-id"
                )

                try:
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # not fatal: a new group is created below. The messages are joined with a space,
                    # so that the ID is not stuck to the following word
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
872
873
874 class VimInteractionSdnNet(VimInteractionBase):
875 @staticmethod
876 def _match_pci(port_pci, mapping):
877 """
878 Check if port_pci matches with mapping
879 mapping can have brackets to indicate that several chars are accepted. e.g
880 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
881 :param port_pci: text
882 :param mapping: text, can contain brackets to indicate several chars are available
883 :return: True if matches, False otherwise
884 """
885 if not port_pci or not mapping:
886 return False
887 if port_pci == mapping:
888 return True
889
890 mapping_index = 0
891 pci_index = 0
892 while True:
893 bracket_start = mapping.find("[", mapping_index)
894
895 if bracket_start == -1:
896 break
897
898 bracket_end = mapping.find("]", bracket_start)
899 if bracket_end == -1:
900 break
901
902 length = bracket_start - mapping_index
903 if (
904 length
905 and port_pci[pci_index : pci_index + length]
906 != mapping[mapping_index:bracket_start]
907 ):
908 return False
909
910 if (
911 port_pci[pci_index + length]
912 not in mapping[bracket_start + 1 : bracket_end]
913 ):
914 return False
915
916 pci_index += length + 1
917 mapping_index = bracket_end + 1
918
919 if port_pci[pci_index:] != mapping[mapping_index:]:
920 return False
921
922 return True
923
924 def _get_interfaces(self, vlds_to_connect, vim_account_id):
925 """
926 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
927 :param vim_account_id:
928 :return:
929 """
930 interfaces = []
931
932 for vld in vlds_to_connect:
933 table, _, db_id = vld.partition(":")
934 db_id, _, vld = db_id.partition(":")
935 _, _, vld_id = vld.partition(".")
936
937 if table == "vnfrs":
938 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
939 iface_key = "vnf-vld-id"
940 else: # table == "nsrs"
941 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
942 iface_key = "ns-vld-id"
943
944 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
945
946 for db_vnfr in db_vnfrs:
947 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
948 for iface_index, interface in enumerate(vdur["interfaces"]):
949 if interface.get(iface_key) == vld_id and interface.get(
950 "type"
951 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
952 # only SR-IOV o PT
953 interface_ = interface.copy()
954 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
955 db_vnfr["_id"], vdu_index, iface_index
956 )
957
958 if vdur.get("status") == "ERROR":
959 interface_["status"] = "ERROR"
960
961 interfaces.append(interface_)
962
963 return interfaces
964
965 def refresh(self, ro_task):
966 # look for task create
967 task_create_index, _ = next(
968 i_t
969 for i_t in enumerate(ro_task["tasks"])
970 if i_t[1]
971 and i_t[1]["action"] == "CREATE"
972 and i_t[1]["status"] != "FINISHED"
973 )
974
975 return self.new(ro_task, task_create_index, None)
976
977 def new(self, ro_task, task_index, task_depends):
978
979 task = ro_task["tasks"][task_index]
980 task_id = task["task_id"]
981 target_vim = self.my_vims[ro_task["target_id"]]
982
983 sdn_net_id = ro_task["vim_info"]["vim_id"]
984
985 created_items = ro_task["vim_info"].get("created_items")
986 connected_ports = ro_task["vim_info"].get("connected_ports", [])
987 new_connected_ports = []
988 last_update = ro_task["vim_info"].get("last_update", 0)
989 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
990 error_list = []
991 created = ro_task["vim_info"].get("created", False)
992
993 try:
994 # CREATE
995 params = task["params"]
996 vlds_to_connect = params["vlds"]
997 associated_vim = params["target_vim"]
998 # external additional ports
999 additional_ports = params.get("sdn-ports") or ()
1000 _, _, vim_account_id = associated_vim.partition(":")
1001
1002 if associated_vim:
1003 # get associated VIM
1004 if associated_vim not in self.db_vims:
1005 self.db_vims[associated_vim] = self.db.get_one(
1006 "vim_accounts", {"_id": vim_account_id}
1007 )
1008
1009 db_vim = self.db_vims[associated_vim]
1010
1011 # look for ports to connect
1012 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1013 # print(ports)
1014
1015 sdn_ports = []
1016 pending_ports = error_ports = 0
1017 vlan_used = None
1018 sdn_need_update = False
1019
1020 for port in ports:
1021 vlan_used = port.get("vlan") or vlan_used
1022
1023 # TODO. Do not connect if already done
1024 if not port.get("compute_node") or not port.get("pci"):
1025 if port.get("status") == "ERROR":
1026 error_ports += 1
1027 else:
1028 pending_ports += 1
1029 continue
1030
1031 pmap = None
1032 compute_node_mappings = next(
1033 (
1034 c
1035 for c in db_vim["config"].get("sdn-port-mapping", ())
1036 if c and c["compute_node"] == port["compute_node"]
1037 ),
1038 None,
1039 )
1040
1041 if compute_node_mappings:
1042 # process port_mapping pci of type 0000:af:1[01].[1357]
1043 pmap = next(
1044 (
1045 p
1046 for p in compute_node_mappings["ports"]
1047 if self._match_pci(port["pci"], p.get("pci"))
1048 ),
1049 None,
1050 )
1051
1052 if not pmap:
1053 if not db_vim["config"].get("mapping_not_needed"):
1054 error_list.append(
1055 "Port mapping not found for compute_node={} pci={}".format(
1056 port["compute_node"], port["pci"]
1057 )
1058 )
1059 continue
1060
1061 pmap = {}
1062
1063 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1064 new_port = {
1065 "service_endpoint_id": pmap.get("service_endpoint_id")
1066 or service_endpoint_id,
1067 "service_endpoint_encapsulation_type": "dot1q"
1068 if port["type"] == "SR-IOV"
1069 else None,
1070 "service_endpoint_encapsulation_info": {
1071 "vlan": port.get("vlan"),
1072 "mac": port.get("mac-address"),
1073 "device_id": pmap.get("device_id") or port["compute_node"],
1074 "device_interface_id": pmap.get("device_interface_id")
1075 or port["pci"],
1076 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1077 "switch_port": pmap.get("switch_port"),
1078 "service_mapping_info": pmap.get("service_mapping_info"),
1079 },
1080 }
1081
1082 # TODO
1083 # if port["modified_at"] > last_update:
1084 # sdn_need_update = True
1085 new_connected_ports.append(port["id"]) # TODO
1086 sdn_ports.append(new_port)
1087
1088 if error_ports:
1089 error_list.append(
1090 "{} interfaces have not been created as VDU is on ERROR status".format(
1091 error_ports
1092 )
1093 )
1094
1095 # connect external ports
1096 for index, additional_port in enumerate(additional_ports):
1097 additional_port_id = additional_port.get(
1098 "service_endpoint_id"
1099 ) or "external-{}".format(index)
1100 sdn_ports.append(
1101 {
1102 "service_endpoint_id": additional_port_id,
1103 "service_endpoint_encapsulation_type": additional_port.get(
1104 "service_endpoint_encapsulation_type", "dot1q"
1105 ),
1106 "service_endpoint_encapsulation_info": {
1107 "vlan": additional_port.get("vlan") or vlan_used,
1108 "mac": additional_port.get("mac_address"),
1109 "device_id": additional_port.get("device_id"),
1110 "device_interface_id": additional_port.get(
1111 "device_interface_id"
1112 ),
1113 "switch_dpid": additional_port.get("switch_dpid")
1114 or additional_port.get("switch_id"),
1115 "switch_port": additional_port.get("switch_port"),
1116 "service_mapping_info": additional_port.get(
1117 "service_mapping_info"
1118 ),
1119 },
1120 }
1121 )
1122 new_connected_ports.append(additional_port_id)
1123 sdn_info = ""
1124
1125 # if there are more ports to connect or they have been modified, call create/update
1126 if error_list:
1127 sdn_status = "ERROR"
1128 sdn_info = "; ".join(error_list)
1129 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1130 last_update = time.time()
1131
1132 if not sdn_net_id:
1133 if len(sdn_ports) < 2:
1134 sdn_status = "ACTIVE"
1135
1136 if not pending_ports:
1137 self.logger.debug(
1138 "task={} {} new-sdn-net done, less than 2 ports".format(
1139 task_id, ro_task["target_id"]
1140 )
1141 )
1142 else:
1143 net_type = params.get("type") or "ELAN"
1144 (
1145 sdn_net_id,
1146 created_items,
1147 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1148 created = True
1149 self.logger.debug(
1150 "task={} {} new-sdn-net={} created={}".format(
1151 task_id, ro_task["target_id"], sdn_net_id, created
1152 )
1153 )
1154 else:
1155 created_items = target_vim.edit_connectivity_service(
1156 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1157 )
1158 created = True
1159 self.logger.debug(
1160 "task={} {} update-sdn-net={} created={}".format(
1161 task_id, ro_task["target_id"], sdn_net_id, created
1162 )
1163 )
1164
1165 connected_ports = new_connected_ports
1166 elif sdn_net_id:
1167 wim_status_dict = target_vim.get_connectivity_service_status(
1168 sdn_net_id, conn_info=created_items
1169 )
1170 sdn_status = wim_status_dict["sdn_status"]
1171
1172 if wim_status_dict.get("sdn_info"):
1173 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1174
1175 if wim_status_dict.get("error_msg"):
1176 sdn_info = wim_status_dict.get("error_msg") or ""
1177
1178 if pending_ports:
1179 if sdn_status != "ERROR":
1180 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1181 len(ports) - pending_ports, len(ports)
1182 )
1183
1184 if sdn_status == "ACTIVE":
1185 sdn_status = "BUILD"
1186
1187 ro_vim_item_update = {
1188 "vim_id": sdn_net_id,
1189 "vim_status": sdn_status,
1190 "created": created,
1191 "created_items": created_items,
1192 "connected_ports": connected_ports,
1193 "vim_details": sdn_info,
1194 "last_update": last_update,
1195 }
1196
1197 return sdn_status, ro_vim_item_update
1198 except Exception as e:
1199 self.logger.error(
1200 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1201 exc_info=not isinstance(
1202 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1203 ),
1204 )
1205 ro_vim_item_update = {
1206 "vim_status": "VIM_ERROR",
1207 "created": created,
1208 "vim_details": str(e),
1209 }
1210
1211 return "FAILED", ro_vim_item_update
1212
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service referenced by an ro_task.

    :param ro_task: ro_task record. "tasks", "vim_info" and "target_id" are read ("_id" too when logging errors)
    :param task_index: position at ro_task["tasks"] of the task being processed
    :return: tuple (status, ro_vim_item_update). ("DONE", {...}) when the service was deleted, when there was
        nothing to delete or when the connector reports it as not found; ("FAILED", {...}) on any other error
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    vim_info = ro_task["vim_info"]
    sdn_vim_id = vim_info.get("vim_id")
    deleted_update = {
        "vim_status": "DELETED",
        "created": False,
        "vim_details": "DELETED",
        "vim_id": None,
    }

    try:
        # without a vim_id nothing was created, so there is nothing to remove
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, vim_info.get("created_items")
            )
    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            # the traceback is only logged for exceptions that are not connector exceptions
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        # a "not found" answer is treated as a successful deletion
        deleted_update["vim_details"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            deleted_update.get("vim_details", ""),
        )
    )

    return "DONE", deleted_update
1263
1264
class ConfigValidate:
    """Accessor for the values stored under the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        # the mapping is referenced, not copied
        self.conf = config

    def _period(self, key):
        """Return config["period"][key]; a missing key raises KeyError."""
        return self.conf["period"][key]

    @property
    def active(self):
        """
        Refresh period for items in active status.
        Default 1 min, allowed >= 60 or -1, -1 disables periodic checks. Any other value is replaced by 60
        """
        refresh_active = self._period("refresh_active")

        return refresh_active if refresh_active >= 60 or refresh_active == -1 else 60

    @property
    def build(self):
        """Value of period.refresh_build, as configured."""
        return self._period("refresh_build")

    @property
    def image(self):
        """Value of period.refresh_image, as configured."""
        return self._period("refresh_image")

    @property
    def error(self):
        """Value of period.refresh_error, as configured."""
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value of period.queue_size, as configured."""
        return self._period("queue_size")
1295
1296
1297 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    Build the worker thread; nothing is read from database here.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.db = db
    self.worker_index = worker_index
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.my_id = config["process_id"] + ":" + str(worker_index)

    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)

    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []

    # one handler per task item; all of them share the same db, dictionaries and logger
    interaction_classes = (
        ("net", VimInteractionNet),
        ("vdu", VimInteractionVdu),
        ("image", VimInteractionImage),
        ("flavor", VimInteractionFlavor),
        ("sdn_net", VimInteractionSdnNet),
        ("affinity-or-anti-affinity-group", VimInteractionAffinityGroup),
    )
    self.item2class = {
        item: interaction(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction in interaction_classes
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1345
def insert_task(self, task):
    """
    Put a task in the worker queue without blocking.

    :param task: element to queue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1352
def terminate(self):
    """Queue the "exit" marker through insert_task."""
    exit_marker = "exit"
    self.insert_task(exit_marker)
1355
def del_task(self, task):
    """
    Supersede a task that has not been started yet.

    :param task: task dictionary; its "status" is read and, if "SCHEDULED", changed to "SUPERSEDED"
    :return: True if the task has been superseded; False if it was in any other status (task is not modified)
    """
    # NOTE(review): the __init__ of this class visible in this file does not create self.task_lock;
    # confirm where this lock is expected to be set
    with self.task_lock:
        if task["status"] != "SCHEDULED":
            # the "with" statement already releases the lock on exit. Releasing it here too made the
            # exit of the context manager fail with "release unlocked lock"
            return False

        task["status"] = "SUPERSEDED"

        return True
1364
1365 def _process_vim_config(self, target_id, db_vim):
1366 """
1367 Process vim config, creating vim configuration files as ca_cert
1368 :param target_id: vim/sdn/wim + id
1369 :param db_vim: Vim dictionary obtained from database
1370 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1371 """
1372 if not db_vim.get("config"):
1373 return
1374
1375 file_name = ""
1376
1377 try:
1378 if db_vim["config"].get("ca_cert_content"):
1379 file_name = "{}:{}".format(target_id, self.worker_index)
1380
1381 try:
1382 mkdir(file_name)
1383 except FileExistsError:
1384 pass
1385
1386 file_name = file_name + "/ca_cert"
1387
1388 with open(file_name, "w") as f:
1389 f.write(db_vim["config"]["ca_cert_content"])
1390 del db_vim["config"]["ca_cert_content"]
1391 db_vim["config"]["ca_cert"] = file_name
1392 except Exception as e:
1393 raise NsWorkerException(
1394 "Error writing to file '{}': {}".format(file_name, e)
1395 )
1396
1397 def _load_plugin(self, name, type="vim"):
1398 # type can be vim or sdn
1399 if "rovim_dummy" not in self.plugins:
1400 self.plugins["rovim_dummy"] = VimDummyConnector
1401
1402 if "rosdn_dummy" not in self.plugins:
1403 self.plugins["rosdn_dummy"] = SdnDummyConnector
1404
1405 if name in self.plugins:
1406 return self.plugins[name]
1407
1408 try:
1409 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1410 self.plugins[name] = ep.load()
1411 except Exception as e:
1412 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1413
1414 if name and name not in self.plugins:
1415 raise NsWorkerException(
1416 "Plugin 'osm_{n}' has not been installed".format(n=name)
1417 )
1418
1419 return self.plugins[name]
1420
1421 def _unload_vim(self, target_id):
1422 """
1423 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1424 :param target_id: Contains type:_id; where type can be 'vim', ...
1425 :return: None.
1426 """
1427 try:
1428 self.db_vims.pop(target_id, None)
1429 self.my_vims.pop(target_id, None)
1430
1431 if target_id in self.vim_targets:
1432 self.vim_targets.remove(target_id)
1433
1434 self.logger.info("Unloaded {}".format(target_id))
1435 rmtree("{}:{}".format(target_id, self.worker_index))
1436 except FileNotFoundError:
1437 pass # this is raised by rmtree if folder does not exist
1438 except Exception as e:
1439 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1440
1441 def _check_vim(self, target_id):
1442 """
1443 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1444 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1445 :return: None.
1446 """
1447 target, _, _id = target_id.partition(":")
1448 now = time.time()
1449 update_dict = {}
1450 unset_dict = {}
1451 op_text = ""
1452 step = ""
1453 loaded = target_id in self.vim_targets
1454 target_database = (
1455 "vim_accounts"
1456 if target == "vim"
1457 else "wim_accounts"
1458 if target == "wim"
1459 else "sdns"
1460 )
1461
1462 try:
1463 step = "Getting {} from db".format(target_id)
1464 db_vim = self.db.get_one(target_database, {"_id": _id})
1465
1466 for op_index, operation in enumerate(
1467 db_vim["_admin"].get("operations", ())
1468 ):
1469 if operation["operationState"] != "PROCESSING":
1470 continue
1471
1472 locked_at = operation.get("locked_at")
1473
1474 if locked_at is not None and locked_at >= now - self.task_locked_time:
1475 # some other thread is doing this operation
1476 return
1477
1478 # lock
1479 op_text = "_admin.operations.{}.".format(op_index)
1480
1481 if not self.db.set_one(
1482 target_database,
1483 q_filter={
1484 "_id": _id,
1485 op_text + "operationState": "PROCESSING",
1486 op_text + "locked_at": locked_at,
1487 },
1488 update_dict={
1489 op_text + "locked_at": now,
1490 "admin.current_operation": op_index,
1491 },
1492 fail_on_empty=False,
1493 ):
1494 return
1495
1496 unset_dict[op_text + "locked_at"] = None
1497 unset_dict["current_operation"] = None
1498 step = "Loading " + target_id
1499 error_text = self._load_vim(target_id)
1500
1501 if not error_text:
1502 step = "Checking connectivity"
1503
1504 if target == "vim":
1505 self.my_vims[target_id].check_vim_connectivity()
1506 else:
1507 self.my_vims[target_id].check_credentials()
1508
1509 update_dict["_admin.operationalState"] = "ENABLED"
1510 update_dict["_admin.detailed-status"] = ""
1511 unset_dict[op_text + "detailed-status"] = None
1512 update_dict[op_text + "operationState"] = "COMPLETED"
1513
1514 return
1515
1516 except Exception as e:
1517 error_text = "{}: {}".format(step, e)
1518 self.logger.error("{} for {}: {}".format(step, target_id, e))
1519
1520 finally:
1521 if update_dict or unset_dict:
1522 if error_text:
1523 update_dict[op_text + "operationState"] = "FAILED"
1524 update_dict[op_text + "detailed-status"] = error_text
1525 unset_dict.pop(op_text + "detailed-status", None)
1526 update_dict["_admin.operationalState"] = "ERROR"
1527 update_dict["_admin.detailed-status"] = error_text
1528
1529 if op_text:
1530 update_dict[op_text + "statusEnteredTime"] = now
1531
1532 self.db.set_one(
1533 target_database,
1534 q_filter={"_id": _id},
1535 update_dict=update_dict,
1536 unset=unset_dict,
1537 fail_on_empty=False,
1538 )
1539
1540 if not loaded:
1541 self._unload_vim(target_id)
1542
1543 def _reload_vim(self, target_id):
1544 if target_id in self.vim_targets:
1545 self._load_vim(target_id)
1546 else:
1547 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1548 # just remove it to force load again next time it is needed
1549 self.db_vims.pop(target_id, None)
1550
1551 def _load_vim(self, target_id):
1552 """
1553 Load or reload a vim_account, sdn_controller or wim_account.
1554 Read content from database, load the plugin if not loaded.
1555 In case of error loading the plugin, it load a failing VIM_connector
1556 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1557 :param target_id: Contains type:_id; where type can be 'vim', ...
1558 :return: None if ok, descriptive text if error
1559 """
1560 target, _, _id = target_id.partition(":")
1561 target_database = (
1562 "vim_accounts"
1563 if target == "vim"
1564 else "wim_accounts"
1565 if target == "wim"
1566 else "sdns"
1567 )
1568 plugin_name = ""
1569 vim = None
1570
1571 try:
1572 step = "Getting {}={} from db".format(target, _id)
1573 # TODO process for wim, sdnc, ...
1574 vim = self.db.get_one(target_database, {"_id": _id})
1575
1576 # if deep_get(vim, "config", "sdn-controller"):
1577 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1578 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1579
1580 step = "Decrypting password"
1581 schema_version = vim.get("schema_version")
1582 self.db.encrypt_decrypt_fields(
1583 vim,
1584 "decrypt",
1585 fields=("password", "secret"),
1586 schema_version=schema_version,
1587 salt=_id,
1588 )
1589 self._process_vim_config(target_id, vim)
1590
1591 if target == "vim":
1592 plugin_name = "rovim_" + vim["vim_type"]
1593 step = "Loading plugin '{}'".format(plugin_name)
1594 vim_module_conn = self._load_plugin(plugin_name)
1595 step = "Loading {}'".format(target_id)
1596 self.my_vims[target_id] = vim_module_conn(
1597 uuid=vim["_id"],
1598 name=vim["name"],
1599 tenant_id=vim.get("vim_tenant_id"),
1600 tenant_name=vim.get("vim_tenant_name"),
1601 url=vim["vim_url"],
1602 url_admin=None,
1603 user=vim["vim_user"],
1604 passwd=vim["vim_password"],
1605 config=vim.get("config") or {},
1606 persistent_info={},
1607 )
1608 else: # sdn
1609 plugin_name = "rosdn_" + vim["type"]
1610 step = "Loading plugin '{}'".format(plugin_name)
1611 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1612 step = "Loading {}'".format(target_id)
1613 wim = deepcopy(vim)
1614 wim_config = wim.pop("config", {}) or {}
1615 wim["uuid"] = wim["_id"]
1616 wim["wim_url"] = wim["url"]
1617
1618 if wim.get("dpid"):
1619 wim_config["dpid"] = wim.pop("dpid")
1620
1621 if wim.get("switch_id"):
1622 wim_config["switch_id"] = wim.pop("switch_id")
1623
1624 # wim, wim_account, config
1625 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1626 self.db_vims[target_id] = vim
1627 self.error_status = None
1628
1629 self.logger.info(
1630 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1631 )
1632 except Exception as e:
1633 self.logger.error(
1634 "Cannot load {} plugin={}: {} {}".format(
1635 target_id, plugin_name, step, e
1636 )
1637 )
1638
1639 self.db_vims[target_id] = vim or {}
1640 self.db_vims[target_id] = FailingConnector(str(e))
1641 error_status = "{} Error: {}".format(step, e)
1642
1643 return error_status
1644 finally:
1645 if target_id not in self.vim_targets:
1646 self.vim_targets.append(target_id)
1647
1648 def _get_db_task(self):
1649 """
1650 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1651 :return: None
1652 """
1653 now = time.time()
1654
1655 if not self.time_last_task_processed:
1656 self.time_last_task_processed = now
1657
1658 try:
1659 while True:
1660 """
1661 # Log RO tasks only when loglevel is DEBUG
1662 if self.logger.getEffectiveLevel() == logging.DEBUG:
1663 self._log_ro_task(
1664 None,
1665 None,
1666 None,
1667 "TASK_WF",
1668 "task_locked_time="
1669 + str(self.task_locked_time)
1670 + " "
1671 + "time_last_task_processed="
1672 + str(self.time_last_task_processed)
1673 + " "
1674 + "now="
1675 + str(now),
1676 )
1677 """
1678 locked = self.db.set_one(
1679 "ro_tasks",
1680 q_filter={
1681 "target_id": self.vim_targets,
1682 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1683 "locked_at.lt": now - self.task_locked_time,
1684 "to_check_at.lt": self.time_last_task_processed,
1685 "to_check_at.gt": -1,
1686 },
1687 update_dict={"locked_by": self.my_id, "locked_at": now},
1688 fail_on_empty=False,
1689 )
1690
1691 if locked:
1692 # read and return
1693 ro_task = self.db.get_one(
1694 "ro_tasks",
1695 q_filter={
1696 "target_id": self.vim_targets,
1697 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1698 "locked_at": now,
1699 },
1700 )
1701 return ro_task
1702
1703 if self.time_last_task_processed == now:
1704 self.time_last_task_processed = None
1705 return None
1706 else:
1707 self.time_last_task_processed = now
1708 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1709
1710 except DbException as e:
1711 self.logger.error("Database exception at _get_db_task: {}".format(e))
1712 except Exception as e:
1713 self.logger.critical(
1714 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1715 )
1716
1717 return None
1718
1719 def _get_db_all_tasks(self):
1720 """
1721 Read all content of table ro_tasks to log it
1722 :return: None
1723 """
1724 try:
1725 # Checking the content of the BD:
1726
1727 # read and return
1728 ro_task = self.db.get_list("ro_tasks")
1729 for rt in ro_task:
1730 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1731 return ro_task
1732
1733 except DbException as e:
1734 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1735 except Exception as e:
1736 self.logger.critical(
1737 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1738 )
1739
1740 return None
1741
1742 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1743 """
1744 Generate a log with the following format:
1745
1746 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1747 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1748 task_array_index;task_id;task_action;task_item;task_args
1749
1750 Example:
1751
1752 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1753 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1754 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1755 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1756 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1757 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1758 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1759 """
1760 try:
1761 line = []
1762 i = 0
1763 if ro_task is not None and isinstance(ro_task, dict):
1764 for t in ro_task["tasks"]:
1765 line.clear()
1766 line.append(mark)
1767 line.append(event)
1768 line.append(ro_task.get("_id", ""))
1769 line.append(str(ro_task.get("locked_at", "")))
1770 line.append(str(ro_task.get("modified_at", "")))
1771 line.append(str(ro_task.get("created_at", "")))
1772 line.append(str(ro_task.get("to_check_at", "")))
1773 line.append(str(ro_task.get("locked_by", "")))
1774 line.append(str(ro_task.get("target_id", "")))
1775 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1776 line.append(str(ro_task.get("vim_info", "")))
1777 line.append(str(ro_task.get("tasks", "")))
1778 if isinstance(t, dict):
1779 line.append(str(t.get("status", "")))
1780 line.append(str(t.get("action_id", "")))
1781 line.append(str(i))
1782 line.append(str(t.get("task_id", "")))
1783 line.append(str(t.get("action", "")))
1784 line.append(str(t.get("item", "")))
1785 line.append(str(t.get("find_params", "")))
1786 line.append(str(t.get("params", "")))
1787 else:
1788 line.extend([""] * 2)
1789 line.append(str(i))
1790 line.extend([""] * 5)
1791
1792 i += 1
1793 self.logger.debug(";".join(line))
1794 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1795 i = 0
1796 while True:
1797 st = "tasks.{}.status".format(i)
1798 if st not in db_ro_task_update:
1799 break
1800 line.clear()
1801 line.append(mark)
1802 line.append(event)
1803 line.append(db_ro_task_update.get("_id", ""))
1804 line.append(str(db_ro_task_update.get("locked_at", "")))
1805 line.append(str(db_ro_task_update.get("modified_at", "")))
1806 line.append("")
1807 line.append(str(db_ro_task_update.get("to_check_at", "")))
1808 line.append(str(db_ro_task_update.get("locked_by", "")))
1809 line.append("")
1810 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1811 line.append("")
1812 line.append(str(db_ro_task_update.get("vim_info", "")))
1813 line.append(str(str(db_ro_task_update).count(".status")))
1814 line.append(db_ro_task_update.get(st, ""))
1815 line.append("")
1816 line.append(str(i))
1817 line.extend([""] * 3)
1818 i += 1
1819 self.logger.debug(";".join(line))
1820
1821 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1822 line.clear()
1823 line.append(mark)
1824 line.append(event)
1825 line.append(db_ro_task_delete.get("_id", ""))
1826 line.append("")
1827 line.append(db_ro_task_delete.get("modified_at", ""))
1828 line.extend([""] * 13)
1829 self.logger.debug(";".join(line))
1830
1831 else:
1832 line.clear()
1833 line.append(mark)
1834 line.append(event)
1835 line.extend([""] * 16)
1836 self.logger.debug(";".join(line))
1837
1838 except Exception as e:
1839 self.logger.error("Error logging ro_task: {}".format(e))
1840
1841 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1842 """
1843 Determine if this task need to be done or superseded
1844 :return: None
1845 """
1846 my_task = ro_task["tasks"][task_index]
1847 task_id = my_task["task_id"]
1848 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1849 "created_items", False
1850 )
1851
1852 if my_task["status"] == "FAILED":
1853 return None, None # TODO need to be retry??
1854
1855 try:
1856 for index, task in enumerate(ro_task["tasks"]):
1857 if index == task_index or not task:
1858 continue # own task
1859
1860 if (
1861 my_task["target_record"] == task["target_record"]
1862 and task["action"] == "CREATE"
1863 ):
1864 # set to finished
1865 db_update["tasks.{}.status".format(index)] = task[
1866 "status"
1867 ] = "FINISHED"
1868 elif task["action"] == "CREATE" and task["status"] not in (
1869 "FINISHED",
1870 "SUPERSEDED",
1871 ):
1872 needed_delete = False
1873
1874 if needed_delete:
1875 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1876 else:
1877 return "SUPERSEDED", None
1878 except Exception as e:
1879 if not isinstance(e, NsWorkerException):
1880 self.logger.critical(
1881 "Unexpected exception at _delete_task task={}: {}".format(
1882 task_id, e
1883 ),
1884 exc_info=True,
1885 )
1886
1887 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1888
1889 def _create_task(self, ro_task, task_index, task_depends, db_update):
1890 """
1891 Determine if this task need to create something at VIM
1892 :return: None
1893 """
1894 my_task = ro_task["tasks"][task_index]
1895 task_id = my_task["task_id"]
1896 task_status = None
1897
1898 if my_task["status"] == "FAILED":
1899 return None, None # TODO need to be retry??
1900 elif my_task["status"] == "SCHEDULED":
1901 # check if already created by another task
1902 for index, task in enumerate(ro_task["tasks"]):
1903 if index == task_index or not task:
1904 continue # own task
1905
1906 if task["action"] == "CREATE" and task["status"] not in (
1907 "SCHEDULED",
1908 "FINISHED",
1909 "SUPERSEDED",
1910 ):
1911 return task["status"], "COPY_VIM_INFO"
1912
1913 try:
1914 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1915 ro_task, task_index, task_depends
1916 )
1917 # TODO update other CREATE tasks
1918 except Exception as e:
1919 if not isinstance(e, NsWorkerException):
1920 self.logger.error(
1921 "Error executing task={}: {}".format(task_id, e), exc_info=True
1922 )
1923
1924 task_status = "FAILED"
1925 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1926 # TODO update ro_vim_item_update
1927
1928 return task_status, ro_vim_item_update
1929 else:
1930 return None, None
1931
1932 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1933 """
1934 Look for dependency task
1935 :param task_id: Can be one of
1936 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1937 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1938 3. task.task_id: "<action_id>:number"
1939 :param ro_task:
1940 :param target_id:
1941 :return: database ro_task plus index of task
1942 """
1943 if (
1944 task_id.startswith("vim:")
1945 or task_id.startswith("sdn:")
1946 or task_id.startswith("wim:")
1947 ):
1948 target_id, _, task_id = task_id.partition(" ")
1949
1950 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1951 ro_task_dependency = self.db.get_one(
1952 "ro_tasks",
1953 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1954 fail_on_empty=False,
1955 )
1956
1957 if ro_task_dependency:
1958 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1959 if task["target_record_id"] == task_id:
1960 return ro_task_dependency, task_index
1961
1962 else:
1963 if ro_task:
1964 for task_index, task in enumerate(ro_task["tasks"]):
1965 if task and task["task_id"] == task_id:
1966 return ro_task, task_index
1967
1968 ro_task_dependency = self.db.get_one(
1969 "ro_tasks",
1970 q_filter={
1971 "tasks.ANYINDEX.task_id": task_id,
1972 "tasks.ANYINDEX.target_record.ne": None,
1973 },
1974 fail_on_empty=False,
1975 )
1976
1977 if ro_task_dependency:
1978 for task_index, task in ro_task_dependency["tasks"]:
1979 if task["task_id"] == task_id:
1980 return ro_task_dependency, task_index
1981 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1982
def update_vm_refresh(self):
    """Re-enable the periodic status refresh at database when it is enabled by configuration.

    If self.refresh_config.active is -1 nothing is done. Otherwise every ro_task with
    "tasks.status" DONE and a negative "to_check_at" gets a new "vim_info.refresh_at" and
    "to_check_at". Errors are logged, not raised.
    """
    try:
        self.logger.debug("Checking if VM status update config")
        active_period = self.refresh_config.active

        if active_period == -1:
            # periodic checks are disabled
            return

        next_refresh = time.time() + active_period
        # the next check is done at the refresh time, but never later than one day from now
        one_day_later = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_later, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )
        self.logger.debug("Updating tasks to change the to_check_at status")

        for task in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2028
2029 def _process_pending_tasks(self, ro_task):
2030 ro_task_id = ro_task["_id"]
2031 now = time.time()
2032 # one day
2033 next_check_at = now + (24 * 60 * 60)
2034 db_ro_task_update = {}
2035
2036 def _update_refresh(new_status):
2037 # compute next_refresh
2038 nonlocal task
2039 nonlocal next_check_at
2040 nonlocal db_ro_task_update
2041 nonlocal ro_task
2042
2043 next_refresh = time.time()
2044
2045 if task["item"] in ("image", "flavor"):
2046 next_refresh += self.refresh_config.image
2047 elif new_status == "BUILD":
2048 next_refresh += self.refresh_config.build
2049 elif new_status == "DONE":
2050 if self.refresh_config.active == -1:
2051 next_refresh = -1
2052 else:
2053 next_refresh += self.refresh_config.active
2054 else:
2055 next_refresh += self.refresh_config.error
2056
2057 next_check_at = min(next_check_at, next_refresh)
2058 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2059 ro_task["vim_info"]["refresh_at"] = next_refresh
2060
2061 try:
2062 """
2063 # Log RO tasks only when loglevel is DEBUG
2064 if self.logger.getEffectiveLevel() == logging.DEBUG:
2065 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2066 """
2067 # Check if vim status refresh is enabled again
2068 self.update_vm_refresh()
2069 # 0: get task_status_create
2070 lock_object = None
2071 task_status_create = None
2072 task_create = next(
2073 (
2074 t
2075 for t in ro_task["tasks"]
2076 if t
2077 and t["action"] == "CREATE"
2078 and t["status"] in ("BUILD", "DONE")
2079 ),
2080 None,
2081 )
2082
2083 if task_create:
2084 task_status_create = task_create["status"]
2085
2086 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2087 for task_action in ("DELETE", "CREATE", "EXEC"):
2088 db_vim_update = None
2089 new_status = None
2090
2091 for task_index, task in enumerate(ro_task["tasks"]):
2092 if not task:
2093 continue # task deleted
2094
2095 task_depends = {}
2096 target_update = None
2097
2098 if (
2099 (
2100 task_action in ("DELETE", "EXEC")
2101 and task["status"] not in ("SCHEDULED", "BUILD")
2102 )
2103 or task["action"] != task_action
2104 or (
2105 task_action == "CREATE"
2106 and task["status"] in ("FINISHED", "SUPERSEDED")
2107 )
2108 ):
2109 continue
2110
2111 task_path = "tasks.{}.status".format(task_index)
2112 try:
2113 db_vim_info_update = None
2114
2115 if task["status"] == "SCHEDULED":
2116 # check if tasks that this depends on have been completed
2117 dependency_not_completed = False
2118
2119 for dependency_task_id in task.get("depends_on") or ():
2120 (
2121 dependency_ro_task,
2122 dependency_task_index,
2123 ) = self._get_dependency(
2124 dependency_task_id, target_id=ro_task["target_id"]
2125 )
2126 dependency_task = dependency_ro_task["tasks"][
2127 dependency_task_index
2128 ]
2129
2130 if dependency_task["status"] == "SCHEDULED":
2131 dependency_not_completed = True
2132 next_check_at = min(
2133 next_check_at, dependency_ro_task["to_check_at"]
2134 )
2135 # must allow dependent task to be processed first
2136 # to do this set time after last_task_processed
2137 next_check_at = max(
2138 self.time_last_task_processed, next_check_at
2139 )
2140 break
2141 elif dependency_task["status"] == "FAILED":
2142 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2143 task["action"],
2144 task["item"],
2145 dependency_task["action"],
2146 dependency_task["item"],
2147 dependency_task_id,
2148 dependency_ro_task["vim_info"].get(
2149 "vim_details"
2150 ),
2151 )
2152 self.logger.error(
2153 "task={} {}".format(task["task_id"], error_text)
2154 )
2155 raise NsWorkerException(error_text)
2156
2157 task_depends[dependency_task_id] = dependency_ro_task[
2158 "vim_info"
2159 ]["vim_id"]
2160 task_depends[
2161 "TASK-{}".format(dependency_task_id)
2162 ] = dependency_ro_task["vim_info"]["vim_id"]
2163
2164 if dependency_not_completed:
2165 # TODO set at vim_info.vim_details that it is waiting
2166 continue
2167
2168 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2169 # the task of renew this locking. It will update database locket_at periodically
2170 if not lock_object:
2171 lock_object = LockRenew.add_lock_object(
2172 "ro_tasks", ro_task, self
2173 )
2174
2175 if task["action"] == "DELETE":
2176 (new_status, db_vim_info_update,) = self._delete_task(
2177 ro_task, task_index, task_depends, db_ro_task_update
2178 )
2179 new_status = (
2180 "FINISHED" if new_status == "DONE" else new_status
2181 )
2182 # ^with FINISHED instead of DONE it will not be refreshing
2183
2184 if new_status in ("FINISHED", "SUPERSEDED"):
2185 target_update = "DELETE"
2186 elif task["action"] == "EXEC":
2187 (
2188 new_status,
2189 db_vim_info_update,
2190 db_task_update,
2191 ) = self.item2class[task["item"]].exec(
2192 ro_task, task_index, task_depends
2193 )
2194 new_status = (
2195 "FINISHED" if new_status == "DONE" else new_status
2196 )
2197 # ^with FINISHED instead of DONE it will not be refreshing
2198
2199 if db_task_update:
2200 # load into database the modified db_task_update "retries" and "next_retry"
2201 if db_task_update.get("retries"):
2202 db_ro_task_update[
2203 "tasks.{}.retries".format(task_index)
2204 ] = db_task_update["retries"]
2205
2206 next_check_at = time.time() + db_task_update.get(
2207 "next_retry", 60
2208 )
2209 target_update = None
2210 elif task["action"] == "CREATE":
2211 if task["status"] == "SCHEDULED":
2212 if task_status_create:
2213 new_status = task_status_create
2214 target_update = "COPY_VIM_INFO"
2215 else:
2216 new_status, db_vim_info_update = self.item2class[
2217 task["item"]
2218 ].new(ro_task, task_index, task_depends)
2219 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2220 _update_refresh(new_status)
2221 else:
2222 refresh_at = ro_task["vim_info"]["refresh_at"]
2223 if refresh_at and refresh_at != -1 and now > refresh_at:
2224 new_status, db_vim_info_update = self.item2class[
2225 task["item"]
2226 ].refresh(ro_task)
2227 _update_refresh(new_status)
2228 else:
2229 # The refresh is updated to avoid set the value of "refresh_at" to
2230 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2231 # because it can happen that in this case the task is never processed
2232 _update_refresh(task["status"])
2233
2234 except Exception as e:
2235 new_status = "FAILED"
2236 db_vim_info_update = {
2237 "vim_status": "VIM_ERROR",
2238 "vim_details": str(e),
2239 }
2240
2241 if not isinstance(
2242 e, (NsWorkerException, vimconn.VimConnException)
2243 ):
2244 self.logger.error(
2245 "Unexpected exception at _delete_task task={}: {}".format(
2246 task["task_id"], e
2247 ),
2248 exc_info=True,
2249 )
2250
2251 try:
2252 if db_vim_info_update:
2253 db_vim_update = db_vim_info_update.copy()
2254 db_ro_task_update.update(
2255 {
2256 "vim_info." + k: v
2257 for k, v in db_vim_info_update.items()
2258 }
2259 )
2260 ro_task["vim_info"].update(db_vim_info_update)
2261
2262 if new_status:
2263 if task_action == "CREATE":
2264 task_status_create = new_status
2265 db_ro_task_update[task_path] = new_status
2266
2267 if target_update or db_vim_update:
2268 if target_update == "DELETE":
2269 self._update_target(task, None)
2270 elif target_update == "COPY_VIM_INFO":
2271 self._update_target(task, ro_task["vim_info"])
2272 else:
2273 self._update_target(task, db_vim_update)
2274
2275 except Exception as e:
2276 if (
2277 isinstance(e, DbException)
2278 and e.http_code == HTTPStatus.NOT_FOUND
2279 ):
2280 # if the vnfrs or nsrs has been removed from database, this task must be removed
2281 self.logger.debug(
2282 "marking to delete task={}".format(task["task_id"])
2283 )
2284 self.tasks_to_delete.append(task)
2285 else:
2286 self.logger.error(
2287 "Unexpected exception at _update_target task={}: {}".format(
2288 task["task_id"], e
2289 ),
2290 exc_info=True,
2291 )
2292
2293 locked_at = ro_task["locked_at"]
2294
2295 if lock_object:
2296 locked_at = [
2297 lock_object["locked_at"],
2298 lock_object["locked_at"] + self.task_locked_time,
2299 ]
2300 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2301 # contain exactly locked_at + self.task_locked_time
2302 LockRenew.remove_lock_object(lock_object)
2303
2304 q_filter = {
2305 "_id": ro_task["_id"],
2306 "to_check_at": ro_task["to_check_at"],
2307 "locked_at": locked_at,
2308 }
2309 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2310 # outside this task (by ro_nbi) do not update it
2311 db_ro_task_update["locked_by"] = None
2312 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2313 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2314 db_ro_task_update["modified_at"] = now
2315 db_ro_task_update["to_check_at"] = next_check_at
2316
2317 """
2318 # Log RO tasks only when loglevel is DEBUG
2319 if self.logger.getEffectiveLevel() == logging.DEBUG:
2320 db_ro_task_update_log = db_ro_task_update.copy()
2321 db_ro_task_update_log["_id"] = q_filter["_id"]
2322 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2323 """
2324
2325 if not self.db.set_one(
2326 "ro_tasks",
2327 update_dict=db_ro_task_update,
2328 q_filter=q_filter,
2329 fail_on_empty=False,
2330 ):
2331 del db_ro_task_update["to_check_at"]
2332 del q_filter["to_check_at"]
2333 """
2334 # Log RO tasks only when loglevel is DEBUG
2335 if self.logger.getEffectiveLevel() == logging.DEBUG:
2336 self._log_ro_task(
2337 None,
2338 db_ro_task_update_log,
2339 None,
2340 "TASK_WF",
2341 "SET_TASK " + str(q_filter),
2342 )
2343 """
2344 self.db.set_one(
2345 "ro_tasks",
2346 q_filter=q_filter,
2347 update_dict=db_ro_task_update,
2348 fail_on_empty=True,
2349 )
2350 except DbException as e:
2351 self.logger.error(
2352 "ro_task={} Error updating database {}".format(ro_task_id, e)
2353 )
2354 except Exception as e:
2355 self.logger.error(
2356 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2357 )
2358
2359 def _update_target(self, task, ro_vim_item_update):
2360 table, _, temp = task["target_record"].partition(":")
2361 _id, _, path_vim_status = temp.partition(":")
2362 path_item = path_vim_status[: path_vim_status.rfind(".")]
2363 path_item = path_item[: path_item.rfind(".")]
2364 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2365 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2366
2367 if ro_vim_item_update:
2368 update_dict = {
2369 path_vim_status + "." + k: v
2370 for k, v in ro_vim_item_update.items()
2371 if k
2372 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2373 }
2374
2375 if path_vim_status.startswith("vdur."):
2376 # for backward compatibility, add vdur.name apart from vdur.vim_name
2377 if ro_vim_item_update.get("vim_name"):
2378 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2379
2380 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2381 if ro_vim_item_update.get("vim_id"):
2382 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2383
2384 # update general status
2385 if ro_vim_item_update.get("vim_status"):
2386 update_dict[path_item + ".status"] = ro_vim_item_update[
2387 "vim_status"
2388 ]
2389
2390 if ro_vim_item_update.get("interfaces"):
2391 path_interfaces = path_item + ".interfaces"
2392
2393 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2394 if iface:
2395 update_dict.update(
2396 {
2397 path_interfaces + ".{}.".format(i) + k: v
2398 for k, v in iface.items()
2399 if k in ("vlan", "compute_node", "pci")
2400 }
2401 )
2402
2403 # put ip_address and mac_address with ip-address and mac-address
2404 if iface.get("ip_address"):
2405 update_dict[
2406 path_interfaces + ".{}.".format(i) + "ip-address"
2407 ] = iface["ip_address"]
2408
2409 if iface.get("mac_address"):
2410 update_dict[
2411 path_interfaces + ".{}.".format(i) + "mac-address"
2412 ] = iface["mac_address"]
2413
2414 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2415 update_dict["ip-address"] = iface.get("ip_address").split(
2416 ";"
2417 )[0]
2418
2419 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2420 update_dict[path_item + ".ip-address"] = iface.get(
2421 "ip_address"
2422 ).split(";")[0]
2423
2424 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2425 else:
2426 update_dict = {path_item + ".status": "DELETED"}
2427 self.db.set_one(
2428 table,
2429 q_filter={"_id": _id},
2430 update_dict=update_dict,
2431 unset={path_vim_status: None},
2432 )
2433
2434 def _process_delete_db_tasks(self):
2435 """
2436 Delete task from database because vnfrs or nsrs or both have been deleted
2437 :return: None. Uses and modify self.tasks_to_delete
2438 """
2439 while self.tasks_to_delete:
2440 task = self.tasks_to_delete[0]
2441 vnfrs_deleted = None
2442 nsr_id = task["nsr_id"]
2443
2444 if task["target_record"].startswith("vnfrs:"):
2445 # check if nsrs is present
2446 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2447 vnfrs_deleted = task["target_record"].split(":")[1]
2448
2449 try:
2450 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2451 except Exception as e:
2452 self.logger.error(
2453 "Error deleting task={}: {}".format(task["task_id"], e)
2454 )
2455 self.tasks_to_delete.pop(0)
2456
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Delete from "ro_tasks" the tasks of a removed nsrs, or only the ones of a removed vnfrs.
    Static method because it is called from osm_ng_ro.ns
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None. Raises NsWorkerException if after 5 attempts some ro_task could not be deleted or
        updated because it was modified meanwhile
    """
    retries = 5
    # target_record has the form "<table>:<_id>:<path>". The closing ":" makes the comparison match the
    # whole vnfr id, instead of any vnfr id that just begins with vnfrs_deleted
    vnfr_prefix = "vnfrs:" + vnfrs_deleted + ":" if vnfrs_deleted else None

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task already deleted

                if vnfr_prefix:
                    affected = task["target_record"].startswith(vnfr_prefix)
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr or vnfr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update only if nobody has changed ro_task meanwhile. modified_at is used at the
            # filter to detect it; a falsy result is taken as a conflict and triggers a new attempt
            q_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }

            if to_delete_ro_task:
                if not db.del_one("ro_tasks", q_filter=q_filter, fail_on_empty=False):
                    conflict = True
            elif db_update:
                db_update["modified_at"] = now

                if not db.set_one(
                    "ro_tasks",
                    q_filter=q_filter,
                    update_dict=db_update,
                    fail_on_empty=False,
                ):
                    conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2515
def run(self):
    """
    Main loop of the thread. At each iteration:
      1. One command is taken from self.task_queue: "terminate", "load_vim", "unload_vim", "reload_vim" or
         "check_vim". While self.vim_targets is empty the thread waits blocked at the queue (idle state);
         otherwise the queue is read without blocking.
      2. Only when the queue was empty, or attending the command failed: the tasks at self.tasks_to_delete
         are deleted and one pending ro_task is processed. If there is no pending ro_task it sleeps
         5 seconds.
    :return: None. It finishes when the "terminate" command is received
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            command = task[0]

            if command == "terminate":
                break
            elif command == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif command == "reload_vim":
                self._reload_vim(task[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])

            # a command has been attended, look for the next one before processing tasks
            continue
        except queue.Empty:
            # no command waiting, go ahead with the pending tasks
            pass
        except Exception as e:
            self.logger.critical("Error processing task: {}".format(e), exc_info=True)

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled: log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()
            ro_task = self._get_db_task()

            if ro_task:
                self._process_pending_tasks(ro_task)
            else:
                # nothing pending, wait before asking the database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")