edf1bbffde904ae0e548a8577f955495751d3b78
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
52 def deep_get(target_dict, *args, **kwargs):
53 """
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
60 """
61 for key in args:
62 if not isinstance(target_dict, dict) or key not in target_dict:
63 return kwargs.get("default")
64 target_dict = target_dict[key]
65 return target_dict
66
67
68 class NsWorkerException(Exception):
69 pass
70
71
72 class FailingConnector:
73 def __init__(self, error_msg):
74 self.error_msg = error_msg
75
76 for method in dir(vimconn.VimConnector):
77 if method[0] != "_":
78 setattr(
79 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80 )
81
82 for method in dir(sdnconn.SdnConnectorBase):
83 if method[0] != "_":
84 setattr(
85 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86 )
87
88
89 class NsWorkerExceptionNotFound(NsWorkerException):
90 pass
91
92
93 class VimInteractionBase:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
96
97 def __init__(self, db, my_vims, db_vims, logger):
98 self.db = db
99 self.logger = logger
100 self.my_vims = my_vims
101 self.db_vims = db_vims
102
103 def new(self, ro_task, task_index, task_depends):
104 return "BUILD", {}
105
106 def refresh(self, ro_task):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 return "FAILED", {}
110
111 return "DONE", {}
112
113 def delete(self, ro_task, task_index):
114 """skip calling VIM to delete image. Assumes ok"""
115 return "DONE", {}
116
117 def exec(self, ro_task, task_index, task_depends):
118 return "DONE", None, None
119
120
121 class VimInteractionNet(VimInteractionBase):
122 def new(self, ro_task, task_index, task_depends):
123 vim_net_id = None
124 task = ro_task["tasks"][task_index]
125 task_id = task["task_id"]
126 created = False
127 created_items = {}
128 target_vim = self.my_vims[ro_task["target_id"]]
129 mgmtnet = False
130 mgmtnet_defined_in_vim = False
131
132 try:
133 # FIND
134 if task.get("find_params"):
135 # if management, get configuration of VIM
136 if task["find_params"].get("filter_dict"):
137 vim_filter = task["find_params"]["filter_dict"]
138 # management network
139 elif task["find_params"].get("mgmt"):
140 mgmtnet = True
141 if deep_get(
142 self.db_vims[ro_task["target_id"]],
143 "config",
144 "management_network_id",
145 ):
146 mgmtnet_defined_in_vim = True
147 vim_filter = {
148 "id": self.db_vims[ro_task["target_id"]]["config"][
149 "management_network_id"
150 ]
151 }
152 elif deep_get(
153 self.db_vims[ro_task["target_id"]],
154 "config",
155 "management_network_name",
156 ):
157 mgmtnet_defined_in_vim = True
158 vim_filter = {
159 "name": self.db_vims[ro_task["target_id"]]["config"][
160 "management_network_name"
161 ]
162 }
163 else:
164 vim_filter = {"name": task["find_params"]["name"]}
165 else:
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task["find_params"])
168 )
169
170 vim_nets = target_vim.get_network_list(vim_filter)
171 if not vim_nets and not task.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet and not mgmtnet_defined_in_vim:
177 net_name = (
178 vim_filter.get("name")
179 if vim_filter.get("name")
180 else vim_filter.get("id")[:16]
181 )
182 vim_net_id, created_items = target_vim.new_network(
183 net_name, None
184 )
185 self.logger.debug(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id)
187 )
188 created = True
189 else:
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task.get("find_params")
193 )
194 )
195 elif len(vim_nets) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
198 task["find_params"]
199 )
200 )
201
202 if vim_nets:
203 vim_net_id = vim_nets[0]["id"]
204 else:
205 # CREATE
206 params = task["params"]
207 vim_net_id, created_items = target_vim.new_network(**params)
208 created = True
209
210 ro_vim_item_update = {
211 "vim_id": vim_net_id,
212 "vim_status": "BUILD",
213 "created": created,
214 "created_items": created_items,
215 "vim_details": None,
216 }
217 self.logger.debug(
218 "task={} {} new-net={} created={}".format(
219 task_id, ro_task["target_id"], vim_net_id, created
220 )
221 )
222
223 return "BUILD", ro_vim_item_update
224 except (vimconn.VimConnException, NsWorkerException) as e:
225 self.logger.error(
226 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
227 )
228 ro_vim_item_update = {
229 "vim_status": "VIM_ERROR",
230 "created": created,
231 "vim_details": str(e),
232 }
233
234 return "FAILED", ro_vim_item_update
235
236 def refresh(self, ro_task):
237 """Call VIM to get network status"""
238 ro_task_id = ro_task["_id"]
239 target_vim = self.my_vims[ro_task["target_id"]]
240 vim_id = ro_task["vim_info"]["vim_id"]
241 net_to_refresh_list = [vim_id]
242
243 try:
244 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
245 vim_info = vim_dict[vim_id]
246
247 if vim_info["status"] == "ACTIVE":
248 task_status = "DONE"
249 elif vim_info["status"] == "BUILD":
250 task_status = "BUILD"
251 else:
252 task_status = "FAILED"
253 except vimconn.VimConnException as e:
254 # Mark all tasks at VIM_ERROR status
255 self.logger.error(
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id, ro_task["target_id"], vim_id, e
258 )
259 )
260 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
261 task_status = "FAILED"
262
263 ro_vim_item_update = {}
264 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
265 ro_vim_item_update["vim_status"] = vim_info["status"]
266
267 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
268 ro_vim_item_update["vim_name"] = vim_info.get("name")
269
270 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
272 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
273 elif vim_info["status"] == "DELETED":
274 ro_vim_item_update["vim_id"] = None
275 ro_vim_item_update["vim_details"] = "Deleted externally"
276 else:
277 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
278 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
279
280 if ro_vim_item_update:
281 self.logger.debug(
282 "ro_task={} {} get-net={}: status={} {}".format(
283 ro_task_id,
284 ro_task["target_id"],
285 vim_id,
286 ro_vim_item_update.get("vim_status"),
287 ro_vim_item_update.get("vim_details")
288 if ro_vim_item_update.get("vim_status") != "ACTIVE"
289 else "",
290 )
291 )
292
293 return task_status, ro_vim_item_update
294
295 def delete(self, ro_task, task_index):
296 task = ro_task["tasks"][task_index]
297 task_id = task["task_id"]
298 net_vim_id = ro_task["vim_info"]["vim_id"]
299 ro_vim_item_update_ok = {
300 "vim_status": "DELETED",
301 "created": False,
302 "vim_details": "DELETED",
303 "vim_id": None,
304 }
305
306 try:
307 if net_vim_id or ro_task["vim_info"]["created_items"]:
308 target_vim = self.my_vims[ro_task["target_id"]]
309 target_vim.delete_network(
310 net_vim_id, ro_task["vim_info"]["created_items"]
311 )
312 except vimconn.VimConnNotFoundException:
313 ro_vim_item_update_ok["vim_details"] = "already deleted"
314 except vimconn.VimConnException as e:
315 self.logger.error(
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task["_id"], ro_task["target_id"], net_vim_id, e
318 )
319 )
320 ro_vim_item_update = {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e),
323 }
324
325 return "FAILED", ro_vim_item_update
326
327 self.logger.debug(
328 "task={} {} del-net={} {}".format(
329 task_id,
330 ro_task["target_id"],
331 net_vim_id,
332 ro_vim_item_update_ok.get("vim_details", ""),
333 )
334 )
335
336 return "DONE", ro_vim_item_update_ok
337
338
339 class VimInteractionVdu(VimInteractionBase):
340 max_retries_inject_ssh_key = 20 # 20 times
341 time_retries_inject_ssh_key = 30 # wevery 30 seconds
342
343 def new(self, ro_task, task_index, task_depends):
344 task = ro_task["tasks"][task_index]
345 task_id = task["task_id"]
346 created = False
347 created_items = {}
348 target_vim = self.my_vims[ro_task["target_id"]]
349
350 try:
351 created = True
352 params = task["params"]
353 params_copy = deepcopy(params)
354 net_list = params_copy["net_list"]
355
356 for net in net_list:
357 # change task_id into network_id
358 if "net_id" in net and net["net_id"].startswith("TASK-"):
359 network_id = task_depends[net["net_id"]]
360
361 if not network_id:
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net["net_id"])
365 )
366
367 net["net_id"] = network_id
368
369 if params_copy["image_id"].startswith("TASK-"):
370 params_copy["image_id"] = task_depends[params_copy["image_id"]]
371
372 if params_copy["flavor_id"].startswith("TASK-"):
373 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
374
375 affinity_group_list = params_copy["affinity_group_list"]
376 for affinity_group in affinity_group_list:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group and affinity_group[
379 "affinity_group_id"
380 ].startswith("TASK-"):
381 affinity_group_id = task_depends[
382 affinity_group["affinity_group_id"]
383 ]
384
385 if not affinity_group_id:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group["affinity_group_id"])
388 )
389
390 affinity_group["affinity_group_id"] = affinity_group_id
391
392 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
393 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
394
395 ro_vim_item_update = {
396 "vim_id": vim_vm_id,
397 "vim_status": "BUILD",
398 "created": created,
399 "created_items": created_items,
400 "vim_details": None,
401 "interfaces_vim_ids": interfaces,
402 "interfaces": [],
403 }
404 self.logger.debug(
405 "task={} {} new-vm={} created={}".format(
406 task_id, ro_task["target_id"], vim_vm_id, created
407 )
408 )
409
410 return "BUILD", ro_vim_item_update
411 except (vimconn.VimConnException, NsWorkerException) as e:
412 self.logger.error(
413 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
414 )
415 ro_vim_item_update = {
416 "vim_status": "VIM_ERROR",
417 "created": created,
418 "vim_details": str(e),
419 }
420
421 return "FAILED", ro_vim_item_update
422
423 def delete(self, ro_task, task_index):
424 task = ro_task["tasks"][task_index]
425 task_id = task["task_id"]
426 vm_vim_id = ro_task["vim_info"]["vim_id"]
427 ro_vim_item_update_ok = {
428 "vim_status": "DELETED",
429 "created": False,
430 "vim_details": "DELETED",
431 "vim_id": None,
432 }
433
434 try:
435 if vm_vim_id or ro_task["vim_info"]["created_items"]:
436 target_vim = self.my_vims[ro_task["target_id"]]
437 target_vim.delete_vminstance(
438 vm_vim_id, ro_task["vim_info"]["created_items"]
439 )
440 except vimconn.VimConnNotFoundException:
441 ro_vim_item_update_ok["vim_details"] = "already deleted"
442 except vimconn.VimConnException as e:
443 self.logger.error(
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
446 )
447 )
448 ro_vim_item_update = {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e),
451 }
452
453 return "FAILED", ro_vim_item_update
454
455 self.logger.debug(
456 "task={} {} del-vm={} {}".format(
457 task_id,
458 ro_task["target_id"],
459 vm_vim_id,
460 ro_vim_item_update_ok.get("vim_details", ""),
461 )
462 )
463
464 return "DONE", ro_vim_item_update_ok
465
466 def refresh(self, ro_task):
467 """Call VIM to get vm status"""
468 ro_task_id = ro_task["_id"]
469 target_vim = self.my_vims[ro_task["target_id"]]
470 vim_id = ro_task["vim_info"]["vim_id"]
471
472 if not vim_id:
473 return None, None
474
475 vm_to_refresh_list = [vim_id]
476 try:
477 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
478 vim_info = vim_dict[vim_id]
479
480 if vim_info["status"] == "ACTIVE":
481 task_status = "DONE"
482 elif vim_info["status"] == "BUILD":
483 task_status = "BUILD"
484 else:
485 task_status = "FAILED"
486
487 # try to load and parse vim_information
488 try:
489 vim_info_info = yaml.safe_load(vim_info["vim_info"])
490 if vim_info_info.get("name"):
491 vim_info["name"] = vim_info_info["name"]
492 except Exception:
493 pass
494 except vimconn.VimConnException as e:
495 # Mark all tasks at VIM_ERROR status
496 self.logger.error(
497 "ro_task={} vim={} get-vm={}: {}".format(
498 ro_task_id, ro_task["target_id"], vim_id, e
499 )
500 )
501 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
502 task_status = "FAILED"
503
504 ro_vim_item_update = {}
505
506 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
507 vim_interfaces = []
508 if vim_info.get("interfaces"):
509 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
510 iface = next(
511 (
512 iface
513 for iface in vim_info["interfaces"]
514 if vim_iface_id == iface["vim_interface_id"]
515 ),
516 None,
517 )
518 # if iface:
519 # iface.pop("vim_info", None)
520 vim_interfaces.append(iface)
521
522 task_create = next(
523 t
524 for t in ro_task["tasks"]
525 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
526 )
527 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
528 vim_interfaces[task_create["mgmt_vnf_interface"]][
529 "mgmt_vnf_interface"
530 ] = True
531
532 mgmt_vdu_iface = task_create.get(
533 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
534 )
535 if vim_interfaces:
536 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
537
538 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
539 ro_vim_item_update["interfaces"] = vim_interfaces
540
541 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
542 ro_vim_item_update["vim_status"] = vim_info["status"]
543
544 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
545 ro_vim_item_update["vim_name"] = vim_info.get("name")
546
547 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
548 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
549 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
550 elif vim_info["status"] == "DELETED":
551 ro_vim_item_update["vim_id"] = None
552 ro_vim_item_update["vim_details"] = "Deleted externally"
553 else:
554 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
555 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
556
557 if ro_vim_item_update:
558 self.logger.debug(
559 "ro_task={} {} get-vm={}: status={} {}".format(
560 ro_task_id,
561 ro_task["target_id"],
562 vim_id,
563 ro_vim_item_update.get("vim_status"),
564 ro_vim_item_update.get("vim_details")
565 if ro_vim_item_update.get("vim_status") != "ACTIVE"
566 else "",
567 )
568 )
569
570 return task_status, ro_vim_item_update
571
572 def exec(self, ro_task, task_index, task_depends):
573 task = ro_task["tasks"][task_index]
574 task_id = task["task_id"]
575 target_vim = self.my_vims[ro_task["target_id"]]
576 db_task_update = {"retries": 0}
577 retries = task.get("retries", 0)
578
579 try:
580 params = task["params"]
581 params_copy = deepcopy(params)
582 params_copy["ro_key"] = self.db.decrypt(
583 params_copy.pop("private_key"),
584 params_copy.pop("schema_version"),
585 params_copy.pop("salt"),
586 )
587 params_copy["ip_addr"] = params_copy.pop("ip_address")
588 target_vim.inject_user_key(**params_copy)
589 self.logger.debug(
590 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
591 )
592
593 return (
594 "DONE",
595 None,
596 db_task_update,
597 ) # params_copy["key"]
598 except (vimconn.VimConnException, NsWorkerException) as e:
599 retries += 1
600
601 self.logger.debug(traceback.format_exc())
602 if retries < self.max_retries_inject_ssh_key:
603 return (
604 "BUILD",
605 None,
606 {
607 "retries": retries,
608 "next_retry": self.time_retries_inject_ssh_key,
609 },
610 )
611
612 self.logger.error(
613 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
614 )
615 ro_vim_item_update = {"vim_details": str(e)}
616
617 return "FAILED", ro_vim_item_update, db_task_update
618
619
620 class VimInteractionImage(VimInteractionBase):
621 def new(self, ro_task, task_index, task_depends):
622 task = ro_task["tasks"][task_index]
623 task_id = task["task_id"]
624 created = False
625 created_items = {}
626 target_vim = self.my_vims[ro_task["target_id"]]
627
628 try:
629 # FIND
630 if task.get("find_params"):
631 vim_images = target_vim.get_image_list(**task["find_params"])
632
633 if not vim_images:
634 raise NsWorkerExceptionNotFound(
635 "Image not found with this criteria: '{}'".format(
636 task["find_params"]
637 )
638 )
639 elif len(vim_images) > 1:
640 raise NsWorkerException(
641 "More than one image found with this criteria: '{}'".format(
642 task["find_params"]
643 )
644 )
645 else:
646 vim_image_id = vim_images[0]["id"]
647
648 ro_vim_item_update = {
649 "vim_id": vim_image_id,
650 "vim_status": "DONE",
651 "created": created,
652 "created_items": created_items,
653 "vim_details": None,
654 }
655 self.logger.debug(
656 "task={} {} new-image={} created={}".format(
657 task_id, ro_task["target_id"], vim_image_id, created
658 )
659 )
660
661 return "DONE", ro_vim_item_update
662 except (NsWorkerException, vimconn.VimConnException) as e:
663 self.logger.error(
664 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
665 )
666 ro_vim_item_update = {
667 "vim_status": "VIM_ERROR",
668 "created": created,
669 "vim_details": str(e),
670 }
671
672 return "FAILED", ro_vim_item_update
673
674
675 class VimInteractionFlavor(VimInteractionBase):
676 def delete(self, ro_task, task_index):
677 task = ro_task["tasks"][task_index]
678 task_id = task["task_id"]
679 flavor_vim_id = ro_task["vim_info"]["vim_id"]
680 ro_vim_item_update_ok = {
681 "vim_status": "DELETED",
682 "created": False,
683 "vim_details": "DELETED",
684 "vim_id": None,
685 }
686
687 try:
688 if flavor_vim_id:
689 target_vim = self.my_vims[ro_task["target_id"]]
690 target_vim.delete_flavor(flavor_vim_id)
691 except vimconn.VimConnNotFoundException:
692 ro_vim_item_update_ok["vim_details"] = "already deleted"
693 except vimconn.VimConnException as e:
694 self.logger.error(
695 "ro_task={} vim={} del-flavor={}: {}".format(
696 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
697 )
698 )
699 ro_vim_item_update = {
700 "vim_status": "VIM_ERROR",
701 "vim_details": "Error while deleting: {}".format(e),
702 }
703
704 return "FAILED", ro_vim_item_update
705
706 self.logger.debug(
707 "task={} {} del-flavor={} {}".format(
708 task_id,
709 ro_task["target_id"],
710 flavor_vim_id,
711 ro_vim_item_update_ok.get("vim_details", ""),
712 )
713 )
714
715 return "DONE", ro_vim_item_update_ok
716
717 def new(self, ro_task, task_index, task_depends):
718 task = ro_task["tasks"][task_index]
719 task_id = task["task_id"]
720 created = False
721 created_items = {}
722 target_vim = self.my_vims[ro_task["target_id"]]
723
724 try:
725 # FIND
726 vim_flavor_id = None
727
728 if task.get("find_params"):
729 try:
730 flavor_data = task["find_params"]["flavor_data"]
731 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
732 except vimconn.VimConnNotFoundException:
733 pass
734
735 if not vim_flavor_id and task.get("params"):
736 # CREATE
737 flavor_data = task["params"]["flavor_data"]
738 vim_flavor_id = target_vim.new_flavor(flavor_data)
739 created = True
740
741 ro_vim_item_update = {
742 "vim_id": vim_flavor_id,
743 "vim_status": "DONE",
744 "created": created,
745 "created_items": created_items,
746 "vim_details": None,
747 }
748 self.logger.debug(
749 "task={} {} new-flavor={} created={}".format(
750 task_id, ro_task["target_id"], vim_flavor_id, created
751 )
752 )
753
754 return "DONE", ro_vim_item_update
755 except (vimconn.VimConnException, NsWorkerException) as e:
756 self.logger.error(
757 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
758 )
759 ro_vim_item_update = {
760 "vim_status": "VIM_ERROR",
761 "created": created,
762 "vim_details": str(e),
763 }
764
765 return "FAILED", ro_vim_item_update
766
767
768 class VimInteractionAffinityGroup(VimInteractionBase):
769 def delete(self, ro_task, task_index):
770 task = ro_task["tasks"][task_index]
771 task_id = task["task_id"]
772 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
773 ro_vim_item_update_ok = {
774 "vim_status": "DELETED",
775 "created": False,
776 "vim_details": "DELETED",
777 "vim_id": None,
778 }
779
780 try:
781 if affinity_group_vim_id:
782 target_vim = self.my_vims[ro_task["target_id"]]
783 target_vim.delete_affinity_group(affinity_group_vim_id)
784 except vimconn.VimConnNotFoundException:
785 ro_vim_item_update_ok["vim_details"] = "already deleted"
786 except vimconn.VimConnException as e:
787 self.logger.error(
788 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
789 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
790 )
791 )
792 ro_vim_item_update = {
793 "vim_status": "VIM_ERROR",
794 "vim_details": "Error while deleting: {}".format(e),
795 }
796
797 return "FAILED", ro_vim_item_update
798
799 self.logger.debug(
800 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
801 task_id,
802 ro_task["target_id"],
803 affinity_group_vim_id,
804 ro_vim_item_update_ok.get("vim_details", ""),
805 )
806 )
807
808 return "DONE", ro_vim_item_update_ok
809
810 def new(self, ro_task, task_index, task_depends):
811 task = ro_task["tasks"][task_index]
812 task_id = task["task_id"]
813 created = False
814 created_items = {}
815 target_vim = self.my_vims[ro_task["target_id"]]
816
817 try:
818 affinity_group_vim_id = None
819 affinity_group_data = None
820
821 if task.get("params"):
822 affinity_group_data = task["params"].get("affinity_group_data")
823
824 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
825 try:
826 param_affinity_group_id = task["params"]["affinity_group_data"].get(
827 "vim-affinity-group-id"
828 )
829 affinity_group_vim_id = target_vim.get_affinity_group(
830 param_affinity_group_id
831 ).get("id")
832 except vimconn.VimConnNotFoundException:
833 self.logger.error(
834 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
835 "could not be found at VIM. Creating a new one.".format(
836 task_id, ro_task["target_id"], param_affinity_group_id
837 )
838 )
839
840 if not affinity_group_vim_id and affinity_group_data:
841 affinity_group_vim_id = target_vim.new_affinity_group(
842 affinity_group_data
843 )
844 created = True
845
846 ro_vim_item_update = {
847 "vim_id": affinity_group_vim_id,
848 "vim_status": "DONE",
849 "created": created,
850 "created_items": created_items,
851 "vim_details": None,
852 }
853 self.logger.debug(
854 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
855 task_id, ro_task["target_id"], affinity_group_vim_id, created
856 )
857 )
858
859 return "DONE", ro_vim_item_update
860 except (vimconn.VimConnException, NsWorkerException) as e:
861 self.logger.error(
862 "task={} vim={} new-affinity-or-anti-affinity-group:"
863 " {}".format(task_id, ro_task["target_id"], e)
864 )
865 ro_vim_item_update = {
866 "vim_status": "VIM_ERROR",
867 "created": created,
868 "vim_details": str(e),
869 }
870
871 return "FAILED", ro_vim_item_update
872
873
874 class VimInteractionSdnNet(VimInteractionBase):
875 @staticmethod
876 def _match_pci(port_pci, mapping):
877 """
878 Check if port_pci matches with mapping
879 mapping can have brackets to indicate that several chars are accepted. e.g
880 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
881 :param port_pci: text
882 :param mapping: text, can contain brackets to indicate several chars are available
883 :return: True if matches, False otherwise
884 """
885 if not port_pci or not mapping:
886 return False
887 if port_pci == mapping:
888 return True
889
890 mapping_index = 0
891 pci_index = 0
892 while True:
893 bracket_start = mapping.find("[", mapping_index)
894
895 if bracket_start == -1:
896 break
897
898 bracket_end = mapping.find("]", bracket_start)
899 if bracket_end == -1:
900 break
901
902 length = bracket_start - mapping_index
903 if (
904 length
905 and port_pci[pci_index : pci_index + length]
906 != mapping[mapping_index:bracket_start]
907 ):
908 return False
909
910 if (
911 port_pci[pci_index + length]
912 not in mapping[bracket_start + 1 : bracket_end]
913 ):
914 return False
915
916 pci_index += length + 1
917 mapping_index = bracket_end + 1
918
919 if port_pci[pci_index:] != mapping[mapping_index:]:
920 return False
921
922 return True
923
924 def _get_interfaces(self, vlds_to_connect, vim_account_id):
925 """
926 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
927 :param vim_account_id:
928 :return:
929 """
930 interfaces = []
931
932 for vld in vlds_to_connect:
933 table, _, db_id = vld.partition(":")
934 db_id, _, vld = db_id.partition(":")
935 _, _, vld_id = vld.partition(".")
936
937 if table == "vnfrs":
938 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
939 iface_key = "vnf-vld-id"
940 else: # table == "nsrs"
941 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
942 iface_key = "ns-vld-id"
943
944 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
945
946 for db_vnfr in db_vnfrs:
947 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
948 for iface_index, interface in enumerate(vdur["interfaces"]):
949 if interface.get(iface_key) == vld_id and interface.get(
950 "type"
951 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
952 # only SR-IOV o PT
953 interface_ = interface.copy()
954 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
955 db_vnfr["_id"], vdu_index, iface_index
956 )
957
958 if vdur.get("status") == "ERROR":
959 interface_["status"] = "ERROR"
960
961 interfaces.append(interface_)
962
963 return interfaces
964
965 def refresh(self, ro_task):
966 # look for task create
967 task_create_index, _ = next(
968 i_t
969 for i_t in enumerate(ro_task["tasks"])
970 if i_t[1]
971 and i_t[1]["action"] == "CREATE"
972 and i_t[1]["status"] != "FINISHED"
973 )
974
975 return self.new(ro_task, task_create_index, None)
976
977 def new(self, ro_task, task_index, task_depends):
978
979 task = ro_task["tasks"][task_index]
980 task_id = task["task_id"]
981 target_vim = self.my_vims[ro_task["target_id"]]
982
983 sdn_net_id = ro_task["vim_info"]["vim_id"]
984
985 created_items = ro_task["vim_info"].get("created_items")
986 connected_ports = ro_task["vim_info"].get("connected_ports", [])
987 new_connected_ports = []
988 last_update = ro_task["vim_info"].get("last_update", 0)
989 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
990 error_list = []
991 created = ro_task["vim_info"].get("created", False)
992
993 try:
994 # CREATE
995 params = task["params"]
996 vlds_to_connect = params.get("vlds", [])
997 associated_vim = params.get("target_vim")
998 # external additional ports
999 additional_ports = params.get("sdn-ports") or ()
1000 _, _, vim_account_id = (
1001 (None, None, None)
1002 if associated_vim is None
1003 else associated_vim.partition(":")
1004 )
1005
1006 if associated_vim:
1007 # get associated VIM
1008 if associated_vim not in self.db_vims:
1009 self.db_vims[associated_vim] = self.db.get_one(
1010 "vim_accounts", {"_id": vim_account_id}
1011 )
1012
1013 db_vim = self.db_vims[associated_vim]
1014
1015 # look for ports to connect
1016 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1017 # print(ports)
1018
1019 sdn_ports = []
1020 pending_ports = error_ports = 0
1021 vlan_used = None
1022 sdn_need_update = False
1023
1024 for port in ports:
1025 vlan_used = port.get("vlan") or vlan_used
1026
1027 # TODO. Do not connect if already done
1028 if not port.get("compute_node") or not port.get("pci"):
1029 if port.get("status") == "ERROR":
1030 error_ports += 1
1031 else:
1032 pending_ports += 1
1033 continue
1034
1035 pmap = None
1036 compute_node_mappings = next(
1037 (
1038 c
1039 for c in db_vim["config"].get("sdn-port-mapping", ())
1040 if c and c["compute_node"] == port["compute_node"]
1041 ),
1042 None,
1043 )
1044
1045 if compute_node_mappings:
1046 # process port_mapping pci of type 0000:af:1[01].[1357]
1047 pmap = next(
1048 (
1049 p
1050 for p in compute_node_mappings["ports"]
1051 if self._match_pci(port["pci"], p.get("pci"))
1052 ),
1053 None,
1054 )
1055
1056 if not pmap:
1057 if not db_vim["config"].get("mapping_not_needed"):
1058 error_list.append(
1059 "Port mapping not found for compute_node={} pci={}".format(
1060 port["compute_node"], port["pci"]
1061 )
1062 )
1063 continue
1064
1065 pmap = {}
1066
1067 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1068 new_port = {
1069 "service_endpoint_id": pmap.get("service_endpoint_id")
1070 or service_endpoint_id,
1071 "service_endpoint_encapsulation_type": "dot1q"
1072 if port["type"] == "SR-IOV"
1073 else None,
1074 "service_endpoint_encapsulation_info": {
1075 "vlan": port.get("vlan"),
1076 "mac": port.get("mac-address"),
1077 "device_id": pmap.get("device_id") or port["compute_node"],
1078 "device_interface_id": pmap.get("device_interface_id")
1079 or port["pci"],
1080 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1081 "switch_port": pmap.get("switch_port"),
1082 "service_mapping_info": pmap.get("service_mapping_info"),
1083 },
1084 }
1085
1086 # TODO
1087 # if port["modified_at"] > last_update:
1088 # sdn_need_update = True
1089 new_connected_ports.append(port["id"]) # TODO
1090 sdn_ports.append(new_port)
1091
1092 if error_ports:
1093 error_list.append(
1094 "{} interfaces have not been created as VDU is on ERROR status".format(
1095 error_ports
1096 )
1097 )
1098
1099 # connect external ports
1100 for index, additional_port in enumerate(additional_ports):
1101 additional_port_id = additional_port.get(
1102 "service_endpoint_id"
1103 ) or "external-{}".format(index)
1104 sdn_ports.append(
1105 {
1106 "service_endpoint_id": additional_port_id,
1107 "service_endpoint_encapsulation_type": additional_port.get(
1108 "service_endpoint_encapsulation_type", "dot1q"
1109 ),
1110 "service_endpoint_encapsulation_info": {
1111 "vlan": additional_port.get("vlan") or vlan_used,
1112 "mac": additional_port.get("mac_address"),
1113 "device_id": additional_port.get("device_id"),
1114 "device_interface_id": additional_port.get(
1115 "device_interface_id"
1116 ),
1117 "switch_dpid": additional_port.get("switch_dpid")
1118 or additional_port.get("switch_id"),
1119 "switch_port": additional_port.get("switch_port"),
1120 "service_mapping_info": additional_port.get(
1121 "service_mapping_info"
1122 ),
1123 },
1124 }
1125 )
1126 new_connected_ports.append(additional_port_id)
1127 sdn_info = ""
1128
1129 # if there are more ports to connect or they have been modified, call create/update
1130 if error_list:
1131 sdn_status = "ERROR"
1132 sdn_info = "; ".join(error_list)
1133 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1134 last_update = time.time()
1135
1136 if not sdn_net_id:
1137 if len(sdn_ports) < 2:
1138 sdn_status = "ACTIVE"
1139
1140 if not pending_ports:
1141 self.logger.debug(
1142 "task={} {} new-sdn-net done, less than 2 ports".format(
1143 task_id, ro_task["target_id"]
1144 )
1145 )
1146 else:
1147 net_type = params.get("type") or "ELAN"
1148 (
1149 sdn_net_id,
1150 created_items,
1151 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1152 created = True
1153 self.logger.debug(
1154 "task={} {} new-sdn-net={} created={}".format(
1155 task_id, ro_task["target_id"], sdn_net_id, created
1156 )
1157 )
1158 else:
1159 created_items = target_vim.edit_connectivity_service(
1160 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1161 )
1162 created = True
1163 self.logger.debug(
1164 "task={} {} update-sdn-net={} created={}".format(
1165 task_id, ro_task["target_id"], sdn_net_id, created
1166 )
1167 )
1168
1169 connected_ports = new_connected_ports
1170 elif sdn_net_id:
1171 wim_status_dict = target_vim.get_connectivity_service_status(
1172 sdn_net_id, conn_info=created_items
1173 )
1174 sdn_status = wim_status_dict["sdn_status"]
1175
1176 if wim_status_dict.get("sdn_info"):
1177 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1178
1179 if wim_status_dict.get("error_msg"):
1180 sdn_info = wim_status_dict.get("error_msg") or ""
1181
1182 if pending_ports:
1183 if sdn_status != "ERROR":
1184 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1185 len(ports) - pending_ports, len(ports)
1186 )
1187
1188 if sdn_status == "ACTIVE":
1189 sdn_status = "BUILD"
1190
1191 ro_vim_item_update = {
1192 "vim_id": sdn_net_id,
1193 "vim_status": sdn_status,
1194 "created": created,
1195 "created_items": created_items,
1196 "connected_ports": connected_ports,
1197 "vim_details": sdn_info,
1198 "last_update": last_update,
1199 }
1200
1201 return sdn_status, ro_vim_item_update
1202 except Exception as e:
1203 self.logger.error(
1204 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1205 exc_info=not isinstance(
1206 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1207 ),
1208 )
1209 ro_vim_item_update = {
1210 "vim_status": "VIM_ERROR",
1211 "created": created,
1212 "vim_details": str(e),
1213 }
1214
1215 return "FAILED", ro_vim_item_update
1216
1217 def delete(self, ro_task, task_index):
1218 task = ro_task["tasks"][task_index]
1219 task_id = task["task_id"]
1220 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1221 ro_vim_item_update_ok = {
1222 "vim_status": "DELETED",
1223 "created": False,
1224 "vim_details": "DELETED",
1225 "vim_id": None,
1226 }
1227
1228 try:
1229 if sdn_vim_id:
1230 target_vim = self.my_vims[ro_task["target_id"]]
1231 target_vim.delete_connectivity_service(
1232 sdn_vim_id, ro_task["vim_info"].get("created_items")
1233 )
1234
1235 except Exception as e:
1236 if (
1237 isinstance(e, sdnconn.SdnConnectorError)
1238 and e.http_code == HTTPStatus.NOT_FOUND.value
1239 ):
1240 ro_vim_item_update_ok["vim_details"] = "already deleted"
1241 else:
1242 self.logger.error(
1243 "ro_task={} vim={} del-sdn-net={}: {}".format(
1244 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1245 ),
1246 exc_info=not isinstance(
1247 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1248 ),
1249 )
1250 ro_vim_item_update = {
1251 "vim_status": "VIM_ERROR",
1252 "vim_details": "Error while deleting: {}".format(e),
1253 }
1254
1255 return "FAILED", ro_vim_item_update
1256
1257 self.logger.debug(
1258 "task={} {} del-sdn-net={} {}".format(
1259 task_id,
1260 ro_task["target_id"],
1261 sdn_vim_id,
1262 ro_vim_item_update_ok.get("vim_details", ""),
1263 )
1264 )
1265
1266 return "DONE", ro_vim_item_update_ok
1267
1268
1269 class ConfigValidate:
1270 def __init__(self, config: Dict):
1271 self.conf = config
1272
1273 @property
1274 def active(self):
1275 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1276 if (
1277 self.conf["period"]["refresh_active"] >= 60
1278 or self.conf["period"]["refresh_active"] == -1
1279 ):
1280 return self.conf["period"]["refresh_active"]
1281
1282 return 60
1283
1284 @property
1285 def build(self):
1286 return self.conf["period"]["refresh_build"]
1287
1288 @property
1289 def image(self):
1290 return self.conf["period"]["refresh_image"]
1291
1292 @property
1293 def error(self):
1294 return self.conf["period"]["refresh_error"]
1295
1296 @property
1297 def queue_size(self):
1298 return self.conf["period"]["queue_size"]
1299
1300
1301 class NsWorker(threading.Thread):
1302 def __init__(self, worker_index, config, plugins, db):
1303 """
1304
1305 :param worker_index: thread index
1306 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1307 :param plugins: global shared dict with the loaded plugins
1308 :param db: database class instance to use
1309 """
1310 threading.Thread.__init__(self)
1311 self.config = config
1312 self.plugins = plugins
1313 self.plugin_name = "unknown"
1314 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1315 self.worker_index = worker_index
1316 # refresh periods for created items
1317 self.refresh_config = ConfigValidate(config)
1318 self.task_queue = queue.Queue(self.refresh_config.queue_size)
1319 # targetvim: vimplugin class
1320 self.my_vims = {}
1321 # targetvim: vim information from database
1322 self.db_vims = {}
1323 # targetvim list
1324 self.vim_targets = []
1325 self.my_id = config["process_id"] + ":" + str(worker_index)
1326 self.db = db
1327 self.item2class = {
1328 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1329 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1330 "image": VimInteractionImage(
1331 self.db, self.my_vims, self.db_vims, self.logger
1332 ),
1333 "flavor": VimInteractionFlavor(
1334 self.db, self.my_vims, self.db_vims, self.logger
1335 ),
1336 "sdn_net": VimInteractionSdnNet(
1337 self.db, self.my_vims, self.db_vims, self.logger
1338 ),
1339 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1340 self.db, self.my_vims, self.db_vims, self.logger
1341 ),
1342 }
1343 self.time_last_task_processed = None
1344 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1345 self.tasks_to_delete = []
1346 # it is idle when there are not vim_targets associated
1347 self.idle = True
1348 self.task_locked_time = config["global"]["task_locked_time"]
1349
1350 def insert_task(self, task):
1351 try:
1352 self.task_queue.put(task, False)
1353 return None
1354 except queue.Full:
1355 raise NsWorkerException("timeout inserting a task")
1356
1357 def terminate(self):
1358 self.insert_task("exit")
1359
1360 def del_task(self, task):
1361 with self.task_lock:
1362 if task["status"] == "SCHEDULED":
1363 task["status"] = "SUPERSEDED"
1364 return True
1365 else: # task["status"] == "processing"
1366 self.task_lock.release()
1367 return False
1368
1369 def _process_vim_config(self, target_id, db_vim):
1370 """
1371 Process vim config, creating vim configuration files as ca_cert
1372 :param target_id: vim/sdn/wim + id
1373 :param db_vim: Vim dictionary obtained from database
1374 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1375 """
1376 if not db_vim.get("config"):
1377 return
1378
1379 file_name = ""
1380
1381 try:
1382 if db_vim["config"].get("ca_cert_content"):
1383 file_name = "{}:{}".format(target_id, self.worker_index)
1384
1385 try:
1386 mkdir(file_name)
1387 except FileExistsError:
1388 pass
1389
1390 file_name = file_name + "/ca_cert"
1391
1392 with open(file_name, "w") as f:
1393 f.write(db_vim["config"]["ca_cert_content"])
1394 del db_vim["config"]["ca_cert_content"]
1395 db_vim["config"]["ca_cert"] = file_name
1396 except Exception as e:
1397 raise NsWorkerException(
1398 "Error writing to file '{}': {}".format(file_name, e)
1399 )
1400
1401 def _load_plugin(self, name, type="vim"):
1402 # type can be vim or sdn
1403 if "rovim_dummy" not in self.plugins:
1404 self.plugins["rovim_dummy"] = VimDummyConnector
1405
1406 if "rosdn_dummy" not in self.plugins:
1407 self.plugins["rosdn_dummy"] = SdnDummyConnector
1408
1409 if name in self.plugins:
1410 return self.plugins[name]
1411
1412 try:
1413 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1414 self.plugins[name] = ep.load()
1415 except Exception as e:
1416 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1417
1418 if name and name not in self.plugins:
1419 raise NsWorkerException(
1420 "Plugin 'osm_{n}' has not been installed".format(n=name)
1421 )
1422
1423 return self.plugins[name]
1424
1425 def _unload_vim(self, target_id):
1426 """
1427 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1428 :param target_id: Contains type:_id; where type can be 'vim', ...
1429 :return: None.
1430 """
1431 try:
1432 self.db_vims.pop(target_id, None)
1433 self.my_vims.pop(target_id, None)
1434
1435 if target_id in self.vim_targets:
1436 self.vim_targets.remove(target_id)
1437
1438 self.logger.info("Unloaded {}".format(target_id))
1439 rmtree("{}:{}".format(target_id, self.worker_index))
1440 except FileNotFoundError:
1441 pass # this is raised by rmtree if folder does not exist
1442 except Exception as e:
1443 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1444
1445 def _check_vim(self, target_id):
1446 """
1447 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1448 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1449 :return: None.
1450 """
1451 target, _, _id = target_id.partition(":")
1452 now = time.time()
1453 update_dict = {}
1454 unset_dict = {}
1455 op_text = ""
1456 step = ""
1457 loaded = target_id in self.vim_targets
1458 target_database = (
1459 "vim_accounts"
1460 if target == "vim"
1461 else "wim_accounts"
1462 if target == "wim"
1463 else "sdns"
1464 )
1465
1466 try:
1467 step = "Getting {} from db".format(target_id)
1468 db_vim = self.db.get_one(target_database, {"_id": _id})
1469
1470 for op_index, operation in enumerate(
1471 db_vim["_admin"].get("operations", ())
1472 ):
1473 if operation["operationState"] != "PROCESSING":
1474 continue
1475
1476 locked_at = operation.get("locked_at")
1477
1478 if locked_at is not None and locked_at >= now - self.task_locked_time:
1479 # some other thread is doing this operation
1480 return
1481
1482 # lock
1483 op_text = "_admin.operations.{}.".format(op_index)
1484
1485 if not self.db.set_one(
1486 target_database,
1487 q_filter={
1488 "_id": _id,
1489 op_text + "operationState": "PROCESSING",
1490 op_text + "locked_at": locked_at,
1491 },
1492 update_dict={
1493 op_text + "locked_at": now,
1494 "admin.current_operation": op_index,
1495 },
1496 fail_on_empty=False,
1497 ):
1498 return
1499
1500 unset_dict[op_text + "locked_at"] = None
1501 unset_dict["current_operation"] = None
1502 step = "Loading " + target_id
1503 error_text = self._load_vim(target_id)
1504
1505 if not error_text:
1506 step = "Checking connectivity"
1507
1508 if target == "vim":
1509 self.my_vims[target_id].check_vim_connectivity()
1510 else:
1511 self.my_vims[target_id].check_credentials()
1512
1513 update_dict["_admin.operationalState"] = "ENABLED"
1514 update_dict["_admin.detailed-status"] = ""
1515 unset_dict[op_text + "detailed-status"] = None
1516 update_dict[op_text + "operationState"] = "COMPLETED"
1517
1518 return
1519
1520 except Exception as e:
1521 error_text = "{}: {}".format(step, e)
1522 self.logger.error("{} for {}: {}".format(step, target_id, e))
1523
1524 finally:
1525 if update_dict or unset_dict:
1526 if error_text:
1527 update_dict[op_text + "operationState"] = "FAILED"
1528 update_dict[op_text + "detailed-status"] = error_text
1529 unset_dict.pop(op_text + "detailed-status", None)
1530 update_dict["_admin.operationalState"] = "ERROR"
1531 update_dict["_admin.detailed-status"] = error_text
1532
1533 if op_text:
1534 update_dict[op_text + "statusEnteredTime"] = now
1535
1536 self.db.set_one(
1537 target_database,
1538 q_filter={"_id": _id},
1539 update_dict=update_dict,
1540 unset=unset_dict,
1541 fail_on_empty=False,
1542 )
1543
1544 if not loaded:
1545 self._unload_vim(target_id)
1546
1547 def _reload_vim(self, target_id):
1548 if target_id in self.vim_targets:
1549 self._load_vim(target_id)
1550 else:
1551 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1552 # just remove it to force load again next time it is needed
1553 self.db_vims.pop(target_id, None)
1554
1555 def _load_vim(self, target_id):
1556 """
1557 Load or reload a vim_account, sdn_controller or wim_account.
1558 Read content from database, load the plugin if not loaded.
1559 In case of error loading the plugin, it load a failing VIM_connector
1560 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1561 :param target_id: Contains type:_id; where type can be 'vim', ...
1562 :return: None if ok, descriptive text if error
1563 """
1564 target, _, _id = target_id.partition(":")
1565 target_database = (
1566 "vim_accounts"
1567 if target == "vim"
1568 else "wim_accounts"
1569 if target == "wim"
1570 else "sdns"
1571 )
1572 plugin_name = ""
1573 vim = None
1574
1575 try:
1576 step = "Getting {}={} from db".format(target, _id)
1577 # TODO process for wim, sdnc, ...
1578 vim = self.db.get_one(target_database, {"_id": _id})
1579
1580 # if deep_get(vim, "config", "sdn-controller"):
1581 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1582 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1583
1584 step = "Decrypting password"
1585 schema_version = vim.get("schema_version")
1586 self.db.encrypt_decrypt_fields(
1587 vim,
1588 "decrypt",
1589 fields=("password", "secret"),
1590 schema_version=schema_version,
1591 salt=_id,
1592 )
1593 self._process_vim_config(target_id, vim)
1594
1595 if target == "vim":
1596 plugin_name = "rovim_" + vim["vim_type"]
1597 step = "Loading plugin '{}'".format(plugin_name)
1598 vim_module_conn = self._load_plugin(plugin_name)
1599 step = "Loading {}'".format(target_id)
1600 self.my_vims[target_id] = vim_module_conn(
1601 uuid=vim["_id"],
1602 name=vim["name"],
1603 tenant_id=vim.get("vim_tenant_id"),
1604 tenant_name=vim.get("vim_tenant_name"),
1605 url=vim["vim_url"],
1606 url_admin=None,
1607 user=vim["vim_user"],
1608 passwd=vim["vim_password"],
1609 config=vim.get("config") or {},
1610 persistent_info={},
1611 )
1612 else: # sdn
1613 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1614 step = "Loading plugin '{}'".format(plugin_name)
1615 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1616 step = "Loading {}'".format(target_id)
1617 wim = deepcopy(vim)
1618 wim_config = wim.pop("config", {}) or {}
1619 wim["uuid"] = wim["_id"]
1620 if "url" in wim and "wim_url" not in wim:
1621 wim["wim_url"] = wim["url"]
1622 elif "url" not in wim and "wim_url" in wim:
1623 wim["url"] = wim["wim_url"]
1624
1625 if wim.get("dpid"):
1626 wim_config["dpid"] = wim.pop("dpid")
1627
1628 if wim.get("switch_id"):
1629 wim_config["switch_id"] = wim.pop("switch_id")
1630
1631 # wim, wim_account, config
1632 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1633 self.db_vims[target_id] = vim
1634 self.error_status = None
1635
1636 self.logger.info(
1637 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1638 )
1639 except Exception as e:
1640 self.logger.error(
1641 "Cannot load {} plugin={}: {} {}".format(
1642 target_id, plugin_name, step, e
1643 )
1644 )
1645
1646 self.db_vims[target_id] = vim or {}
1647 self.db_vims[target_id] = FailingConnector(str(e))
1648 error_status = "{} Error: {}".format(step, e)
1649
1650 return error_status
1651 finally:
1652 if target_id not in self.vim_targets:
1653 self.vim_targets.append(target_id)
1654
1655 def _get_db_task(self):
1656 """
1657 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1658 :return: None
1659 """
1660 now = time.time()
1661
1662 if not self.time_last_task_processed:
1663 self.time_last_task_processed = now
1664
1665 try:
1666 while True:
1667 """
1668 # Log RO tasks only when loglevel is DEBUG
1669 if self.logger.getEffectiveLevel() == logging.DEBUG:
1670 self._log_ro_task(
1671 None,
1672 None,
1673 None,
1674 "TASK_WF",
1675 "task_locked_time="
1676 + str(self.task_locked_time)
1677 + " "
1678 + "time_last_task_processed="
1679 + str(self.time_last_task_processed)
1680 + " "
1681 + "now="
1682 + str(now),
1683 )
1684 """
1685 locked = self.db.set_one(
1686 "ro_tasks",
1687 q_filter={
1688 "target_id": self.vim_targets,
1689 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1690 "locked_at.lt": now - self.task_locked_time,
1691 "to_check_at.lt": self.time_last_task_processed,
1692 "to_check_at.gt": -1,
1693 },
1694 update_dict={"locked_by": self.my_id, "locked_at": now},
1695 fail_on_empty=False,
1696 )
1697
1698 if locked:
1699 # read and return
1700 ro_task = self.db.get_one(
1701 "ro_tasks",
1702 q_filter={
1703 "target_id": self.vim_targets,
1704 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1705 "locked_at": now,
1706 },
1707 )
1708 return ro_task
1709
1710 if self.time_last_task_processed == now:
1711 self.time_last_task_processed = None
1712 return None
1713 else:
1714 self.time_last_task_processed = now
1715 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1716
1717 except DbException as e:
1718 self.logger.error("Database exception at _get_db_task: {}".format(e))
1719 except Exception as e:
1720 self.logger.critical(
1721 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1722 )
1723
1724 return None
1725
1726 def _get_db_all_tasks(self):
1727 """
1728 Read all content of table ro_tasks to log it
1729 :return: None
1730 """
1731 try:
1732 # Checking the content of the BD:
1733
1734 # read and return
1735 ro_task = self.db.get_list("ro_tasks")
1736 for rt in ro_task:
1737 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1738 return ro_task
1739
1740 except DbException as e:
1741 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1742 except Exception as e:
1743 self.logger.critical(
1744 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1745 )
1746
1747 return None
1748
1749 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1750 """
1751 Generate a log with the following format:
1752
1753 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1754 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1755 task_array_index;task_id;task_action;task_item;task_args
1756
1757 Example:
1758
1759 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1760 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1761 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1762 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1763 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1764 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1765 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1766 """
1767 try:
1768 line = []
1769 i = 0
1770 if ro_task is not None and isinstance(ro_task, dict):
1771 for t in ro_task["tasks"]:
1772 line.clear()
1773 line.append(mark)
1774 line.append(event)
1775 line.append(ro_task.get("_id", ""))
1776 line.append(str(ro_task.get("locked_at", "")))
1777 line.append(str(ro_task.get("modified_at", "")))
1778 line.append(str(ro_task.get("created_at", "")))
1779 line.append(str(ro_task.get("to_check_at", "")))
1780 line.append(str(ro_task.get("locked_by", "")))
1781 line.append(str(ro_task.get("target_id", "")))
1782 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1783 line.append(str(ro_task.get("vim_info", "")))
1784 line.append(str(ro_task.get("tasks", "")))
1785 if isinstance(t, dict):
1786 line.append(str(t.get("status", "")))
1787 line.append(str(t.get("action_id", "")))
1788 line.append(str(i))
1789 line.append(str(t.get("task_id", "")))
1790 line.append(str(t.get("action", "")))
1791 line.append(str(t.get("item", "")))
1792 line.append(str(t.get("find_params", "")))
1793 line.append(str(t.get("params", "")))
1794 else:
1795 line.extend([""] * 2)
1796 line.append(str(i))
1797 line.extend([""] * 5)
1798
1799 i += 1
1800 self.logger.debug(";".join(line))
1801 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1802 i = 0
1803 while True:
1804 st = "tasks.{}.status".format(i)
1805 if st not in db_ro_task_update:
1806 break
1807 line.clear()
1808 line.append(mark)
1809 line.append(event)
1810 line.append(db_ro_task_update.get("_id", ""))
1811 line.append(str(db_ro_task_update.get("locked_at", "")))
1812 line.append(str(db_ro_task_update.get("modified_at", "")))
1813 line.append("")
1814 line.append(str(db_ro_task_update.get("to_check_at", "")))
1815 line.append(str(db_ro_task_update.get("locked_by", "")))
1816 line.append("")
1817 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1818 line.append("")
1819 line.append(str(db_ro_task_update.get("vim_info", "")))
1820 line.append(str(str(db_ro_task_update).count(".status")))
1821 line.append(db_ro_task_update.get(st, ""))
1822 line.append("")
1823 line.append(str(i))
1824 line.extend([""] * 3)
1825 i += 1
1826 self.logger.debug(";".join(line))
1827
1828 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1829 line.clear()
1830 line.append(mark)
1831 line.append(event)
1832 line.append(db_ro_task_delete.get("_id", ""))
1833 line.append("")
1834 line.append(db_ro_task_delete.get("modified_at", ""))
1835 line.extend([""] * 13)
1836 self.logger.debug(";".join(line))
1837
1838 else:
1839 line.clear()
1840 line.append(mark)
1841 line.append(event)
1842 line.extend([""] * 16)
1843 self.logger.debug(";".join(line))
1844
1845 except Exception as e:
1846 self.logger.error("Error logging ro_task: {}".format(e))
1847
1848 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1849 """
1850 Determine if this task need to be done or superseded
1851 :return: None
1852 """
1853 my_task = ro_task["tasks"][task_index]
1854 task_id = my_task["task_id"]
1855 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1856 "created_items", False
1857 )
1858
1859 if my_task["status"] == "FAILED":
1860 return None, None # TODO need to be retry??
1861
1862 try:
1863 for index, task in enumerate(ro_task["tasks"]):
1864 if index == task_index or not task:
1865 continue # own task
1866
1867 if (
1868 my_task["target_record"] == task["target_record"]
1869 and task["action"] == "CREATE"
1870 ):
1871 # set to finished
1872 db_update["tasks.{}.status".format(index)] = task[
1873 "status"
1874 ] = "FINISHED"
1875 elif task["action"] == "CREATE" and task["status"] not in (
1876 "FINISHED",
1877 "SUPERSEDED",
1878 ):
1879 needed_delete = False
1880
1881 if needed_delete:
1882 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1883 else:
1884 return "SUPERSEDED", None
1885 except Exception as e:
1886 if not isinstance(e, NsWorkerException):
1887 self.logger.critical(
1888 "Unexpected exception at _delete_task task={}: {}".format(
1889 task_id, e
1890 ),
1891 exc_info=True,
1892 )
1893
1894 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1895
1896 def _create_task(self, ro_task, task_index, task_depends, db_update):
1897 """
1898 Determine if this task need to create something at VIM
1899 :return: None
1900 """
1901 my_task = ro_task["tasks"][task_index]
1902 task_id = my_task["task_id"]
1903 task_status = None
1904
1905 if my_task["status"] == "FAILED":
1906 return None, None # TODO need to be retry??
1907 elif my_task["status"] == "SCHEDULED":
1908 # check if already created by another task
1909 for index, task in enumerate(ro_task["tasks"]):
1910 if index == task_index or not task:
1911 continue # own task
1912
1913 if task["action"] == "CREATE" and task["status"] not in (
1914 "SCHEDULED",
1915 "FINISHED",
1916 "SUPERSEDED",
1917 ):
1918 return task["status"], "COPY_VIM_INFO"
1919
1920 try:
1921 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1922 ro_task, task_index, task_depends
1923 )
1924 # TODO update other CREATE tasks
1925 except Exception as e:
1926 if not isinstance(e, NsWorkerException):
1927 self.logger.error(
1928 "Error executing task={}: {}".format(task_id, e), exc_info=True
1929 )
1930
1931 task_status = "FAILED"
1932 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1933 # TODO update ro_vim_item_update
1934
1935 return task_status, ro_vim_item_update
1936 else:
1937 return None, None
1938
1939 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1940 """
1941 Look for dependency task
1942 :param task_id: Can be one of
1943 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1944 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1945 3. task.task_id: "<action_id>:number"
1946 :param ro_task:
1947 :param target_id:
1948 :return: database ro_task plus index of task
1949 """
1950 if (
1951 task_id.startswith("vim:")
1952 or task_id.startswith("sdn:")
1953 or task_id.startswith("wim:")
1954 ):
1955 target_id, _, task_id = task_id.partition(" ")
1956
1957 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1958 ro_task_dependency = self.db.get_one(
1959 "ro_tasks",
1960 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1961 fail_on_empty=False,
1962 )
1963
1964 if ro_task_dependency:
1965 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1966 if task["target_record_id"] == task_id:
1967 return ro_task_dependency, task_index
1968
1969 else:
1970 if ro_task:
1971 for task_index, task in enumerate(ro_task["tasks"]):
1972 if task and task["task_id"] == task_id:
1973 return ro_task, task_index
1974
1975 ro_task_dependency = self.db.get_one(
1976 "ro_tasks",
1977 q_filter={
1978 "tasks.ANYINDEX.task_id": task_id,
1979 "tasks.ANYINDEX.target_record.ne": None,
1980 },
1981 fail_on_empty=False,
1982 )
1983
1984 if ro_task_dependency:
1985 for task_index, task in ro_task_dependency["tasks"]:
1986 if task["task_id"] == task_id:
1987 return ro_task_dependency, task_index
1988 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1989
1990 def update_vm_refresh(self):
1991 """Enables the VM status updates if self.refresh_config.active parameter
1992 is not -1 and than updates the DB accordingly
1993
1994 """
1995 try:
1996 self.logger.debug("Checking if VM status update config")
1997 next_refresh = time.time()
1998 if self.refresh_config.active == -1:
1999 next_refresh = -1
2000 else:
2001 next_refresh += self.refresh_config.active
2002
2003 if next_refresh != -1:
2004 db_ro_task_update = {}
2005 now = time.time()
2006 next_check_at = now + (24 * 60 * 60)
2007 next_check_at = min(next_check_at, next_refresh)
2008 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2009 db_ro_task_update["to_check_at"] = next_check_at
2010
2011 self.logger.debug(
2012 "Finding tasks which to be updated to enable VM status updates"
2013 )
2014 refresh_tasks = self.db.get_list(
2015 "ro_tasks",
2016 q_filter={
2017 "tasks.status": "DONE",
2018 "to_check_at.lt": 0,
2019 },
2020 )
2021 self.logger.debug("Updating tasks to change the to_check_at status")
2022 for task in refresh_tasks:
2023 q_filter = {
2024 "_id": task["_id"],
2025 }
2026 self.db.set_one(
2027 "ro_tasks",
2028 q_filter=q_filter,
2029 update_dict=db_ro_task_update,
2030 fail_on_empty=True,
2031 )
2032
2033 except Exception as e:
2034 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2035
2036 def _process_pending_tasks(self, ro_task):
2037 ro_task_id = ro_task["_id"]
2038 now = time.time()
2039 # one day
2040 next_check_at = now + (24 * 60 * 60)
2041 db_ro_task_update = {}
2042
2043 def _update_refresh(new_status):
2044 # compute next_refresh
2045 nonlocal task
2046 nonlocal next_check_at
2047 nonlocal db_ro_task_update
2048 nonlocal ro_task
2049
2050 next_refresh = time.time()
2051
2052 if task["item"] in ("image", "flavor"):
2053 next_refresh += self.refresh_config.image
2054 elif new_status == "BUILD":
2055 next_refresh += self.refresh_config.build
2056 elif new_status == "DONE":
2057 if self.refresh_config.active == -1:
2058 next_refresh = -1
2059 else:
2060 next_refresh += self.refresh_config.active
2061 else:
2062 next_refresh += self.refresh_config.error
2063
2064 next_check_at = min(next_check_at, next_refresh)
2065 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2066 ro_task["vim_info"]["refresh_at"] = next_refresh
2067
2068 try:
2069 """
2070 # Log RO tasks only when loglevel is DEBUG
2071 if self.logger.getEffectiveLevel() == logging.DEBUG:
2072 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2073 """
2074 # Check if vim status refresh is enabled again
2075 self.update_vm_refresh()
2076 # 0: get task_status_create
2077 lock_object = None
2078 task_status_create = None
2079 task_create = next(
2080 (
2081 t
2082 for t in ro_task["tasks"]
2083 if t
2084 and t["action"] == "CREATE"
2085 and t["status"] in ("BUILD", "DONE")
2086 ),
2087 None,
2088 )
2089
2090 if task_create:
2091 task_status_create = task_create["status"]
2092
2093 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2094 for task_action in ("DELETE", "CREATE", "EXEC"):
2095 db_vim_update = None
2096 new_status = None
2097
2098 for task_index, task in enumerate(ro_task["tasks"]):
2099 if not task:
2100 continue # task deleted
2101
2102 task_depends = {}
2103 target_update = None
2104
2105 if (
2106 (
2107 task_action in ("DELETE", "EXEC")
2108 and task["status"] not in ("SCHEDULED", "BUILD")
2109 )
2110 or task["action"] != task_action
2111 or (
2112 task_action == "CREATE"
2113 and task["status"] in ("FINISHED", "SUPERSEDED")
2114 )
2115 ):
2116 continue
2117
2118 task_path = "tasks.{}.status".format(task_index)
2119 try:
2120 db_vim_info_update = None
2121
2122 if task["status"] == "SCHEDULED":
2123 # check if tasks that this depends on have been completed
2124 dependency_not_completed = False
2125
2126 for dependency_task_id in task.get("depends_on") or ():
2127 (
2128 dependency_ro_task,
2129 dependency_task_index,
2130 ) = self._get_dependency(
2131 dependency_task_id, target_id=ro_task["target_id"]
2132 )
2133 dependency_task = dependency_ro_task["tasks"][
2134 dependency_task_index
2135 ]
2136
2137 if dependency_task["status"] == "SCHEDULED":
2138 dependency_not_completed = True
2139 next_check_at = min(
2140 next_check_at, dependency_ro_task["to_check_at"]
2141 )
2142 # must allow dependent task to be processed first
2143 # to do this set time after last_task_processed
2144 next_check_at = max(
2145 self.time_last_task_processed, next_check_at
2146 )
2147 break
2148 elif dependency_task["status"] == "FAILED":
2149 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2150 task["action"],
2151 task["item"],
2152 dependency_task["action"],
2153 dependency_task["item"],
2154 dependency_task_id,
2155 dependency_ro_task["vim_info"].get(
2156 "vim_details"
2157 ),
2158 )
2159 self.logger.error(
2160 "task={} {}".format(task["task_id"], error_text)
2161 )
2162 raise NsWorkerException(error_text)
2163
2164 task_depends[dependency_task_id] = dependency_ro_task[
2165 "vim_info"
2166 ]["vim_id"]
2167 task_depends[
2168 "TASK-{}".format(dependency_task_id)
2169 ] = dependency_ro_task["vim_info"]["vim_id"]
2170
2171 if dependency_not_completed:
2172 # TODO set at vim_info.vim_details that it is waiting
2173 continue
2174
2175 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2176 # the task of renew this locking. It will update database locket_at periodically
2177 if not lock_object:
2178 lock_object = LockRenew.add_lock_object(
2179 "ro_tasks", ro_task, self
2180 )
2181
2182 if task["action"] == "DELETE":
2183 (new_status, db_vim_info_update,) = self._delete_task(
2184 ro_task, task_index, task_depends, db_ro_task_update
2185 )
2186 new_status = (
2187 "FINISHED" if new_status == "DONE" else new_status
2188 )
2189 # ^with FINISHED instead of DONE it will not be refreshing
2190
2191 if new_status in ("FINISHED", "SUPERSEDED"):
2192 target_update = "DELETE"
2193 elif task["action"] == "EXEC":
2194 (
2195 new_status,
2196 db_vim_info_update,
2197 db_task_update,
2198 ) = self.item2class[task["item"]].exec(
2199 ro_task, task_index, task_depends
2200 )
2201 new_status = (
2202 "FINISHED" if new_status == "DONE" else new_status
2203 )
2204 # ^with FINISHED instead of DONE it will not be refreshing
2205
2206 if db_task_update:
2207 # load into database the modified db_task_update "retries" and "next_retry"
2208 if db_task_update.get("retries"):
2209 db_ro_task_update[
2210 "tasks.{}.retries".format(task_index)
2211 ] = db_task_update["retries"]
2212
2213 next_check_at = time.time() + db_task_update.get(
2214 "next_retry", 60
2215 )
2216 target_update = None
2217 elif task["action"] == "CREATE":
2218 if task["status"] == "SCHEDULED":
2219 if task_status_create:
2220 new_status = task_status_create
2221 target_update = "COPY_VIM_INFO"
2222 else:
2223 new_status, db_vim_info_update = self.item2class[
2224 task["item"]
2225 ].new(ro_task, task_index, task_depends)
2226 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2227 _update_refresh(new_status)
2228 else:
2229 refresh_at = ro_task["vim_info"]["refresh_at"]
2230 if refresh_at and refresh_at != -1 and now > refresh_at:
2231 new_status, db_vim_info_update = self.item2class[
2232 task["item"]
2233 ].refresh(ro_task)
2234 _update_refresh(new_status)
2235 else:
2236 # The refresh is updated to avoid set the value of "refresh_at" to
2237 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2238 # because it can happen that in this case the task is never processed
2239 _update_refresh(task["status"])
2240
2241 except Exception as e:
2242 new_status = "FAILED"
2243 db_vim_info_update = {
2244 "vim_status": "VIM_ERROR",
2245 "vim_details": str(e),
2246 }
2247
2248 if not isinstance(
2249 e, (NsWorkerException, vimconn.VimConnException)
2250 ):
2251 self.logger.error(
2252 "Unexpected exception at _delete_task task={}: {}".format(
2253 task["task_id"], e
2254 ),
2255 exc_info=True,
2256 )
2257
2258 try:
2259 if db_vim_info_update:
2260 db_vim_update = db_vim_info_update.copy()
2261 db_ro_task_update.update(
2262 {
2263 "vim_info." + k: v
2264 for k, v in db_vim_info_update.items()
2265 }
2266 )
2267 ro_task["vim_info"].update(db_vim_info_update)
2268
2269 if new_status:
2270 if task_action == "CREATE":
2271 task_status_create = new_status
2272 db_ro_task_update[task_path] = new_status
2273
2274 if target_update or db_vim_update:
2275 if target_update == "DELETE":
2276 self._update_target(task, None)
2277 elif target_update == "COPY_VIM_INFO":
2278 self._update_target(task, ro_task["vim_info"])
2279 else:
2280 self._update_target(task, db_vim_update)
2281
2282 except Exception as e:
2283 if (
2284 isinstance(e, DbException)
2285 and e.http_code == HTTPStatus.NOT_FOUND
2286 ):
2287 # if the vnfrs or nsrs has been removed from database, this task must be removed
2288 self.logger.debug(
2289 "marking to delete task={}".format(task["task_id"])
2290 )
2291 self.tasks_to_delete.append(task)
2292 else:
2293 self.logger.error(
2294 "Unexpected exception at _update_target task={}: {}".format(
2295 task["task_id"], e
2296 ),
2297 exc_info=True,
2298 )
2299
2300 locked_at = ro_task["locked_at"]
2301
2302 if lock_object:
2303 locked_at = [
2304 lock_object["locked_at"],
2305 lock_object["locked_at"] + self.task_locked_time,
2306 ]
2307 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2308 # contain exactly locked_at + self.task_locked_time
2309 LockRenew.remove_lock_object(lock_object)
2310
2311 q_filter = {
2312 "_id": ro_task["_id"],
2313 "to_check_at": ro_task["to_check_at"],
2314 "locked_at": locked_at,
2315 }
2316 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2317 # outside this task (by ro_nbi) do not update it
2318 db_ro_task_update["locked_by"] = None
2319 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2320 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2321 db_ro_task_update["modified_at"] = now
2322 db_ro_task_update["to_check_at"] = next_check_at
2323
2324 """
2325 # Log RO tasks only when loglevel is DEBUG
2326 if self.logger.getEffectiveLevel() == logging.DEBUG:
2327 db_ro_task_update_log = db_ro_task_update.copy()
2328 db_ro_task_update_log["_id"] = q_filter["_id"]
2329 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2330 """
2331
2332 if not self.db.set_one(
2333 "ro_tasks",
2334 update_dict=db_ro_task_update,
2335 q_filter=q_filter,
2336 fail_on_empty=False,
2337 ):
2338 del db_ro_task_update["to_check_at"]
2339 del q_filter["to_check_at"]
2340 """
2341 # Log RO tasks only when loglevel is DEBUG
2342 if self.logger.getEffectiveLevel() == logging.DEBUG:
2343 self._log_ro_task(
2344 None,
2345 db_ro_task_update_log,
2346 None,
2347 "TASK_WF",
2348 "SET_TASK " + str(q_filter),
2349 )
2350 """
2351 self.db.set_one(
2352 "ro_tasks",
2353 q_filter=q_filter,
2354 update_dict=db_ro_task_update,
2355 fail_on_empty=True,
2356 )
2357 except DbException as e:
2358 self.logger.error(
2359 "ro_task={} Error updating database {}".format(ro_task_id, e)
2360 )
2361 except Exception as e:
2362 self.logger.error(
2363 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2364 )
2365
2366 def _update_target(self, task, ro_vim_item_update):
2367 table, _, temp = task["target_record"].partition(":")
2368 _id, _, path_vim_status = temp.partition(":")
2369 path_item = path_vim_status[: path_vim_status.rfind(".")]
2370 path_item = path_item[: path_item.rfind(".")]
2371 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2372 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2373
2374 if ro_vim_item_update:
2375 update_dict = {
2376 path_vim_status + "." + k: v
2377 for k, v in ro_vim_item_update.items()
2378 if k
2379 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2380 }
2381
2382 if path_vim_status.startswith("vdur."):
2383 # for backward compatibility, add vdur.name apart from vdur.vim_name
2384 if ro_vim_item_update.get("vim_name"):
2385 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2386
2387 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2388 if ro_vim_item_update.get("vim_id"):
2389 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2390
2391 # update general status
2392 if ro_vim_item_update.get("vim_status"):
2393 update_dict[path_item + ".status"] = ro_vim_item_update[
2394 "vim_status"
2395 ]
2396
2397 if ro_vim_item_update.get("interfaces"):
2398 path_interfaces = path_item + ".interfaces"
2399
2400 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2401 if iface:
2402 update_dict.update(
2403 {
2404 path_interfaces + ".{}.".format(i) + k: v
2405 for k, v in iface.items()
2406 if k in ("vlan", "compute_node", "pci")
2407 }
2408 )
2409
2410 # put ip_address and mac_address with ip-address and mac-address
2411 if iface.get("ip_address"):
2412 update_dict[
2413 path_interfaces + ".{}.".format(i) + "ip-address"
2414 ] = iface["ip_address"]
2415
2416 if iface.get("mac_address"):
2417 update_dict[
2418 path_interfaces + ".{}.".format(i) + "mac-address"
2419 ] = iface["mac_address"]
2420
2421 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2422 update_dict["ip-address"] = iface.get("ip_address").split(
2423 ";"
2424 )[0]
2425
2426 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2427 update_dict[path_item + ".ip-address"] = iface.get(
2428 "ip_address"
2429 ).split(";")[0]
2430
2431 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2432 else:
2433 update_dict = {path_item + ".status": "DELETED"}
2434 self.db.set_one(
2435 table,
2436 q_filter={"_id": _id},
2437 update_dict=update_dict,
2438 unset={path_vim_status: None},
2439 )
2440
2441 def _process_delete_db_tasks(self):
2442 """
2443 Delete task from database because vnfrs or nsrs or both have been deleted
2444 :return: None. Uses and modify self.tasks_to_delete
2445 """
2446 while self.tasks_to_delete:
2447 task = self.tasks_to_delete[0]
2448 vnfrs_deleted = None
2449 nsr_id = task["nsr_id"]
2450
2451 if task["target_record"].startswith("vnfrs:"):
2452 # check if nsrs is present
2453 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2454 vnfrs_deleted = task["target_record"].split(":")[1]
2455
2456 try:
2457 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2458 except Exception as e:
2459 self.logger.error(
2460 "Error deleting task={}: {}".format(task["task_id"], e)
2461 )
2462 self.tasks_to_delete.pop(0)
2463
2464 @staticmethod
2465 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2466 """
2467 Static method because it is called from osm_ng_ro.ns
2468 :param db: instance of database to use
2469 :param nsr_id: affected nsrs id
2470 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2471 :return: None, exception is fails
2472 """
2473 retries = 5
2474 for retry in range(retries):
2475 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2476 now = time.time()
2477 conflict = False
2478
2479 for ro_task in ro_tasks:
2480 db_update = {}
2481 to_delete_ro_task = True
2482
2483 for index, task in enumerate(ro_task["tasks"]):
2484 if not task:
2485 pass
2486 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2487 vnfrs_deleted
2488 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2489 ):
2490 db_update["tasks.{}".format(index)] = None
2491 else:
2492 # used by other nsr, ro_task cannot be deleted
2493 to_delete_ro_task = False
2494
2495 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2496 if to_delete_ro_task:
2497 if not db.del_one(
2498 "ro_tasks",
2499 q_filter={
2500 "_id": ro_task["_id"],
2501 "modified_at": ro_task["modified_at"],
2502 },
2503 fail_on_empty=False,
2504 ):
2505 conflict = True
2506 elif db_update:
2507 db_update["modified_at"] = now
2508 if not db.set_one(
2509 "ro_tasks",
2510 q_filter={
2511 "_id": ro_task["_id"],
2512 "modified_at": ro_task["modified_at"],
2513 },
2514 update_dict=db_update,
2515 fail_on_empty=False,
2516 ):
2517 conflict = True
2518 if not conflict:
2519 return
2520 else:
2521 raise NsWorkerException("Exceeded {} retries".format(retries))
2522
2523 def run(self):
2524 # load database
2525 self.logger.info("Starting")
2526 while True:
2527 # step 1: get commands from queue
2528 try:
2529 if self.vim_targets:
2530 task = self.task_queue.get(block=False)
2531 else:
2532 if not self.idle:
2533 self.logger.debug("enters in idle state")
2534 self.idle = True
2535 task = self.task_queue.get(block=True)
2536 self.idle = False
2537
2538 if task[0] == "terminate":
2539 break
2540 elif task[0] == "load_vim":
2541 self.logger.info("order to load vim {}".format(task[1]))
2542 self._load_vim(task[1])
2543 elif task[0] == "unload_vim":
2544 self.logger.info("order to unload vim {}".format(task[1]))
2545 self._unload_vim(task[1])
2546 elif task[0] == "reload_vim":
2547 self._reload_vim(task[1])
2548 elif task[0] == "check_vim":
2549 self.logger.info("order to check vim {}".format(task[1]))
2550 self._check_vim(task[1])
2551 continue
2552 except Exception as e:
2553 if isinstance(e, queue.Empty):
2554 pass
2555 else:
2556 self.logger.critical(
2557 "Error processing task: {}".format(e), exc_info=True
2558 )
2559
2560 # step 2: process pending_tasks, delete not needed tasks
2561 try:
2562 if self.tasks_to_delete:
2563 self._process_delete_db_tasks()
2564 busy = False
2565 """
2566 # Log RO tasks only when loglevel is DEBUG
2567 if self.logger.getEffectiveLevel() == logging.DEBUG:
2568 _ = self._get_db_all_tasks()
2569 """
2570 ro_task = self._get_db_task()
2571 if ro_task:
2572 self._process_pending_tasks(ro_task)
2573 busy = True
2574 if not busy:
2575 time.sleep(5)
2576 except Exception as e:
2577 self.logger.critical(
2578 "Unexpected exception at run: " + str(e), exc_info=True
2579 )
2580
2581 self.logger.info("Finishing")