Feature 10909: Heal operation for VDU. Fix virtual machine deletion and volume deleti...
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from unittest.mock import Mock
37
38 from importlib_metadata import entry_points
39 from osm_common.dbbase import DbException
40 from osm_ng_ro.vim_admin import LockRenew
41 from osm_ro_plugin import sdnconn, vimconn
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin.vim_dummy import VimDummyConnector
44 import yaml
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 "vim_message": None,
215 }
216 self.logger.debug(
217 "task={} {} new-net={} created={}".format(
218 task_id, ro_task["target_id"], vim_net_id, created
219 )
220 )
221
222 return "BUILD", ro_vim_item_update
223 except (vimconn.VimConnException, NsWorkerException) as e:
224 self.logger.error(
225 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
226 )
227 ro_vim_item_update = {
228 "vim_status": "VIM_ERROR",
229 "created": created,
230 "vim_message": str(e),
231 }
232
233 return "FAILED", ro_vim_item_update
234
235 def refresh(self, ro_task):
236 """Call VIM to get network status"""
237 ro_task_id = ro_task["_id"]
238 target_vim = self.my_vims[ro_task["target_id"]]
239 vim_id = ro_task["vim_info"]["vim_id"]
240 net_to_refresh_list = [vim_id]
241
242 try:
243 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
244 vim_info = vim_dict[vim_id]
245
246 if vim_info["status"] == "ACTIVE":
247 task_status = "DONE"
248 elif vim_info["status"] == "BUILD":
249 task_status = "BUILD"
250 else:
251 task_status = "FAILED"
252 except vimconn.VimConnException as e:
253 # Mark all tasks at VIM_ERROR status
254 self.logger.error(
255 "ro_task={} vim={} get-net={}: {}".format(
256 ro_task_id, ro_task["target_id"], vim_id, e
257 )
258 )
259 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
260 task_status = "FAILED"
261
262 ro_vim_item_update = {}
263 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
264 ro_vim_item_update["vim_status"] = vim_info["status"]
265
266 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
267 ro_vim_item_update["vim_name"] = vim_info.get("name")
268
269 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
270 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
271 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
272 elif vim_info["status"] == "DELETED":
273 ro_vim_item_update["vim_id"] = None
274 ro_vim_item_update["vim_message"] = "Deleted externally"
275 else:
276 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
277 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
278
279 if ro_vim_item_update:
280 self.logger.debug(
281 "ro_task={} {} get-net={}: status={} {}".format(
282 ro_task_id,
283 ro_task["target_id"],
284 vim_id,
285 ro_vim_item_update.get("vim_status"),
286 ro_vim_item_update.get("vim_message")
287 if ro_vim_item_update.get("vim_status") != "ACTIVE"
288 else "",
289 )
290 )
291
292 return task_status, ro_vim_item_update
293
294 def delete(self, ro_task, task_index):
295 task = ro_task["tasks"][task_index]
296 task_id = task["task_id"]
297 net_vim_id = ro_task["vim_info"]["vim_id"]
298 ro_vim_item_update_ok = {
299 "vim_status": "DELETED",
300 "created": False,
301 "vim_message": "DELETED",
302 "vim_id": None,
303 }
304
305 try:
306 if net_vim_id or ro_task["vim_info"]["created_items"]:
307 target_vim = self.my_vims[ro_task["target_id"]]
308 target_vim.delete_network(
309 net_vim_id, ro_task["vim_info"]["created_items"]
310 )
311 except vimconn.VimConnNotFoundException:
312 ro_vim_item_update_ok["vim_message"] = "already deleted"
313 except vimconn.VimConnException as e:
314 self.logger.error(
315 "ro_task={} vim={} del-net={}: {}".format(
316 ro_task["_id"], ro_task["target_id"], net_vim_id, e
317 )
318 )
319 ro_vim_item_update = {
320 "vim_status": "VIM_ERROR",
321 "vim_message": "Error while deleting: {}".format(e),
322 }
323
324 return "FAILED", ro_vim_item_update
325
326 self.logger.debug(
327 "task={} {} del-net={} {}".format(
328 task_id,
329 ro_task["target_id"],
330 net_vim_id,
331 ro_vim_item_update_ok.get("vim_message", ""),
332 )
333 )
334
335 return "DONE", ro_vim_item_update_ok
336
337
338 class VimInteractionVdu(VimInteractionBase):
339 max_retries_inject_ssh_key = 20 # 20 times
340 time_retries_inject_ssh_key = 30 # wevery 30 seconds
341
342 def new(self, ro_task, task_index, task_depends):
343 task = ro_task["tasks"][task_index]
344 task_id = task["task_id"]
345 created = False
346 created_items = {}
347 target_vim = self.my_vims[ro_task["target_id"]]
348
349 try:
350 created = True
351 params = task["params"]
352 params_copy = deepcopy(params)
353 net_list = params_copy["net_list"]
354
355 for net in net_list:
356 # change task_id into network_id
357 if "net_id" in net and net["net_id"].startswith("TASK-"):
358 network_id = task_depends[net["net_id"]]
359
360 if not network_id:
361 raise NsWorkerException(
362 "Cannot create VM because depends on a network not created or found "
363 "for {}".format(net["net_id"])
364 )
365
366 net["net_id"] = network_id
367
368 if params_copy["image_id"].startswith("TASK-"):
369 params_copy["image_id"] = task_depends[params_copy["image_id"]]
370
371 if params_copy["flavor_id"].startswith("TASK-"):
372 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
373
374 affinity_group_list = params_copy["affinity_group_list"]
375 for affinity_group in affinity_group_list:
376 # change task_id into affinity_group_id
377 if "affinity_group_id" in affinity_group and affinity_group[
378 "affinity_group_id"
379 ].startswith("TASK-"):
380 affinity_group_id = task_depends[
381 affinity_group["affinity_group_id"]
382 ]
383
384 if not affinity_group_id:
385 raise NsWorkerException(
386 "found for {}".format(affinity_group["affinity_group_id"])
387 )
388
389 affinity_group["affinity_group_id"] = affinity_group_id
390
391 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
392 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
393
394 # add to created items previous_created_volumes (healing)
395 if task.get("previous_created_volumes"):
396 for k, v in task["previous_created_volumes"].items():
397 created_items[k] = v
398
399 ro_vim_item_update = {
400 "vim_id": vim_vm_id,
401 "vim_status": "BUILD",
402 "created": created,
403 "created_items": created_items,
404 "vim_details": None,
405 "vim_message": None,
406 "interfaces_vim_ids": interfaces,
407 "interfaces": [],
408 "interfaces_backup": [],
409 }
410 self.logger.debug(
411 "task={} {} new-vm={} created={}".format(
412 task_id, ro_task["target_id"], vim_vm_id, created
413 )
414 )
415
416 return "BUILD", ro_vim_item_update
417 except (vimconn.VimConnException, NsWorkerException) as e:
418 self.logger.error(
419 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
420 )
421 ro_vim_item_update = {
422 "vim_status": "VIM_ERROR",
423 "created": created,
424 "vim_message": str(e),
425 }
426
427 return "FAILED", ro_vim_item_update
428
429 def delete(self, ro_task, task_index):
430 task = ro_task["tasks"][task_index]
431 task_id = task["task_id"]
432 vm_vim_id = ro_task["vim_info"]["vim_id"]
433 ro_vim_item_update_ok = {
434 "vim_status": "DELETED",
435 "created": False,
436 "vim_message": "DELETED",
437 "vim_id": None,
438 }
439
440 try:
441 self.logger.debug(
442 "delete_vminstance: vm_vim_id={} created_items={}".format(
443 vm_vim_id, ro_task["vim_info"]["created_items"]
444 )
445 )
446 if vm_vim_id or ro_task["vim_info"]["created_items"]:
447 target_vim = self.my_vims[ro_task["target_id"]]
448 target_vim.delete_vminstance(
449 vm_vim_id,
450 ro_task["vim_info"]["created_items"],
451 ro_task["vim_info"].get("volumes_to_hold", []),
452 )
453 except vimconn.VimConnNotFoundException:
454 ro_vim_item_update_ok["vim_message"] = "already deleted"
455 except vimconn.VimConnException as e:
456 self.logger.error(
457 "ro_task={} vim={} del-vm={}: {}".format(
458 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
459 )
460 )
461 ro_vim_item_update = {
462 "vim_status": "VIM_ERROR",
463 "vim_message": "Error while deleting: {}".format(e),
464 }
465
466 return "FAILED", ro_vim_item_update
467
468 self.logger.debug(
469 "task={} {} del-vm={} {}".format(
470 task_id,
471 ro_task["target_id"],
472 vm_vim_id,
473 ro_vim_item_update_ok.get("vim_message", ""),
474 )
475 )
476
477 return "DONE", ro_vim_item_update_ok
478
479 def refresh(self, ro_task):
480 """Call VIM to get vm status"""
481 ro_task_id = ro_task["_id"]
482 target_vim = self.my_vims[ro_task["target_id"]]
483 vim_id = ro_task["vim_info"]["vim_id"]
484
485 if not vim_id:
486 return None, None
487
488 vm_to_refresh_list = [vim_id]
489 try:
490 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
491 vim_info = vim_dict[vim_id]
492
493 if vim_info["status"] == "ACTIVE":
494 task_status = "DONE"
495 elif vim_info["status"] == "BUILD":
496 task_status = "BUILD"
497 else:
498 task_status = "FAILED"
499
500 # try to load and parse vim_information
501 try:
502 vim_info_info = yaml.safe_load(vim_info["vim_info"])
503 if vim_info_info.get("name"):
504 vim_info["name"] = vim_info_info["name"]
505 except Exception:
506 pass
507 except vimconn.VimConnException as e:
508 # Mark all tasks at VIM_ERROR status
509 self.logger.error(
510 "ro_task={} vim={} get-vm={}: {}".format(
511 ro_task_id, ro_task["target_id"], vim_id, e
512 )
513 )
514 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
515 task_status = "FAILED"
516
517 ro_vim_item_update = {}
518
519 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
520 vim_interfaces = []
521 if vim_info.get("interfaces"):
522 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
523 iface = next(
524 (
525 iface
526 for iface in vim_info["interfaces"]
527 if vim_iface_id == iface["vim_interface_id"]
528 ),
529 None,
530 )
531 # if iface:
532 # iface.pop("vim_info", None)
533 vim_interfaces.append(iface)
534
535 task_create = next(
536 t
537 for t in ro_task["tasks"]
538 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
539 )
540 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
541 vim_interfaces[task_create["mgmt_vnf_interface"]][
542 "mgmt_vnf_interface"
543 ] = True
544
545 mgmt_vdu_iface = task_create.get(
546 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
547 )
548 if vim_interfaces:
549 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
550
551 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
552 ro_vim_item_update["interfaces"] = vim_interfaces
553
554 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
555 ro_vim_item_update["vim_status"] = vim_info["status"]
556
557 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
558 ro_vim_item_update["vim_name"] = vim_info.get("name")
559
560 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
561 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
562 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
563 elif vim_info["status"] == "DELETED":
564 ro_vim_item_update["vim_id"] = None
565 ro_vim_item_update["vim_message"] = "Deleted externally"
566 else:
567 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
568 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
569
570 if ro_vim_item_update:
571 self.logger.debug(
572 "ro_task={} {} get-vm={}: status={} {}".format(
573 ro_task_id,
574 ro_task["target_id"],
575 vim_id,
576 ro_vim_item_update.get("vim_status"),
577 ro_vim_item_update.get("vim_message")
578 if ro_vim_item_update.get("vim_status") != "ACTIVE"
579 else "",
580 )
581 )
582
583 return task_status, ro_vim_item_update
584
585 def exec(self, ro_task, task_index, task_depends):
586 task = ro_task["tasks"][task_index]
587 task_id = task["task_id"]
588 target_vim = self.my_vims[ro_task["target_id"]]
589 db_task_update = {"retries": 0}
590 retries = task.get("retries", 0)
591
592 try:
593 params = task["params"]
594 params_copy = deepcopy(params)
595 params_copy["ro_key"] = self.db.decrypt(
596 params_copy.pop("private_key"),
597 params_copy.pop("schema_version"),
598 params_copy.pop("salt"),
599 )
600 params_copy["ip_addr"] = params_copy.pop("ip_address")
601 target_vim.inject_user_key(**params_copy)
602 self.logger.debug(
603 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
604 )
605
606 return (
607 "DONE",
608 None,
609 db_task_update,
610 ) # params_copy["key"]
611 except (vimconn.VimConnException, NsWorkerException) as e:
612 retries += 1
613
614 self.logger.debug(traceback.format_exc())
615 if retries < self.max_retries_inject_ssh_key:
616 return (
617 "BUILD",
618 None,
619 {
620 "retries": retries,
621 "next_retry": self.time_retries_inject_ssh_key,
622 },
623 )
624
625 self.logger.error(
626 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
627 )
628 ro_vim_item_update = {"vim_message": str(e)}
629
630 return "FAILED", ro_vim_item_update, db_task_update
631
632
633 class VimInteractionImage(VimInteractionBase):
634 def new(self, ro_task, task_index, task_depends):
635 task = ro_task["tasks"][task_index]
636 task_id = task["task_id"]
637 created = False
638 created_items = {}
639 target_vim = self.my_vims[ro_task["target_id"]]
640
641 try:
642 # FIND
643 if task.get("find_params"):
644 vim_images = target_vim.get_image_list(**task["find_params"])
645
646 if not vim_images:
647 raise NsWorkerExceptionNotFound(
648 "Image not found with this criteria: '{}'".format(
649 task["find_params"]
650 )
651 )
652 elif len(vim_images) > 1:
653 raise NsWorkerException(
654 "More than one image found with this criteria: '{}'".format(
655 task["find_params"]
656 )
657 )
658 else:
659 vim_image_id = vim_images[0]["id"]
660
661 ro_vim_item_update = {
662 "vim_id": vim_image_id,
663 "vim_status": "DONE",
664 "created": created,
665 "created_items": created_items,
666 "vim_details": None,
667 "vim_message": None,
668 }
669 self.logger.debug(
670 "task={} {} new-image={} created={}".format(
671 task_id, ro_task["target_id"], vim_image_id, created
672 )
673 )
674
675 return "DONE", ro_vim_item_update
676 except (NsWorkerException, vimconn.VimConnException) as e:
677 self.logger.error(
678 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
679 )
680 ro_vim_item_update = {
681 "vim_status": "VIM_ERROR",
682 "created": created,
683 "vim_message": str(e),
684 }
685
686 return "FAILED", ro_vim_item_update
687
688
689 class VimInteractionFlavor(VimInteractionBase):
690 def delete(self, ro_task, task_index):
691 task = ro_task["tasks"][task_index]
692 task_id = task["task_id"]
693 flavor_vim_id = ro_task["vim_info"]["vim_id"]
694 ro_vim_item_update_ok = {
695 "vim_status": "DELETED",
696 "created": False,
697 "vim_message": "DELETED",
698 "vim_id": None,
699 }
700
701 try:
702 if flavor_vim_id:
703 target_vim = self.my_vims[ro_task["target_id"]]
704 target_vim.delete_flavor(flavor_vim_id)
705 except vimconn.VimConnNotFoundException:
706 ro_vim_item_update_ok["vim_message"] = "already deleted"
707 except vimconn.VimConnException as e:
708 self.logger.error(
709 "ro_task={} vim={} del-flavor={}: {}".format(
710 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
711 )
712 )
713 ro_vim_item_update = {
714 "vim_status": "VIM_ERROR",
715 "vim_message": "Error while deleting: {}".format(e),
716 }
717
718 return "FAILED", ro_vim_item_update
719
720 self.logger.debug(
721 "task={} {} del-flavor={} {}".format(
722 task_id,
723 ro_task["target_id"],
724 flavor_vim_id,
725 ro_vim_item_update_ok.get("vim_message", ""),
726 )
727 )
728
729 return "DONE", ro_vim_item_update_ok
730
731 def new(self, ro_task, task_index, task_depends):
732 task = ro_task["tasks"][task_index]
733 task_id = task["task_id"]
734 created = False
735 created_items = {}
736 target_vim = self.my_vims[ro_task["target_id"]]
737
738 try:
739 # FIND
740 vim_flavor_id = None
741
742 if task.get("find_params"):
743 try:
744 flavor_data = task["find_params"]["flavor_data"]
745 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
746 except vimconn.VimConnNotFoundException:
747 pass
748
749 if not vim_flavor_id and task.get("params"):
750 # CREATE
751 flavor_data = task["params"]["flavor_data"]
752 vim_flavor_id = target_vim.new_flavor(flavor_data)
753 created = True
754
755 ro_vim_item_update = {
756 "vim_id": vim_flavor_id,
757 "vim_status": "DONE",
758 "created": created,
759 "created_items": created_items,
760 "vim_details": None,
761 "vim_message": None,
762 }
763 self.logger.debug(
764 "task={} {} new-flavor={} created={}".format(
765 task_id, ro_task["target_id"], vim_flavor_id, created
766 )
767 )
768
769 return "DONE", ro_vim_item_update
770 except (vimconn.VimConnException, NsWorkerException) as e:
771 self.logger.error(
772 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
773 )
774 ro_vim_item_update = {
775 "vim_status": "VIM_ERROR",
776 "created": created,
777 "vim_message": str(e),
778 }
779
780 return "FAILED", ro_vim_item_update
781
782
783 class VimInteractionAffinityGroup(VimInteractionBase):
784 def delete(self, ro_task, task_index):
785 task = ro_task["tasks"][task_index]
786 task_id = task["task_id"]
787 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
788 ro_vim_item_update_ok = {
789 "vim_status": "DELETED",
790 "created": False,
791 "vim_message": "DELETED",
792 "vim_id": None,
793 }
794
795 try:
796 if affinity_group_vim_id:
797 target_vim = self.my_vims[ro_task["target_id"]]
798 target_vim.delete_affinity_group(affinity_group_vim_id)
799 except vimconn.VimConnNotFoundException:
800 ro_vim_item_update_ok["vim_message"] = "already deleted"
801 except vimconn.VimConnException as e:
802 self.logger.error(
803 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
804 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
805 )
806 )
807 ro_vim_item_update = {
808 "vim_status": "VIM_ERROR",
809 "vim_message": "Error while deleting: {}".format(e),
810 }
811
812 return "FAILED", ro_vim_item_update
813
814 self.logger.debug(
815 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
816 task_id,
817 ro_task["target_id"],
818 affinity_group_vim_id,
819 ro_vim_item_update_ok.get("vim_message", ""),
820 )
821 )
822
823 return "DONE", ro_vim_item_update_ok
824
825 def new(self, ro_task, task_index, task_depends):
826 task = ro_task["tasks"][task_index]
827 task_id = task["task_id"]
828 created = False
829 created_items = {}
830 target_vim = self.my_vims[ro_task["target_id"]]
831
832 try:
833 affinity_group_vim_id = None
834 affinity_group_data = None
835
836 if task.get("params"):
837 affinity_group_data = task["params"].get("affinity_group_data")
838
839 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
840 try:
841 param_affinity_group_id = task["params"]["affinity_group_data"].get(
842 "vim-affinity-group-id"
843 )
844 affinity_group_vim_id = target_vim.get_affinity_group(
845 param_affinity_group_id
846 ).get("id")
847 except vimconn.VimConnNotFoundException:
848 self.logger.error(
849 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
850 "could not be found at VIM. Creating a new one.".format(
851 task_id, ro_task["target_id"], param_affinity_group_id
852 )
853 )
854
855 if not affinity_group_vim_id and affinity_group_data:
856 affinity_group_vim_id = target_vim.new_affinity_group(
857 affinity_group_data
858 )
859 created = True
860
861 ro_vim_item_update = {
862 "vim_id": affinity_group_vim_id,
863 "vim_status": "DONE",
864 "created": created,
865 "created_items": created_items,
866 "vim_details": None,
867 "vim_message": None,
868 }
869 self.logger.debug(
870 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
871 task_id, ro_task["target_id"], affinity_group_vim_id, created
872 )
873 )
874
875 return "DONE", ro_vim_item_update
876 except (vimconn.VimConnException, NsWorkerException) as e:
877 self.logger.error(
878 "task={} vim={} new-affinity-or-anti-affinity-group:"
879 " {}".format(task_id, ro_task["target_id"], e)
880 )
881 ro_vim_item_update = {
882 "vim_status": "VIM_ERROR",
883 "created": created,
884 "vim_message": str(e),
885 }
886
887 return "FAILED", ro_vim_item_update
888
889
890 class VimInteractionSdnNet(VimInteractionBase):
891 @staticmethod
892 def _match_pci(port_pci, mapping):
893 """
894 Check if port_pci matches with mapping
895 mapping can have brackets to indicate that several chars are accepted. e.g
896 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
897 :param port_pci: text
898 :param mapping: text, can contain brackets to indicate several chars are available
899 :return: True if matches, False otherwise
900 """
901 if not port_pci or not mapping:
902 return False
903 if port_pci == mapping:
904 return True
905
906 mapping_index = 0
907 pci_index = 0
908 while True:
909 bracket_start = mapping.find("[", mapping_index)
910
911 if bracket_start == -1:
912 break
913
914 bracket_end = mapping.find("]", bracket_start)
915 if bracket_end == -1:
916 break
917
918 length = bracket_start - mapping_index
919 if (
920 length
921 and port_pci[pci_index : pci_index + length]
922 != mapping[mapping_index:bracket_start]
923 ):
924 return False
925
926 if (
927 port_pci[pci_index + length]
928 not in mapping[bracket_start + 1 : bracket_end]
929 ):
930 return False
931
932 pci_index += length + 1
933 mapping_index = bracket_end + 1
934
935 if port_pci[pci_index:] != mapping[mapping_index:]:
936 return False
937
938 return True
939
940 def _get_interfaces(self, vlds_to_connect, vim_account_id):
941 """
942 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
943 :param vim_account_id:
944 :return:
945 """
946 interfaces = []
947
948 for vld in vlds_to_connect:
949 table, _, db_id = vld.partition(":")
950 db_id, _, vld = db_id.partition(":")
951 _, _, vld_id = vld.partition(".")
952
953 if table == "vnfrs":
954 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
955 iface_key = "vnf-vld-id"
956 else: # table == "nsrs"
957 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
958 iface_key = "ns-vld-id"
959
960 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
961
962 for db_vnfr in db_vnfrs:
963 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
964 for iface_index, interface in enumerate(vdur["interfaces"]):
965 if interface.get(iface_key) == vld_id and interface.get(
966 "type"
967 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
968 # only SR-IOV o PT
969 interface_ = interface.copy()
970 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
971 db_vnfr["_id"], vdu_index, iface_index
972 )
973
974 if vdur.get("status") == "ERROR":
975 interface_["status"] = "ERROR"
976
977 interfaces.append(interface_)
978
979 return interfaces
980
981 def refresh(self, ro_task):
982 # look for task create
983 task_create_index, _ = next(
984 i_t
985 for i_t in enumerate(ro_task["tasks"])
986 if i_t[1]
987 and i_t[1]["action"] == "CREATE"
988 and i_t[1]["status"] != "FINISHED"
989 )
990
991 return self.new(ro_task, task_create_index, None)
992
993 def new(self, ro_task, task_index, task_depends):
994
995 task = ro_task["tasks"][task_index]
996 task_id = task["task_id"]
997 target_vim = self.my_vims[ro_task["target_id"]]
998
999 sdn_net_id = ro_task["vim_info"]["vim_id"]
1000
1001 created_items = ro_task["vim_info"].get("created_items")
1002 connected_ports = ro_task["vim_info"].get("connected_ports", [])
1003 new_connected_ports = []
1004 last_update = ro_task["vim_info"].get("last_update", 0)
1005 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1006 error_list = []
1007 created = ro_task["vim_info"].get("created", False)
1008
1009 try:
1010 # CREATE
1011 params = task["params"]
1012 vlds_to_connect = params["vlds"]
1013 associated_vim = params["target_vim"]
1014 # external additional ports
1015 additional_ports = params.get("sdn-ports") or ()
1016 _, _, vim_account_id = associated_vim.partition(":")
1017
1018 if associated_vim:
1019 # get associated VIM
1020 if associated_vim not in self.db_vims:
1021 self.db_vims[associated_vim] = self.db.get_one(
1022 "vim_accounts", {"_id": vim_account_id}
1023 )
1024
1025 db_vim = self.db_vims[associated_vim]
1026
1027 # look for ports to connect
1028 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1029 # print(ports)
1030
1031 sdn_ports = []
1032 pending_ports = error_ports = 0
1033 vlan_used = None
1034 sdn_need_update = False
1035
1036 for port in ports:
1037 vlan_used = port.get("vlan") or vlan_used
1038
1039 # TODO. Do not connect if already done
1040 if not port.get("compute_node") or not port.get("pci"):
1041 if port.get("status") == "ERROR":
1042 error_ports += 1
1043 else:
1044 pending_ports += 1
1045 continue
1046
1047 pmap = None
1048 compute_node_mappings = next(
1049 (
1050 c
1051 for c in db_vim["config"].get("sdn-port-mapping", ())
1052 if c and c["compute_node"] == port["compute_node"]
1053 ),
1054 None,
1055 )
1056
1057 if compute_node_mappings:
1058 # process port_mapping pci of type 0000:af:1[01].[1357]
1059 pmap = next(
1060 (
1061 p
1062 for p in compute_node_mappings["ports"]
1063 if self._match_pci(port["pci"], p.get("pci"))
1064 ),
1065 None,
1066 )
1067
1068 if not pmap:
1069 if not db_vim["config"].get("mapping_not_needed"):
1070 error_list.append(
1071 "Port mapping not found for compute_node={} pci={}".format(
1072 port["compute_node"], port["pci"]
1073 )
1074 )
1075 continue
1076
1077 pmap = {}
1078
1079 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1080 new_port = {
1081 "service_endpoint_id": pmap.get("service_endpoint_id")
1082 or service_endpoint_id,
1083 "service_endpoint_encapsulation_type": "dot1q"
1084 if port["type"] == "SR-IOV"
1085 else None,
1086 "service_endpoint_encapsulation_info": {
1087 "vlan": port.get("vlan"),
1088 "mac": port.get("mac-address"),
1089 "device_id": pmap.get("device_id") or port["compute_node"],
1090 "device_interface_id": pmap.get("device_interface_id")
1091 or port["pci"],
1092 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1093 "switch_port": pmap.get("switch_port"),
1094 "service_mapping_info": pmap.get("service_mapping_info"),
1095 },
1096 }
1097
1098 # TODO
1099 # if port["modified_at"] > last_update:
1100 # sdn_need_update = True
1101 new_connected_ports.append(port["id"]) # TODO
1102 sdn_ports.append(new_port)
1103
1104 if error_ports:
1105 error_list.append(
1106 "{} interfaces have not been created as VDU is on ERROR status".format(
1107 error_ports
1108 )
1109 )
1110
1111 # connect external ports
1112 for index, additional_port in enumerate(additional_ports):
1113 additional_port_id = additional_port.get(
1114 "service_endpoint_id"
1115 ) or "external-{}".format(index)
1116 sdn_ports.append(
1117 {
1118 "service_endpoint_id": additional_port_id,
1119 "service_endpoint_encapsulation_type": additional_port.get(
1120 "service_endpoint_encapsulation_type", "dot1q"
1121 ),
1122 "service_endpoint_encapsulation_info": {
1123 "vlan": additional_port.get("vlan") or vlan_used,
1124 "mac": additional_port.get("mac_address"),
1125 "device_id": additional_port.get("device_id"),
1126 "device_interface_id": additional_port.get(
1127 "device_interface_id"
1128 ),
1129 "switch_dpid": additional_port.get("switch_dpid")
1130 or additional_port.get("switch_id"),
1131 "switch_port": additional_port.get("switch_port"),
1132 "service_mapping_info": additional_port.get(
1133 "service_mapping_info"
1134 ),
1135 },
1136 }
1137 )
1138 new_connected_ports.append(additional_port_id)
1139 sdn_info = ""
1140
1141 # if there are more ports to connect or they have been modified, call create/update
1142 if error_list:
1143 sdn_status = "ERROR"
1144 sdn_info = "; ".join(error_list)
1145 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1146 last_update = time.time()
1147
1148 if not sdn_net_id:
1149 if len(sdn_ports) < 2:
1150 sdn_status = "ACTIVE"
1151
1152 if not pending_ports:
1153 self.logger.debug(
1154 "task={} {} new-sdn-net done, less than 2 ports".format(
1155 task_id, ro_task["target_id"]
1156 )
1157 )
1158 else:
1159 net_type = params.get("type") or "ELAN"
1160 (
1161 sdn_net_id,
1162 created_items,
1163 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1164 created = True
1165 self.logger.debug(
1166 "task={} {} new-sdn-net={} created={}".format(
1167 task_id, ro_task["target_id"], sdn_net_id, created
1168 )
1169 )
1170 else:
1171 created_items = target_vim.edit_connectivity_service(
1172 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1173 )
1174 created = True
1175 self.logger.debug(
1176 "task={} {} update-sdn-net={} created={}".format(
1177 task_id, ro_task["target_id"], sdn_net_id, created
1178 )
1179 )
1180
1181 connected_ports = new_connected_ports
1182 elif sdn_net_id:
1183 wim_status_dict = target_vim.get_connectivity_service_status(
1184 sdn_net_id, conn_info=created_items
1185 )
1186 sdn_status = wim_status_dict["sdn_status"]
1187
1188 if wim_status_dict.get("sdn_info"):
1189 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1190
1191 if wim_status_dict.get("error_msg"):
1192 sdn_info = wim_status_dict.get("error_msg") or ""
1193
1194 if pending_ports:
1195 if sdn_status != "ERROR":
1196 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1197 len(ports) - pending_ports, len(ports)
1198 )
1199
1200 if sdn_status == "ACTIVE":
1201 sdn_status = "BUILD"
1202
1203 ro_vim_item_update = {
1204 "vim_id": sdn_net_id,
1205 "vim_status": sdn_status,
1206 "created": created,
1207 "created_items": created_items,
1208 "connected_ports": connected_ports,
1209 "vim_details": sdn_info,
1210 "vim_message": None,
1211 "last_update": last_update,
1212 }
1213
1214 return sdn_status, ro_vim_item_update
1215 except Exception as e:
1216 self.logger.error(
1217 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1218 exc_info=not isinstance(
1219 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1220 ),
1221 )
1222 ro_vim_item_update = {
1223 "vim_status": "VIM_ERROR",
1224 "created": created,
1225 "vim_message": str(e),
1226 }
1227
1228 return "FAILED", ro_vim_item_update
1229
1230 def delete(self, ro_task, task_index):
1231 task = ro_task["tasks"][task_index]
1232 task_id = task["task_id"]
1233 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1234 ro_vim_item_update_ok = {
1235 "vim_status": "DELETED",
1236 "created": False,
1237 "vim_message": "DELETED",
1238 "vim_id": None,
1239 }
1240
1241 try:
1242 if sdn_vim_id:
1243 target_vim = self.my_vims[ro_task["target_id"]]
1244 target_vim.delete_connectivity_service(
1245 sdn_vim_id, ro_task["vim_info"].get("created_items")
1246 )
1247
1248 except Exception as e:
1249 if (
1250 isinstance(e, sdnconn.SdnConnectorError)
1251 and e.http_code == HTTPStatus.NOT_FOUND.value
1252 ):
1253 ro_vim_item_update_ok["vim_message"] = "already deleted"
1254 else:
1255 self.logger.error(
1256 "ro_task={} vim={} del-sdn-net={}: {}".format(
1257 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1258 ),
1259 exc_info=not isinstance(
1260 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1261 ),
1262 )
1263 ro_vim_item_update = {
1264 "vim_status": "VIM_ERROR",
1265 "vim_message": "Error while deleting: {}".format(e),
1266 }
1267
1268 return "FAILED", ro_vim_item_update
1269
1270 self.logger.debug(
1271 "task={} {} del-sdn-net={} {}".format(
1272 task_id,
1273 ro_task["target_id"],
1274 sdn_vim_id,
1275 ro_vim_item_update_ok.get("vim_message", ""),
1276 )
1277 )
1278
1279 return "DONE", ro_vim_item_update_ok
1280
1281
1282 class VimInteractionMigration(VimInteractionBase):
1283 def exec(self, ro_task, task_index, task_depends):
1284 task = ro_task["tasks"][task_index]
1285 task_id = task["task_id"]
1286 db_task_update = {"retries": 0}
1287 target_vim = self.my_vims[ro_task["target_id"]]
1288 vim_interfaces = []
1289 created = False
1290 created_items = {}
1291 refreshed_vim_info = {}
1292
1293 try:
1294 if task.get("params"):
1295 vim_vm_id = task["params"].get("vim_vm_id")
1296 migrate_host = task["params"].get("migrate_host")
1297 _, migrated_compute_node = target_vim.migrate_instance(
1298 vim_vm_id, migrate_host
1299 )
1300
1301 if migrated_compute_node:
1302 # When VM is migrated, vdu["vim_info"] needs to be updated
1303 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1304 ro_task["target_id"]
1305 )
1306
1307 # Refresh VM to get new vim_info
1308 vm_to_refresh_list = [vim_vm_id]
1309 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1310 refreshed_vim_info = vim_dict[vim_vm_id]
1311
1312 if refreshed_vim_info.get("interfaces"):
1313 for old_iface in vdu_old_vim_info.get("interfaces"):
1314 iface = next(
1315 (
1316 iface
1317 for iface in refreshed_vim_info["interfaces"]
1318 if old_iface["vim_interface_id"]
1319 == iface["vim_interface_id"]
1320 ),
1321 None,
1322 )
1323 vim_interfaces.append(iface)
1324
1325 ro_vim_item_update = {
1326 "vim_id": vim_vm_id,
1327 "vim_status": "ACTIVE",
1328 "created": created,
1329 "created_items": created_items,
1330 "vim_details": None,
1331 "vim_message": None,
1332 }
1333
1334 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1335 "ERROR",
1336 "VIM_ERROR",
1337 ):
1338 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1339
1340 if vim_interfaces:
1341 ro_vim_item_update["interfaces"] = vim_interfaces
1342
1343 self.logger.debug(
1344 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1345 )
1346
1347 return "DONE", ro_vim_item_update, db_task_update
1348
1349 except (vimconn.VimConnException, NsWorkerException) as e:
1350 self.logger.error(
1351 "task={} vim={} VM Migration:"
1352 " {}".format(task_id, ro_task["target_id"], e)
1353 )
1354 ro_vim_item_update = {
1355 "vim_status": "VIM_ERROR",
1356 "created": created,
1357 "vim_message": str(e),
1358 }
1359
1360 return "FAILED", ro_vim_item_update, db_task_update
1361
1362
1363 class NsWorker(threading.Thread):
1364 REFRESH_BUILD = 5 # 5 seconds
1365 REFRESH_ACTIVE = 60 # 1 minute
1366 REFRESH_ERROR = 600
1367 REFRESH_IMAGE = 3600 * 10
1368 REFRESH_DELETE = 3600 * 10
1369 QUEUE_SIZE = 100
1370 terminate = False
1371
1372 def __init__(self, worker_index, config, plugins, db):
1373 """
1374
1375 :param worker_index: thread index
1376 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1377 :param plugins: global shared dict with the loaded plugins
1378 :param db: database class instance to use
1379 """
1380 threading.Thread.__init__(self)
1381 self.config = config
1382 self.plugins = plugins
1383 self.plugin_name = "unknown"
1384 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1385 self.worker_index = worker_index
1386 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1387 # targetvim: vimplugin class
1388 self.my_vims = {}
1389 # targetvim: vim information from database
1390 self.db_vims = {}
1391 # targetvim list
1392 self.vim_targets = []
1393 self.my_id = config["process_id"] + ":" + str(worker_index)
1394 self.db = db
1395 self.item2class = {
1396 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1397 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1398 "image": VimInteractionImage(
1399 self.db, self.my_vims, self.db_vims, self.logger
1400 ),
1401 "flavor": VimInteractionFlavor(
1402 self.db, self.my_vims, self.db_vims, self.logger
1403 ),
1404 "sdn_net": VimInteractionSdnNet(
1405 self.db, self.my_vims, self.db_vims, self.logger
1406 ),
1407 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1408 self.db, self.my_vims, self.db_vims, self.logger
1409 ),
1410 "migrate": VimInteractionMigration(
1411 self.db, self.my_vims, self.db_vims, self.logger
1412 ),
1413 }
1414 self.time_last_task_processed = None
1415 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1416 self.tasks_to_delete = []
1417 # it is idle when there are not vim_targets associated
1418 self.idle = True
1419 self.task_locked_time = config["global"]["task_locked_time"]
1420
1421 def insert_task(self, task):
1422 try:
1423 self.task_queue.put(task, False)
1424 return None
1425 except queue.Full:
1426 raise NsWorkerException("timeout inserting a task")
1427
1428 def terminate(self):
1429 self.insert_task("exit")
1430
1431 def del_task(self, task):
1432 with self.task_lock:
1433 if task["status"] == "SCHEDULED":
1434 task["status"] = "SUPERSEDED"
1435 return True
1436 else: # task["status"] == "processing"
1437 self.task_lock.release()
1438 return False
1439
1440 def _process_vim_config(self, target_id, db_vim):
1441 """
1442 Process vim config, creating vim configuration files as ca_cert
1443 :param target_id: vim/sdn/wim + id
1444 :param db_vim: Vim dictionary obtained from database
1445 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1446 """
1447 if not db_vim.get("config"):
1448 return
1449
1450 file_name = ""
1451
1452 try:
1453 if db_vim["config"].get("ca_cert_content"):
1454 file_name = "{}:{}".format(target_id, self.worker_index)
1455
1456 try:
1457 mkdir(file_name)
1458 except FileExistsError:
1459 pass
1460
1461 file_name = file_name + "/ca_cert"
1462
1463 with open(file_name, "w") as f:
1464 f.write(db_vim["config"]["ca_cert_content"])
1465 del db_vim["config"]["ca_cert_content"]
1466 db_vim["config"]["ca_cert"] = file_name
1467 except Exception as e:
1468 raise NsWorkerException(
1469 "Error writing to file '{}': {}".format(file_name, e)
1470 )
1471
1472 def _load_plugin(self, name, type="vim"):
1473 # type can be vim or sdn
1474 if "rovim_dummy" not in self.plugins:
1475 self.plugins["rovim_dummy"] = VimDummyConnector
1476
1477 if "rosdn_dummy" not in self.plugins:
1478 self.plugins["rosdn_dummy"] = SdnDummyConnector
1479
1480 if name in self.plugins:
1481 return self.plugins[name]
1482
1483 try:
1484 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1485 self.plugins[name] = ep.load()
1486 except Exception as e:
1487 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1488
1489 if name and name not in self.plugins:
1490 raise NsWorkerException(
1491 "Plugin 'osm_{n}' has not been installed".format(n=name)
1492 )
1493
1494 return self.plugins[name]
1495
1496 def _unload_vim(self, target_id):
1497 """
1498 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1499 :param target_id: Contains type:_id; where type can be 'vim', ...
1500 :return: None.
1501 """
1502 try:
1503 self.db_vims.pop(target_id, None)
1504 self.my_vims.pop(target_id, None)
1505
1506 if target_id in self.vim_targets:
1507 self.vim_targets.remove(target_id)
1508
1509 self.logger.info("Unloaded {}".format(target_id))
1510 rmtree("{}:{}".format(target_id, self.worker_index))
1511 except FileNotFoundError:
1512 pass # this is raised by rmtree if folder does not exist
1513 except Exception as e:
1514 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1515
1516 def _check_vim(self, target_id):
1517 """
1518 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1519 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1520 :return: None.
1521 """
1522 target, _, _id = target_id.partition(":")
1523 now = time.time()
1524 update_dict = {}
1525 unset_dict = {}
1526 op_text = ""
1527 step = ""
1528 loaded = target_id in self.vim_targets
1529 target_database = (
1530 "vim_accounts"
1531 if target == "vim"
1532 else "wim_accounts"
1533 if target == "wim"
1534 else "sdns"
1535 )
1536
1537 try:
1538 step = "Getting {} from db".format(target_id)
1539 db_vim = self.db.get_one(target_database, {"_id": _id})
1540
1541 for op_index, operation in enumerate(
1542 db_vim["_admin"].get("operations", ())
1543 ):
1544 if operation["operationState"] != "PROCESSING":
1545 continue
1546
1547 locked_at = operation.get("locked_at")
1548
1549 if locked_at is not None and locked_at >= now - self.task_locked_time:
1550 # some other thread is doing this operation
1551 return
1552
1553 # lock
1554 op_text = "_admin.operations.{}.".format(op_index)
1555
1556 if not self.db.set_one(
1557 target_database,
1558 q_filter={
1559 "_id": _id,
1560 op_text + "operationState": "PROCESSING",
1561 op_text + "locked_at": locked_at,
1562 },
1563 update_dict={
1564 op_text + "locked_at": now,
1565 "admin.current_operation": op_index,
1566 },
1567 fail_on_empty=False,
1568 ):
1569 return
1570
1571 unset_dict[op_text + "locked_at"] = None
1572 unset_dict["current_operation"] = None
1573 step = "Loading " + target_id
1574 error_text = self._load_vim(target_id)
1575
1576 if not error_text:
1577 step = "Checking connectivity"
1578
1579 if target == "vim":
1580 self.my_vims[target_id].check_vim_connectivity()
1581 else:
1582 self.my_vims[target_id].check_credentials()
1583
1584 update_dict["_admin.operationalState"] = "ENABLED"
1585 update_dict["_admin.detailed-status"] = ""
1586 unset_dict[op_text + "detailed-status"] = None
1587 update_dict[op_text + "operationState"] = "COMPLETED"
1588
1589 return
1590
1591 except Exception as e:
1592 error_text = "{}: {}".format(step, e)
1593 self.logger.error("{} for {}: {}".format(step, target_id, e))
1594
1595 finally:
1596 if update_dict or unset_dict:
1597 if error_text:
1598 update_dict[op_text + "operationState"] = "FAILED"
1599 update_dict[op_text + "detailed-status"] = error_text
1600 unset_dict.pop(op_text + "detailed-status", None)
1601 update_dict["_admin.operationalState"] = "ERROR"
1602 update_dict["_admin.detailed-status"] = error_text
1603
1604 if op_text:
1605 update_dict[op_text + "statusEnteredTime"] = now
1606
1607 self.db.set_one(
1608 target_database,
1609 q_filter={"_id": _id},
1610 update_dict=update_dict,
1611 unset=unset_dict,
1612 fail_on_empty=False,
1613 )
1614
1615 if not loaded:
1616 self._unload_vim(target_id)
1617
1618 def _reload_vim(self, target_id):
1619 if target_id in self.vim_targets:
1620 self._load_vim(target_id)
1621 else:
1622 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1623 # just remove it to force load again next time it is needed
1624 self.db_vims.pop(target_id, None)
1625
1626 def _load_vim(self, target_id):
1627 """
1628 Load or reload a vim_account, sdn_controller or wim_account.
1629 Read content from database, load the plugin if not loaded.
1630 In case of error loading the plugin, it load a failing VIM_connector
1631 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1632 :param target_id: Contains type:_id; where type can be 'vim', ...
1633 :return: None if ok, descriptive text if error
1634 """
1635 target, _, _id = target_id.partition(":")
1636 target_database = (
1637 "vim_accounts"
1638 if target == "vim"
1639 else "wim_accounts"
1640 if target == "wim"
1641 else "sdns"
1642 )
1643 plugin_name = ""
1644 vim = None
1645
1646 try:
1647 step = "Getting {}={} from db".format(target, _id)
1648 # TODO process for wim, sdnc, ...
1649 vim = self.db.get_one(target_database, {"_id": _id})
1650
1651 # if deep_get(vim, "config", "sdn-controller"):
1652 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1653 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1654
1655 step = "Decrypting password"
1656 schema_version = vim.get("schema_version")
1657 self.db.encrypt_decrypt_fields(
1658 vim,
1659 "decrypt",
1660 fields=("password", "secret"),
1661 schema_version=schema_version,
1662 salt=_id,
1663 )
1664 self._process_vim_config(target_id, vim)
1665
1666 if target == "vim":
1667 plugin_name = "rovim_" + vim["vim_type"]
1668 step = "Loading plugin '{}'".format(plugin_name)
1669 vim_module_conn = self._load_plugin(plugin_name)
1670 step = "Loading {}'".format(target_id)
1671 self.my_vims[target_id] = vim_module_conn(
1672 uuid=vim["_id"],
1673 name=vim["name"],
1674 tenant_id=vim.get("vim_tenant_id"),
1675 tenant_name=vim.get("vim_tenant_name"),
1676 url=vim["vim_url"],
1677 url_admin=None,
1678 user=vim["vim_user"],
1679 passwd=vim["vim_password"],
1680 config=vim.get("config") or {},
1681 persistent_info={},
1682 )
1683 else: # sdn
1684 plugin_name = "rosdn_" + vim["type"]
1685 step = "Loading plugin '{}'".format(plugin_name)
1686 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1687 step = "Loading {}'".format(target_id)
1688 wim = deepcopy(vim)
1689 wim_config = wim.pop("config", {}) or {}
1690 wim["uuid"] = wim["_id"]
1691 wim["wim_url"] = wim["url"]
1692
1693 if wim.get("dpid"):
1694 wim_config["dpid"] = wim.pop("dpid")
1695
1696 if wim.get("switch_id"):
1697 wim_config["switch_id"] = wim.pop("switch_id")
1698
1699 # wim, wim_account, config
1700 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1701 self.db_vims[target_id] = vim
1702 self.error_status = None
1703
1704 self.logger.info(
1705 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1706 )
1707 except Exception as e:
1708 self.logger.error(
1709 "Cannot load {} plugin={}: {} {}".format(
1710 target_id, plugin_name, step, e
1711 )
1712 )
1713
1714 self.db_vims[target_id] = vim or {}
1715 self.db_vims[target_id] = FailingConnector(str(e))
1716 error_status = "{} Error: {}".format(step, e)
1717
1718 return error_status
1719 finally:
1720 if target_id not in self.vim_targets:
1721 self.vim_targets.append(target_id)
1722
1723 def _get_db_task(self):
1724 """
1725 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1726 :return: None
1727 """
1728 now = time.time()
1729
1730 if not self.time_last_task_processed:
1731 self.time_last_task_processed = now
1732
1733 try:
1734 while True:
1735 """
1736 # Log RO tasks only when loglevel is DEBUG
1737 if self.logger.getEffectiveLevel() == logging.DEBUG:
1738 self._log_ro_task(
1739 None,
1740 None,
1741 None,
1742 "TASK_WF",
1743 "task_locked_time="
1744 + str(self.task_locked_time)
1745 + " "
1746 + "time_last_task_processed="
1747 + str(self.time_last_task_processed)
1748 + " "
1749 + "now="
1750 + str(now),
1751 )
1752 """
1753 locked = self.db.set_one(
1754 "ro_tasks",
1755 q_filter={
1756 "target_id": self.vim_targets,
1757 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1758 "locked_at.lt": now - self.task_locked_time,
1759 "to_check_at.lt": self.time_last_task_processed,
1760 },
1761 update_dict={"locked_by": self.my_id, "locked_at": now},
1762 fail_on_empty=False,
1763 )
1764
1765 if locked:
1766 # read and return
1767 ro_task = self.db.get_one(
1768 "ro_tasks",
1769 q_filter={
1770 "target_id": self.vim_targets,
1771 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1772 "locked_at": now,
1773 },
1774 )
1775 return ro_task
1776
1777 if self.time_last_task_processed == now:
1778 self.time_last_task_processed = None
1779 return None
1780 else:
1781 self.time_last_task_processed = now
1782 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1783
1784 except DbException as e:
1785 self.logger.error("Database exception at _get_db_task: {}".format(e))
1786 except Exception as e:
1787 self.logger.critical(
1788 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1789 )
1790
1791 return None
1792
1793 def _get_db_all_tasks(self):
1794 """
1795 Read all content of table ro_tasks to log it
1796 :return: None
1797 """
1798 try:
1799 # Checking the content of the BD:
1800
1801 # read and return
1802 ro_task = self.db.get_list("ro_tasks")
1803 for rt in ro_task:
1804 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1805 return ro_task
1806
1807 except DbException as e:
1808 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1809 except Exception as e:
1810 self.logger.critical(
1811 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1812 )
1813
1814 return None
1815
1816 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1817 """
1818 Generate a log with the following format:
1819
1820 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1821 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1822 task_array_index;task_id;task_action;task_item;task_args
1823
1824 Example:
1825
1826 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1827 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1828 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1829 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1830 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1831 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1832 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1833 """
1834 try:
1835 line = []
1836 i = 0
1837 if ro_task is not None and isinstance(ro_task, dict):
1838 for t in ro_task["tasks"]:
1839 line.clear()
1840 line.append(mark)
1841 line.append(event)
1842 line.append(ro_task.get("_id", ""))
1843 line.append(str(ro_task.get("locked_at", "")))
1844 line.append(str(ro_task.get("modified_at", "")))
1845 line.append(str(ro_task.get("created_at", "")))
1846 line.append(str(ro_task.get("to_check_at", "")))
1847 line.append(str(ro_task.get("locked_by", "")))
1848 line.append(str(ro_task.get("target_id", "")))
1849 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1850 line.append(str(ro_task.get("vim_info", "")))
1851 line.append(str(ro_task.get("tasks", "")))
1852 if isinstance(t, dict):
1853 line.append(str(t.get("status", "")))
1854 line.append(str(t.get("action_id", "")))
1855 line.append(str(i))
1856 line.append(str(t.get("task_id", "")))
1857 line.append(str(t.get("action", "")))
1858 line.append(str(t.get("item", "")))
1859 line.append(str(t.get("find_params", "")))
1860 line.append(str(t.get("params", "")))
1861 else:
1862 line.extend([""] * 2)
1863 line.append(str(i))
1864 line.extend([""] * 5)
1865
1866 i += 1
1867 self.logger.debug(";".join(line))
1868 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1869 i = 0
1870 while True:
1871 st = "tasks.{}.status".format(i)
1872 if st not in db_ro_task_update:
1873 break
1874 line.clear()
1875 line.append(mark)
1876 line.append(event)
1877 line.append(db_ro_task_update.get("_id", ""))
1878 line.append(str(db_ro_task_update.get("locked_at", "")))
1879 line.append(str(db_ro_task_update.get("modified_at", "")))
1880 line.append("")
1881 line.append(str(db_ro_task_update.get("to_check_at", "")))
1882 line.append(str(db_ro_task_update.get("locked_by", "")))
1883 line.append("")
1884 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1885 line.append("")
1886 line.append(str(db_ro_task_update.get("vim_info", "")))
1887 line.append(str(str(db_ro_task_update).count(".status")))
1888 line.append(db_ro_task_update.get(st, ""))
1889 line.append("")
1890 line.append(str(i))
1891 line.extend([""] * 3)
1892 i += 1
1893 self.logger.debug(";".join(line))
1894
1895 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1896 line.clear()
1897 line.append(mark)
1898 line.append(event)
1899 line.append(db_ro_task_delete.get("_id", ""))
1900 line.append("")
1901 line.append(db_ro_task_delete.get("modified_at", ""))
1902 line.extend([""] * 13)
1903 self.logger.debug(";".join(line))
1904
1905 else:
1906 line.clear()
1907 line.append(mark)
1908 line.append(event)
1909 line.extend([""] * 16)
1910 self.logger.debug(";".join(line))
1911
1912 except Exception as e:
1913 self.logger.error("Error logging ro_task: {}".format(e))
1914
1915 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1916 """
1917 Determine if this task need to be done or superseded
1918 :return: None
1919 """
1920 my_task = ro_task["tasks"][task_index]
1921 task_id = my_task["task_id"]
1922 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1923 "created_items", False
1924 )
1925
1926 self.logger.warning("Needed delete: {}".format(needed_delete))
1927 if my_task["status"] == "FAILED":
1928 return None, None # TODO need to be retry??
1929
1930 try:
1931 for index, task in enumerate(ro_task["tasks"]):
1932 if index == task_index or not task:
1933 continue # own task
1934
1935 if (
1936 my_task["target_record"] == task["target_record"]
1937 and task["action"] == "CREATE"
1938 ):
1939 # set to finished
1940 db_update["tasks.{}.status".format(index)] = task[
1941 "status"
1942 ] = "FINISHED"
1943 elif task["action"] == "CREATE" and task["status"] not in (
1944 "FINISHED",
1945 "SUPERSEDED",
1946 ):
1947 needed_delete = False
1948
1949 if needed_delete:
1950 self.logger.warning(
1951 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
1952 )
1953 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1954 else:
1955 return "SUPERSEDED", None
1956 except Exception as e:
1957 if not isinstance(e, NsWorkerException):
1958 self.logger.critical(
1959 "Unexpected exception at _delete_task task={}: {}".format(
1960 task_id, e
1961 ),
1962 exc_info=True,
1963 )
1964
1965 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
1966
1967 def _create_task(self, ro_task, task_index, task_depends, db_update):
1968 """
1969 Determine if this task need to create something at VIM
1970 :return: None
1971 """
1972 my_task = ro_task["tasks"][task_index]
1973 task_id = my_task["task_id"]
1974 task_status = None
1975
1976 if my_task["status"] == "FAILED":
1977 return None, None # TODO need to be retry??
1978 elif my_task["status"] == "SCHEDULED":
1979 # check if already created by another task
1980 for index, task in enumerate(ro_task["tasks"]):
1981 if index == task_index or not task:
1982 continue # own task
1983
1984 if task["action"] == "CREATE" and task["status"] not in (
1985 "SCHEDULED",
1986 "FINISHED",
1987 "SUPERSEDED",
1988 ):
1989 return task["status"], "COPY_VIM_INFO"
1990
1991 try:
1992 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1993 ro_task, task_index, task_depends
1994 )
1995 # TODO update other CREATE tasks
1996 except Exception as e:
1997 if not isinstance(e, NsWorkerException):
1998 self.logger.error(
1999 "Error executing task={}: {}".format(task_id, e), exc_info=True
2000 )
2001
2002 task_status = "FAILED"
2003 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2004 # TODO update ro_vim_item_update
2005
2006 return task_status, ro_vim_item_update
2007 else:
2008 return None, None
2009
2010 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2011 """
2012 Look for dependency task
2013 :param task_id: Can be one of
2014 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2015 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2016 3. task.task_id: "<action_id>:number"
2017 :param ro_task:
2018 :param target_id:
2019 :return: database ro_task plus index of task
2020 """
2021 if (
2022 task_id.startswith("vim:")
2023 or task_id.startswith("sdn:")
2024 or task_id.startswith("wim:")
2025 ):
2026 target_id, _, task_id = task_id.partition(" ")
2027
2028 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2029 ro_task_dependency = self.db.get_one(
2030 "ro_tasks",
2031 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2032 fail_on_empty=False,
2033 )
2034
2035 if ro_task_dependency:
2036 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2037 if task["target_record_id"] == task_id:
2038 return ro_task_dependency, task_index
2039
2040 else:
2041 if ro_task:
2042 for task_index, task in enumerate(ro_task["tasks"]):
2043 if task and task["task_id"] == task_id:
2044 return ro_task, task_index
2045
2046 ro_task_dependency = self.db.get_one(
2047 "ro_tasks",
2048 q_filter={
2049 "tasks.ANYINDEX.task_id": task_id,
2050 "tasks.ANYINDEX.target_record.ne": None,
2051 },
2052 fail_on_empty=False,
2053 )
2054
2055 self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
2056 if ro_task_dependency:
2057 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2058 if task["task_id"] == task_id:
2059 return ro_task_dependency, task_index
2060 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2061
2062 def _process_pending_tasks(self, ro_task):
2063 ro_task_id = ro_task["_id"]
2064 now = time.time()
2065 # one day
2066 next_check_at = now + (24 * 60 * 60)
2067 db_ro_task_update = {}
2068
2069 def _update_refresh(new_status):
2070 # compute next_refresh
2071 nonlocal task
2072 nonlocal next_check_at
2073 nonlocal db_ro_task_update
2074 nonlocal ro_task
2075
2076 next_refresh = time.time()
2077
2078 if task["item"] in ("image", "flavor"):
2079 next_refresh += self.REFRESH_IMAGE
2080 elif new_status == "BUILD":
2081 next_refresh += self.REFRESH_BUILD
2082 elif new_status == "DONE":
2083 next_refresh += self.REFRESH_ACTIVE
2084 else:
2085 next_refresh += self.REFRESH_ERROR
2086
2087 next_check_at = min(next_check_at, next_refresh)
2088 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2089 ro_task["vim_info"]["refresh_at"] = next_refresh
2090
2091 try:
2092 """
2093 # Log RO tasks only when loglevel is DEBUG
2094 if self.logger.getEffectiveLevel() == logging.DEBUG:
2095 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2096 """
2097 # 0: get task_status_create
2098 lock_object = None
2099 task_status_create = None
2100 task_create = next(
2101 (
2102 t
2103 for t in ro_task["tasks"]
2104 if t
2105 and t["action"] == "CREATE"
2106 and t["status"] in ("BUILD", "DONE")
2107 ),
2108 None,
2109 )
2110
2111 if task_create:
2112 task_status_create = task_create["status"]
2113
2114 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2115 for task_action in ("DELETE", "CREATE", "EXEC"):
2116 db_vim_update = None
2117 new_status = None
2118
2119 for task_index, task in enumerate(ro_task["tasks"]):
2120 if not task:
2121 continue # task deleted
2122
2123 task_depends = {}
2124 target_update = None
2125
2126 if (
2127 (
2128 task_action in ("DELETE", "EXEC")
2129 and task["status"] not in ("SCHEDULED", "BUILD")
2130 )
2131 or task["action"] != task_action
2132 or (
2133 task_action == "CREATE"
2134 and task["status"] in ("FINISHED", "SUPERSEDED")
2135 )
2136 ):
2137 continue
2138
2139 task_path = "tasks.{}.status".format(task_index)
2140 try:
2141 db_vim_info_update = None
2142
2143 if task["status"] == "SCHEDULED":
2144 # check if tasks that this depends on have been completed
2145 dependency_not_completed = False
2146
2147 for dependency_task_id in task.get("depends_on") or ():
2148 (
2149 dependency_ro_task,
2150 dependency_task_index,
2151 ) = self._get_dependency(
2152 dependency_task_id, target_id=ro_task["target_id"]
2153 )
2154 dependency_task = dependency_ro_task["tasks"][
2155 dependency_task_index
2156 ]
2157 self.logger.warning(
2158 "dependency_ro_task={} dependency_task_index={}".format(
2159 dependency_ro_task, dependency_task_index
2160 )
2161 )
2162
2163 if dependency_task["status"] == "SCHEDULED":
2164 dependency_not_completed = True
2165 next_check_at = min(
2166 next_check_at, dependency_ro_task["to_check_at"]
2167 )
2168 # must allow dependent task to be processed first
2169 # to do this set time after last_task_processed
2170 next_check_at = max(
2171 self.time_last_task_processed, next_check_at
2172 )
2173 break
2174 elif dependency_task["status"] == "FAILED":
2175 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2176 task["action"],
2177 task["item"],
2178 dependency_task["action"],
2179 dependency_task["item"],
2180 dependency_task_id,
2181 dependency_ro_task["vim_info"].get(
2182 "vim_message"
2183 ),
2184 )
2185 self.logger.error(
2186 "task={} {}".format(task["task_id"], error_text)
2187 )
2188 raise NsWorkerException(error_text)
2189
2190 task_depends[dependency_task_id] = dependency_ro_task[
2191 "vim_info"
2192 ]["vim_id"]
2193 task_depends[
2194 "TASK-{}".format(dependency_task_id)
2195 ] = dependency_ro_task["vim_info"]["vim_id"]
2196
2197 if dependency_not_completed:
2198 self.logger.warning(
2199 "DEPENDENCY NOT COMPLETED {}".format(
2200 dependency_ro_task["vim_info"]["vim_id"]
2201 )
2202 )
2203 # TODO set at vim_info.vim_details that it is waiting
2204 continue
2205
2206 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2207 # the task of renew this locking. It will update database locket_at periodically
2208 if not lock_object:
2209 lock_object = LockRenew.add_lock_object(
2210 "ro_tasks", ro_task, self
2211 )
2212
2213 if task["action"] == "DELETE":
2214 (new_status, db_vim_info_update,) = self._delete_task(
2215 ro_task, task_index, task_depends, db_ro_task_update
2216 )
2217 new_status = (
2218 "FINISHED" if new_status == "DONE" else new_status
2219 )
2220 # ^with FINISHED instead of DONE it will not be refreshing
2221
2222 if new_status in ("FINISHED", "SUPERSEDED"):
2223 target_update = "DELETE"
2224 elif task["action"] == "EXEC":
2225 (
2226 new_status,
2227 db_vim_info_update,
2228 db_task_update,
2229 ) = self.item2class[task["item"]].exec(
2230 ro_task, task_index, task_depends
2231 )
2232 new_status = (
2233 "FINISHED" if new_status == "DONE" else new_status
2234 )
2235 # ^with FINISHED instead of DONE it will not be refreshing
2236
2237 if db_task_update:
2238 # load into database the modified db_task_update "retries" and "next_retry"
2239 if db_task_update.get("retries"):
2240 db_ro_task_update[
2241 "tasks.{}.retries".format(task_index)
2242 ] = db_task_update["retries"]
2243
2244 next_check_at = time.time() + db_task_update.get(
2245 "next_retry", 60
2246 )
2247 target_update = None
2248 elif task["action"] == "CREATE":
2249 if task["status"] == "SCHEDULED":
2250 if task_status_create:
2251 new_status = task_status_create
2252 target_update = "COPY_VIM_INFO"
2253 else:
2254 new_status, db_vim_info_update = self.item2class[
2255 task["item"]
2256 ].new(ro_task, task_index, task_depends)
2257 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2258 _update_refresh(new_status)
2259 else:
2260 if (
2261 ro_task["vim_info"]["refresh_at"]
2262 and now > ro_task["vim_info"]["refresh_at"]
2263 ):
2264 new_status, db_vim_info_update = self.item2class[
2265 task["item"]
2266 ].refresh(ro_task)
2267 _update_refresh(new_status)
2268 else:
2269 # The refresh is updated to avoid set the value of "refresh_at" to
2270 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2271 # because it can happen that in this case the task is never processed
2272 _update_refresh(task["status"])
2273
2274 except Exception as e:
2275 new_status = "FAILED"
2276 db_vim_info_update = {
2277 "vim_status": "VIM_ERROR",
2278 "vim_message": str(e),
2279 }
2280
2281 if not isinstance(
2282 e, (NsWorkerException, vimconn.VimConnException)
2283 ):
2284 self.logger.error(
2285 "Unexpected exception at _delete_task task={}: {}".format(
2286 task["task_id"], e
2287 ),
2288 exc_info=True,
2289 )
2290
2291 try:
2292 if db_vim_info_update:
2293 db_vim_update = db_vim_info_update.copy()
2294 db_ro_task_update.update(
2295 {
2296 "vim_info." + k: v
2297 for k, v in db_vim_info_update.items()
2298 }
2299 )
2300 ro_task["vim_info"].update(db_vim_info_update)
2301
2302 if new_status:
2303 if task_action == "CREATE":
2304 task_status_create = new_status
2305 db_ro_task_update[task_path] = new_status
2306
2307 if target_update or db_vim_update:
2308 if target_update == "DELETE":
2309 self._update_target(task, None)
2310 elif target_update == "COPY_VIM_INFO":
2311 self._update_target(task, ro_task["vim_info"])
2312 else:
2313 self._update_target(task, db_vim_update)
2314
2315 except Exception as e:
2316 if (
2317 isinstance(e, DbException)
2318 and e.http_code == HTTPStatus.NOT_FOUND
2319 ):
2320 # if the vnfrs or nsrs has been removed from database, this task must be removed
2321 self.logger.debug(
2322 "marking to delete task={}".format(task["task_id"])
2323 )
2324 self.tasks_to_delete.append(task)
2325 else:
2326 self.logger.error(
2327 "Unexpected exception at _update_target task={}: {}".format(
2328 task["task_id"], e
2329 ),
2330 exc_info=True,
2331 )
2332
2333 locked_at = ro_task["locked_at"]
2334
2335 if lock_object:
2336 locked_at = [
2337 lock_object["locked_at"],
2338 lock_object["locked_at"] + self.task_locked_time,
2339 ]
2340 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2341 # contain exactly locked_at + self.task_locked_time
2342 LockRenew.remove_lock_object(lock_object)
2343
2344 q_filter = {
2345 "_id": ro_task["_id"],
2346 "to_check_at": ro_task["to_check_at"],
2347 "locked_at": locked_at,
2348 }
2349 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2350 # outside this task (by ro_nbi) do not update it
2351 db_ro_task_update["locked_by"] = None
2352 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2353 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2354 db_ro_task_update["modified_at"] = now
2355 db_ro_task_update["to_check_at"] = next_check_at
2356
2357 """
2358 # Log RO tasks only when loglevel is DEBUG
2359 if self.logger.getEffectiveLevel() == logging.DEBUG:
2360 db_ro_task_update_log = db_ro_task_update.copy()
2361 db_ro_task_update_log["_id"] = q_filter["_id"]
2362 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2363 """
2364
2365 if not self.db.set_one(
2366 "ro_tasks",
2367 update_dict=db_ro_task_update,
2368 q_filter=q_filter,
2369 fail_on_empty=False,
2370 ):
2371 del db_ro_task_update["to_check_at"]
2372 del q_filter["to_check_at"]
2373 """
2374 # Log RO tasks only when loglevel is DEBUG
2375 if self.logger.getEffectiveLevel() == logging.DEBUG:
2376 self._log_ro_task(
2377 None,
2378 db_ro_task_update_log,
2379 None,
2380 "TASK_WF",
2381 "SET_TASK " + str(q_filter),
2382 )
2383 """
2384 self.db.set_one(
2385 "ro_tasks",
2386 q_filter=q_filter,
2387 update_dict=db_ro_task_update,
2388 fail_on_empty=True,
2389 )
2390 except DbException as e:
2391 self.logger.error(
2392 "ro_task={} Error updating database {}".format(ro_task_id, e)
2393 )
2394 except Exception as e:
2395 self.logger.error(
2396 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2397 )
2398
2399 def _update_target(self, task, ro_vim_item_update):
2400 table, _, temp = task["target_record"].partition(":")
2401 _id, _, path_vim_status = temp.partition(":")
2402 path_item = path_vim_status[: path_vim_status.rfind(".")]
2403 path_item = path_item[: path_item.rfind(".")]
2404 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2405 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2406
2407 if ro_vim_item_update:
2408 update_dict = {
2409 path_vim_status + "." + k: v
2410 for k, v in ro_vim_item_update.items()
2411 if k
2412 in (
2413 "vim_id",
2414 "vim_details",
2415 "vim_message",
2416 "vim_name",
2417 "vim_status",
2418 "interfaces",
2419 "interfaces_backup",
2420 )
2421 }
2422
2423 if path_vim_status.startswith("vdur."):
2424 # for backward compatibility, add vdur.name apart from vdur.vim_name
2425 if ro_vim_item_update.get("vim_name"):
2426 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2427
2428 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2429 if ro_vim_item_update.get("vim_id"):
2430 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2431
2432 # update general status
2433 if ro_vim_item_update.get("vim_status"):
2434 update_dict[path_item + ".status"] = ro_vim_item_update[
2435 "vim_status"
2436 ]
2437
2438 if ro_vim_item_update.get("interfaces"):
2439 path_interfaces = path_item + ".interfaces"
2440
2441 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2442 if iface:
2443 update_dict.update(
2444 {
2445 path_interfaces + ".{}.".format(i) + k: v
2446 for k, v in iface.items()
2447 if k in ("vlan", "compute_node", "pci")
2448 }
2449 )
2450
2451 # put ip_address and mac_address with ip-address and mac-address
2452 if iface.get("ip_address"):
2453 update_dict[
2454 path_interfaces + ".{}.".format(i) + "ip-address"
2455 ] = iface["ip_address"]
2456
2457 if iface.get("mac_address"):
2458 update_dict[
2459 path_interfaces + ".{}.".format(i) + "mac-address"
2460 ] = iface["mac_address"]
2461
2462 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2463 update_dict["ip-address"] = iface.get("ip_address").split(
2464 ";"
2465 )[0]
2466
2467 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2468 update_dict[path_item + ".ip-address"] = iface.get(
2469 "ip_address"
2470 ).split(";")[0]
2471
2472 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2473
2474 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2475 if ro_vim_item_update.get("interfaces"):
2476 search_key = path_vim_status + ".interfaces"
2477 if update_dict.get(search_key):
2478 interfaces_backup_update = {
2479 path_vim_status + ".interfaces_backup": update_dict[search_key]
2480 }
2481
2482 self.db.set_one(
2483 table,
2484 q_filter={"_id": _id},
2485 update_dict=interfaces_backup_update,
2486 )
2487
2488 else:
2489 update_dict = {path_item + ".status": "DELETED"}
2490 self.db.set_one(
2491 table,
2492 q_filter={"_id": _id},
2493 update_dict=update_dict,
2494 unset={path_vim_status: None},
2495 )
2496
2497 def _process_delete_db_tasks(self):
2498 """
2499 Delete task from database because vnfrs or nsrs or both have been deleted
2500 :return: None. Uses and modify self.tasks_to_delete
2501 """
2502 while self.tasks_to_delete:
2503 task = self.tasks_to_delete[0]
2504 vnfrs_deleted = None
2505 nsr_id = task["nsr_id"]
2506
2507 if task["target_record"].startswith("vnfrs:"):
2508 # check if nsrs is present
2509 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2510 vnfrs_deleted = task["target_record"].split(":")[1]
2511
2512 try:
2513 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2514 except Exception as e:
2515 self.logger.error(
2516 "Error deleting task={}: {}".format(task["task_id"], e)
2517 )
2518 self.tasks_to_delete.pop(0)
2519
2520 @staticmethod
2521 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2522 """
2523 Static method because it is called from osm_ng_ro.ns
2524 :param db: instance of database to use
2525 :param nsr_id: affected nsrs id
2526 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2527 :return: None, exception is fails
2528 """
2529 retries = 5
2530 for retry in range(retries):
2531 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2532 now = time.time()
2533 conflict = False
2534
2535 for ro_task in ro_tasks:
2536 db_update = {}
2537 to_delete_ro_task = True
2538
2539 for index, task in enumerate(ro_task["tasks"]):
2540 if not task:
2541 pass
2542 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2543 vnfrs_deleted
2544 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2545 ):
2546 db_update["tasks.{}".format(index)] = None
2547 else:
2548 # used by other nsr, ro_task cannot be deleted
2549 to_delete_ro_task = False
2550
2551 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2552 if to_delete_ro_task:
2553 if not db.del_one(
2554 "ro_tasks",
2555 q_filter={
2556 "_id": ro_task["_id"],
2557 "modified_at": ro_task["modified_at"],
2558 },
2559 fail_on_empty=False,
2560 ):
2561 conflict = True
2562 elif db_update:
2563 db_update["modified_at"] = now
2564 if not db.set_one(
2565 "ro_tasks",
2566 q_filter={
2567 "_id": ro_task["_id"],
2568 "modified_at": ro_task["modified_at"],
2569 },
2570 update_dict=db_update,
2571 fail_on_empty=False,
2572 ):
2573 conflict = True
2574 if not conflict:
2575 return
2576 else:
2577 raise NsWorkerException("Exceeded {} retries".format(retries))
2578
2579 def run(self):
2580 # load database
2581 self.logger.info("Starting")
2582 while True:
2583 # step 1: get commands from queue
2584 try:
2585 if self.vim_targets:
2586 task = self.task_queue.get(block=False)
2587 else:
2588 if not self.idle:
2589 self.logger.debug("enters in idle state")
2590 self.idle = True
2591 task = self.task_queue.get(block=True)
2592 self.idle = False
2593
2594 if task[0] == "terminate":
2595 break
2596 elif task[0] == "load_vim":
2597 self.logger.info("order to load vim {}".format(task[1]))
2598 self._load_vim(task[1])
2599 elif task[0] == "unload_vim":
2600 self.logger.info("order to unload vim {}".format(task[1]))
2601 self._unload_vim(task[1])
2602 elif task[0] == "reload_vim":
2603 self._reload_vim(task[1])
2604 elif task[0] == "check_vim":
2605 self.logger.info("order to check vim {}".format(task[1]))
2606 self._check_vim(task[1])
2607 continue
2608 except Exception as e:
2609 if isinstance(e, queue.Empty):
2610 pass
2611 else:
2612 self.logger.critical(
2613 "Error processing task: {}".format(e), exc_info=True
2614 )
2615
2616 # step 2: process pending_tasks, delete not needed tasks
2617 try:
2618 if self.tasks_to_delete:
2619 self._process_delete_db_tasks()
2620 busy = False
2621 """
2622 # Log RO tasks only when loglevel is DEBUG
2623 if self.logger.getEffectiveLevel() == logging.DEBUG:
2624 _ = self._get_db_all_tasks()
2625 """
2626 ro_task = self._get_db_task()
2627 if ro_task:
2628 self.logger.warning("Task to process: {}".format(ro_task))
2629 time.sleep(1)
2630 self._process_pending_tasks(ro_task)
2631 busy = True
2632 if not busy:
2633 time.sleep(5)
2634 except Exception as e:
2635 self.logger.critical(
2636 "Unexpected exception at run: " + str(e), exc_info=True
2637 )
2638
2639 self.logger.info("Finishing")