9ed04d7bd6e894e01a6bd7710065a2e263d5edf2
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from unittest.mock import Mock
37
38 from importlib_metadata import entry_points
39 from osm_common.dbbase import DbException
40 from osm_ng_ro.vim_admin import LockRenew
41 from osm_ro_plugin import sdnconn, vimconn
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin.vim_dummy import VimDummyConnector
44 import yaml
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 "vim_message": None,
215 }
216 self.logger.debug(
217 "task={} {} new-net={} created={}".format(
218 task_id, ro_task["target_id"], vim_net_id, created
219 )
220 )
221
222 return "BUILD", ro_vim_item_update
223 except (vimconn.VimConnException, NsWorkerException) as e:
224 self.logger.error(
225 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
226 )
227 ro_vim_item_update = {
228 "vim_status": "VIM_ERROR",
229 "created": created,
230 "vim_message": str(e),
231 }
232
233 return "FAILED", ro_vim_item_update
234
235 def refresh(self, ro_task):
236 """Call VIM to get network status"""
237 ro_task_id = ro_task["_id"]
238 target_vim = self.my_vims[ro_task["target_id"]]
239 vim_id = ro_task["vim_info"]["vim_id"]
240 net_to_refresh_list = [vim_id]
241
242 try:
243 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
244 vim_info = vim_dict[vim_id]
245
246 if vim_info["status"] == "ACTIVE":
247 task_status = "DONE"
248 elif vim_info["status"] == "BUILD":
249 task_status = "BUILD"
250 else:
251 task_status = "FAILED"
252 except vimconn.VimConnException as e:
253 # Mark all tasks at VIM_ERROR status
254 self.logger.error(
255 "ro_task={} vim={} get-net={}: {}".format(
256 ro_task_id, ro_task["target_id"], vim_id, e
257 )
258 )
259 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
260 task_status = "FAILED"
261
262 ro_vim_item_update = {}
263 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
264 ro_vim_item_update["vim_status"] = vim_info["status"]
265
266 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
267 ro_vim_item_update["vim_name"] = vim_info.get("name")
268
269 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
270 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
271 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
272 elif vim_info["status"] == "DELETED":
273 ro_vim_item_update["vim_id"] = None
274 ro_vim_item_update["vim_message"] = "Deleted externally"
275 else:
276 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
277 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
278
279 if ro_vim_item_update:
280 self.logger.debug(
281 "ro_task={} {} get-net={}: status={} {}".format(
282 ro_task_id,
283 ro_task["target_id"],
284 vim_id,
285 ro_vim_item_update.get("vim_status"),
286 ro_vim_item_update.get("vim_message")
287 if ro_vim_item_update.get("vim_status") != "ACTIVE"
288 else "",
289 )
290 )
291
292 return task_status, ro_vim_item_update
293
294 def delete(self, ro_task, task_index):
295 task = ro_task["tasks"][task_index]
296 task_id = task["task_id"]
297 net_vim_id = ro_task["vim_info"]["vim_id"]
298 ro_vim_item_update_ok = {
299 "vim_status": "DELETED",
300 "created": False,
301 "vim_message": "DELETED",
302 "vim_id": None,
303 }
304
305 try:
306 if net_vim_id or ro_task["vim_info"]["created_items"]:
307 target_vim = self.my_vims[ro_task["target_id"]]
308 target_vim.delete_network(
309 net_vim_id, ro_task["vim_info"]["created_items"]
310 )
311 except vimconn.VimConnNotFoundException:
312 ro_vim_item_update_ok["vim_message"] = "already deleted"
313 except vimconn.VimConnException as e:
314 self.logger.error(
315 "ro_task={} vim={} del-net={}: {}".format(
316 ro_task["_id"], ro_task["target_id"], net_vim_id, e
317 )
318 )
319 ro_vim_item_update = {
320 "vim_status": "VIM_ERROR",
321 "vim_message": "Error while deleting: {}".format(e),
322 }
323
324 return "FAILED", ro_vim_item_update
325
326 self.logger.debug(
327 "task={} {} del-net={} {}".format(
328 task_id,
329 ro_task["target_id"],
330 net_vim_id,
331 ro_vim_item_update_ok.get("vim_message", ""),
332 )
333 )
334
335 return "DONE", ro_vim_item_update_ok
336
337
338 class VimInteractionVdu(VimInteractionBase):
339 max_retries_inject_ssh_key = 20 # 20 times
340 time_retries_inject_ssh_key = 30 # wevery 30 seconds
341
342 def new(self, ro_task, task_index, task_depends):
343 task = ro_task["tasks"][task_index]
344 task_id = task["task_id"]
345 created = False
346 created_items = {}
347 target_vim = self.my_vims[ro_task["target_id"]]
348
349 try:
350 created = True
351 params = task["params"]
352 params_copy = deepcopy(params)
353 net_list = params_copy["net_list"]
354
355 for net in net_list:
356 # change task_id into network_id
357 if "net_id" in net and net["net_id"].startswith("TASK-"):
358 network_id = task_depends[net["net_id"]]
359
360 if not network_id:
361 raise NsWorkerException(
362 "Cannot create VM because depends on a network not created or found "
363 "for {}".format(net["net_id"])
364 )
365
366 net["net_id"] = network_id
367
368 if params_copy["image_id"].startswith("TASK-"):
369 params_copy["image_id"] = task_depends[params_copy["image_id"]]
370
371 if params_copy["flavor_id"].startswith("TASK-"):
372 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
373
374 affinity_group_list = params_copy["affinity_group_list"]
375 for affinity_group in affinity_group_list:
376 # change task_id into affinity_group_id
377 if "affinity_group_id" in affinity_group and affinity_group[
378 "affinity_group_id"
379 ].startswith("TASK-"):
380 affinity_group_id = task_depends[
381 affinity_group["affinity_group_id"]
382 ]
383
384 if not affinity_group_id:
385 raise NsWorkerException(
386 "found for {}".format(affinity_group["affinity_group_id"])
387 )
388
389 affinity_group["affinity_group_id"] = affinity_group_id
390
391 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
392 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
393
394 # add to created items previous_created_volumes (healing)
395 if task.get("previous_created_volumes"):
396 for k, v in task["previous_created_volumes"].items():
397 created_items[k] = v
398
399 ro_vim_item_update = {
400 "vim_id": vim_vm_id,
401 "vim_status": "BUILD",
402 "created": created,
403 "created_items": created_items,
404 "vim_details": None,
405 "vim_message": None,
406 "interfaces_vim_ids": interfaces,
407 "interfaces": [],
408 "interfaces_backup": [],
409 }
410 self.logger.debug(
411 "task={} {} new-vm={} created={}".format(
412 task_id, ro_task["target_id"], vim_vm_id, created
413 )
414 )
415
416 return "BUILD", ro_vim_item_update
417 except (vimconn.VimConnException, NsWorkerException) as e:
418 self.logger.error(
419 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
420 )
421 ro_vim_item_update = {
422 "vim_status": "VIM_ERROR",
423 "created": created,
424 "vim_message": str(e),
425 }
426
427 return "FAILED", ro_vim_item_update
428
429 def delete(self, ro_task, task_index):
430 task = ro_task["tasks"][task_index]
431 task_id = task["task_id"]
432 vm_vim_id = ro_task["vim_info"]["vim_id"]
433 ro_vim_item_update_ok = {
434 "vim_status": "DELETED",
435 "created": False,
436 "vim_message": "DELETED",
437 "vim_id": None,
438 }
439
440 try:
441 self.logger.debug(
442 "delete_vminstance: vm_vim_id={} created_items={}".format(
443 vm_vim_id, ro_task["vim_info"]["created_items"]
444 )
445 )
446 if vm_vim_id or ro_task["vim_info"]["created_items"]:
447 target_vim = self.my_vims[ro_task["target_id"]]
448 target_vim.delete_vminstance(
449 vm_vim_id,
450 ro_task["vim_info"]["created_items"],
451 ro_task["vim_info"].get("volumes_to_hold", []),
452 )
453 except vimconn.VimConnNotFoundException:
454 ro_vim_item_update_ok["vim_message"] = "already deleted"
455 except vimconn.VimConnException as e:
456 self.logger.error(
457 "ro_task={} vim={} del-vm={}: {}".format(
458 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
459 )
460 )
461 ro_vim_item_update = {
462 "vim_status": "VIM_ERROR",
463 "vim_message": "Error while deleting: {}".format(e),
464 }
465
466 return "FAILED", ro_vim_item_update
467
468 self.logger.debug(
469 "task={} {} del-vm={} {}".format(
470 task_id,
471 ro_task["target_id"],
472 vm_vim_id,
473 ro_vim_item_update_ok.get("vim_message", ""),
474 )
475 )
476
477 return "DONE", ro_vim_item_update_ok
478
479 def refresh(self, ro_task):
480 """Call VIM to get vm status"""
481 ro_task_id = ro_task["_id"]
482 target_vim = self.my_vims[ro_task["target_id"]]
483 vim_id = ro_task["vim_info"]["vim_id"]
484
485 if not vim_id:
486 return None, None
487
488 vm_to_refresh_list = [vim_id]
489 try:
490 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
491 vim_info = vim_dict[vim_id]
492
493 if vim_info["status"] == "ACTIVE":
494 task_status = "DONE"
495 elif vim_info["status"] == "BUILD":
496 task_status = "BUILD"
497 else:
498 task_status = "FAILED"
499
500 # try to load and parse vim_information
501 try:
502 vim_info_info = yaml.safe_load(vim_info["vim_info"])
503 if vim_info_info.get("name"):
504 vim_info["name"] = vim_info_info["name"]
505 except Exception:
506 pass
507 except vimconn.VimConnException as e:
508 # Mark all tasks at VIM_ERROR status
509 self.logger.error(
510 "ro_task={} vim={} get-vm={}: {}".format(
511 ro_task_id, ro_task["target_id"], vim_id, e
512 )
513 )
514 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
515 task_status = "FAILED"
516
517 ro_vim_item_update = {}
518
519 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
520 vim_interfaces = []
521 if vim_info.get("interfaces"):
522 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
523 iface = next(
524 (
525 iface
526 for iface in vim_info["interfaces"]
527 if vim_iface_id == iface["vim_interface_id"]
528 ),
529 None,
530 )
531 # if iface:
532 # iface.pop("vim_info", None)
533 vim_interfaces.append(iface)
534
535 task_create = next(
536 t
537 for t in ro_task["tasks"]
538 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
539 )
540 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
541 vim_interfaces[task_create["mgmt_vnf_interface"]][
542 "mgmt_vnf_interface"
543 ] = True
544
545 mgmt_vdu_iface = task_create.get(
546 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
547 )
548 if vim_interfaces:
549 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
550
551 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
552 ro_vim_item_update["interfaces"] = vim_interfaces
553
554 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
555 ro_vim_item_update["vim_status"] = vim_info["status"]
556
557 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
558 ro_vim_item_update["vim_name"] = vim_info.get("name")
559
560 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
561 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
562 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
563 elif vim_info["status"] == "DELETED":
564 ro_vim_item_update["vim_id"] = None
565 ro_vim_item_update["vim_message"] = "Deleted externally"
566 else:
567 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
568 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
569
570 if ro_vim_item_update:
571 self.logger.debug(
572 "ro_task={} {} get-vm={}: status={} {}".format(
573 ro_task_id,
574 ro_task["target_id"],
575 vim_id,
576 ro_vim_item_update.get("vim_status"),
577 ro_vim_item_update.get("vim_message")
578 if ro_vim_item_update.get("vim_status") != "ACTIVE"
579 else "",
580 )
581 )
582
583 return task_status, ro_vim_item_update
584
585 def exec(self, ro_task, task_index, task_depends):
586 task = ro_task["tasks"][task_index]
587 task_id = task["task_id"]
588 target_vim = self.my_vims[ro_task["target_id"]]
589 db_task_update = {"retries": 0}
590 retries = task.get("retries", 0)
591
592 try:
593 params = task["params"]
594 params_copy = deepcopy(params)
595 params_copy["ro_key"] = self.db.decrypt(
596 params_copy.pop("private_key"),
597 params_copy.pop("schema_version"),
598 params_copy.pop("salt"),
599 )
600 params_copy["ip_addr"] = params_copy.pop("ip_address")
601 target_vim.inject_user_key(**params_copy)
602 self.logger.debug(
603 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
604 )
605
606 return (
607 "DONE",
608 None,
609 db_task_update,
610 ) # params_copy["key"]
611 except (vimconn.VimConnException, NsWorkerException) as e:
612 retries += 1
613
614 self.logger.debug(traceback.format_exc())
615 if retries < self.max_retries_inject_ssh_key:
616 return (
617 "BUILD",
618 None,
619 {
620 "retries": retries,
621 "next_retry": self.time_retries_inject_ssh_key,
622 },
623 )
624
625 self.logger.error(
626 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
627 )
628 ro_vim_item_update = {"vim_message": str(e)}
629
630 return "FAILED", ro_vim_item_update, db_task_update
631
632
633 class VimInteractionImage(VimInteractionBase):
634 def new(self, ro_task, task_index, task_depends):
635 task = ro_task["tasks"][task_index]
636 task_id = task["task_id"]
637 created = False
638 created_items = {}
639 target_vim = self.my_vims[ro_task["target_id"]]
640
641 try:
642 # FIND
643 if task.get("find_params"):
644 vim_images = target_vim.get_image_list(**task["find_params"])
645
646 if not vim_images:
647 raise NsWorkerExceptionNotFound(
648 "Image not found with this criteria: '{}'".format(
649 task["find_params"]
650 )
651 )
652 elif len(vim_images) > 1:
653 raise NsWorkerException(
654 "More than one image found with this criteria: '{}'".format(
655 task["find_params"]
656 )
657 )
658 else:
659 vim_image_id = vim_images[0]["id"]
660
661 ro_vim_item_update = {
662 "vim_id": vim_image_id,
663 "vim_status": "DONE",
664 "created": created,
665 "created_items": created_items,
666 "vim_details": None,
667 "vim_message": None,
668 }
669 self.logger.debug(
670 "task={} {} new-image={} created={}".format(
671 task_id, ro_task["target_id"], vim_image_id, created
672 )
673 )
674
675 return "DONE", ro_vim_item_update
676 except (NsWorkerException, vimconn.VimConnException) as e:
677 self.logger.error(
678 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
679 )
680 ro_vim_item_update = {
681 "vim_status": "VIM_ERROR",
682 "created": created,
683 "vim_message": str(e),
684 }
685
686 return "FAILED", ro_vim_item_update
687
688
689 class VimInteractionFlavor(VimInteractionBase):
690 def delete(self, ro_task, task_index):
691 task = ro_task["tasks"][task_index]
692 task_id = task["task_id"]
693 flavor_vim_id = ro_task["vim_info"]["vim_id"]
694 ro_vim_item_update_ok = {
695 "vim_status": "DELETED",
696 "created": False,
697 "vim_message": "DELETED",
698 "vim_id": None,
699 }
700
701 try:
702 if flavor_vim_id:
703 target_vim = self.my_vims[ro_task["target_id"]]
704 target_vim.delete_flavor(flavor_vim_id)
705 except vimconn.VimConnNotFoundException:
706 ro_vim_item_update_ok["vim_message"] = "already deleted"
707 except vimconn.VimConnException as e:
708 self.logger.error(
709 "ro_task={} vim={} del-flavor={}: {}".format(
710 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
711 )
712 )
713 ro_vim_item_update = {
714 "vim_status": "VIM_ERROR",
715 "vim_message": "Error while deleting: {}".format(e),
716 }
717
718 return "FAILED", ro_vim_item_update
719
720 self.logger.debug(
721 "task={} {} del-flavor={} {}".format(
722 task_id,
723 ro_task["target_id"],
724 flavor_vim_id,
725 ro_vim_item_update_ok.get("vim_message", ""),
726 )
727 )
728
729 return "DONE", ro_vim_item_update_ok
730
731 def new(self, ro_task, task_index, task_depends):
732 task = ro_task["tasks"][task_index]
733 task_id = task["task_id"]
734 created = False
735 created_items = {}
736 target_vim = self.my_vims[ro_task["target_id"]]
737
738 try:
739 # FIND
740 vim_flavor_id = None
741
742 if task.get("find_params"):
743 try:
744 flavor_data = task["find_params"]["flavor_data"]
745 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
746 except vimconn.VimConnNotFoundException:
747 pass
748
749 if not vim_flavor_id and task.get("params"):
750 # CREATE
751 flavor_data = task["params"]["flavor_data"]
752 vim_flavor_id = target_vim.new_flavor(flavor_data)
753 created = True
754
755 ro_vim_item_update = {
756 "vim_id": vim_flavor_id,
757 "vim_status": "DONE",
758 "created": created,
759 "created_items": created_items,
760 "vim_details": None,
761 "vim_message": None,
762 }
763 self.logger.debug(
764 "task={} {} new-flavor={} created={}".format(
765 task_id, ro_task["target_id"], vim_flavor_id, created
766 )
767 )
768
769 return "DONE", ro_vim_item_update
770 except (vimconn.VimConnException, NsWorkerException) as e:
771 self.logger.error(
772 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
773 )
774 ro_vim_item_update = {
775 "vim_status": "VIM_ERROR",
776 "created": created,
777 "vim_message": str(e),
778 }
779
780 return "FAILED", ro_vim_item_update
781
782
783 class VimInteractionAffinityGroup(VimInteractionBase):
784 def delete(self, ro_task, task_index):
785 task = ro_task["tasks"][task_index]
786 task_id = task["task_id"]
787 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
788 ro_vim_item_update_ok = {
789 "vim_status": "DELETED",
790 "created": False,
791 "vim_message": "DELETED",
792 "vim_id": None,
793 }
794
795 try:
796 if affinity_group_vim_id:
797 target_vim = self.my_vims[ro_task["target_id"]]
798 target_vim.delete_affinity_group(affinity_group_vim_id)
799 except vimconn.VimConnNotFoundException:
800 ro_vim_item_update_ok["vim_message"] = "already deleted"
801 except vimconn.VimConnException as e:
802 self.logger.error(
803 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
804 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
805 )
806 )
807 ro_vim_item_update = {
808 "vim_status": "VIM_ERROR",
809 "vim_message": "Error while deleting: {}".format(e),
810 }
811
812 return "FAILED", ro_vim_item_update
813
814 self.logger.debug(
815 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
816 task_id,
817 ro_task["target_id"],
818 affinity_group_vim_id,
819 ro_vim_item_update_ok.get("vim_message", ""),
820 )
821 )
822
823 return "DONE", ro_vim_item_update_ok
824
825 def new(self, ro_task, task_index, task_depends):
826 task = ro_task["tasks"][task_index]
827 task_id = task["task_id"]
828 created = False
829 created_items = {}
830 target_vim = self.my_vims[ro_task["target_id"]]
831
832 try:
833 affinity_group_vim_id = None
834 affinity_group_data = None
835
836 if task.get("params"):
837 affinity_group_data = task["params"].get("affinity_group_data")
838
839 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
840 try:
841 param_affinity_group_id = task["params"]["affinity_group_data"].get(
842 "vim-affinity-group-id"
843 )
844 affinity_group_vim_id = target_vim.get_affinity_group(
845 param_affinity_group_id
846 ).get("id")
847 except vimconn.VimConnNotFoundException:
848 self.logger.error(
849 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
850 "could not be found at VIM. Creating a new one.".format(
851 task_id, ro_task["target_id"], param_affinity_group_id
852 )
853 )
854
855 if not affinity_group_vim_id and affinity_group_data:
856 affinity_group_vim_id = target_vim.new_affinity_group(
857 affinity_group_data
858 )
859 created = True
860
861 ro_vim_item_update = {
862 "vim_id": affinity_group_vim_id,
863 "vim_status": "DONE",
864 "created": created,
865 "created_items": created_items,
866 "vim_details": None,
867 "vim_message": None,
868 }
869 self.logger.debug(
870 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
871 task_id, ro_task["target_id"], affinity_group_vim_id, created
872 )
873 )
874
875 return "DONE", ro_vim_item_update
876 except (vimconn.VimConnException, NsWorkerException) as e:
877 self.logger.error(
878 "task={} vim={} new-affinity-or-anti-affinity-group:"
879 " {}".format(task_id, ro_task["target_id"], e)
880 )
881 ro_vim_item_update = {
882 "vim_status": "VIM_ERROR",
883 "created": created,
884 "vim_message": str(e),
885 }
886
887 return "FAILED", ro_vim_item_update
888
889
890 class VimInteractionUpdateVdu(VimInteractionBase):
891 def exec(self, ro_task, task_index, task_depends):
892 task = ro_task["tasks"][task_index]
893 task_id = task["task_id"]
894 db_task_update = {"retries": 0}
895 created = False
896 created_items = {}
897 target_vim = self.my_vims[ro_task["target_id"]]
898
899 try:
900 if task.get("params"):
901 vim_vm_id = task["params"].get("vim_vm_id")
902 action = task["params"].get("action")
903 context = {action: action}
904 target_vim.action_vminstance(vim_vm_id, context)
905 # created = True
906 ro_vim_item_update = {
907 "vim_id": vim_vm_id,
908 "vim_status": "DONE",
909 "created": created,
910 "created_items": created_items,
911 "vim_details": None,
912 "vim_message": None,
913 }
914 self.logger.debug(
915 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
916 )
917 return "DONE", ro_vim_item_update, db_task_update
918 except (vimconn.VimConnException, NsWorkerException) as e:
919 self.logger.error(
920 "task={} vim={} VM Migration:"
921 " {}".format(task_id, ro_task["target_id"], e)
922 )
923 ro_vim_item_update = {
924 "vim_status": "VIM_ERROR",
925 "created": created,
926 "vim_message": str(e),
927 }
928
929 return "FAILED", ro_vim_item_update, db_task_update
930
931
932 class VimInteractionSdnNet(VimInteractionBase):
933 @staticmethod
934 def _match_pci(port_pci, mapping):
935 """
936 Check if port_pci matches with mapping
937 mapping can have brackets to indicate that several chars are accepted. e.g
938 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
939 :param port_pci: text
940 :param mapping: text, can contain brackets to indicate several chars are available
941 :return: True if matches, False otherwise
942 """
943 if not port_pci or not mapping:
944 return False
945 if port_pci == mapping:
946 return True
947
948 mapping_index = 0
949 pci_index = 0
950 while True:
951 bracket_start = mapping.find("[", mapping_index)
952
953 if bracket_start == -1:
954 break
955
956 bracket_end = mapping.find("]", bracket_start)
957 if bracket_end == -1:
958 break
959
960 length = bracket_start - mapping_index
961 if (
962 length
963 and port_pci[pci_index : pci_index + length]
964 != mapping[mapping_index:bracket_start]
965 ):
966 return False
967
968 if (
969 port_pci[pci_index + length]
970 not in mapping[bracket_start + 1 : bracket_end]
971 ):
972 return False
973
974 pci_index += length + 1
975 mapping_index = bracket_end + 1
976
977 if port_pci[pci_index:] != mapping[mapping_index:]:
978 return False
979
980 return True
981
982 def _get_interfaces(self, vlds_to_connect, vim_account_id):
983 """
984 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
985 :param vim_account_id:
986 :return:
987 """
988 interfaces = []
989
990 for vld in vlds_to_connect:
991 table, _, db_id = vld.partition(":")
992 db_id, _, vld = db_id.partition(":")
993 _, _, vld_id = vld.partition(".")
994
995 if table == "vnfrs":
996 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
997 iface_key = "vnf-vld-id"
998 else: # table == "nsrs"
999 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1000 iface_key = "ns-vld-id"
1001
1002 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1003
1004 for db_vnfr in db_vnfrs:
1005 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1006 for iface_index, interface in enumerate(vdur["interfaces"]):
1007 if interface.get(iface_key) == vld_id and interface.get(
1008 "type"
1009 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1010 # only SR-IOV o PT
1011 interface_ = interface.copy()
1012 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1013 db_vnfr["_id"], vdu_index, iface_index
1014 )
1015
1016 if vdur.get("status") == "ERROR":
1017 interface_["status"] = "ERROR"
1018
1019 interfaces.append(interface_)
1020
1021 return interfaces
1022
1023 def refresh(self, ro_task):
1024 # look for task create
1025 task_create_index, _ = next(
1026 i_t
1027 for i_t in enumerate(ro_task["tasks"])
1028 if i_t[1]
1029 and i_t[1]["action"] == "CREATE"
1030 and i_t[1]["status"] != "FINISHED"
1031 )
1032
1033 return self.new(ro_task, task_create_index, None)
1034
1035 def new(self, ro_task, task_index, task_depends):
1036
1037 task = ro_task["tasks"][task_index]
1038 task_id = task["task_id"]
1039 target_vim = self.my_vims[ro_task["target_id"]]
1040
1041 sdn_net_id = ro_task["vim_info"]["vim_id"]
1042
1043 created_items = ro_task["vim_info"].get("created_items")
1044 connected_ports = ro_task["vim_info"].get("connected_ports", [])
1045 new_connected_ports = []
1046 last_update = ro_task["vim_info"].get("last_update", 0)
1047 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1048 error_list = []
1049 created = ro_task["vim_info"].get("created", False)
1050
1051 try:
1052 # CREATE
1053 params = task["params"]
1054 vlds_to_connect = params["vlds"]
1055 associated_vim = params["target_vim"]
1056 # external additional ports
1057 additional_ports = params.get("sdn-ports") or ()
1058 _, _, vim_account_id = associated_vim.partition(":")
1059
1060 if associated_vim:
1061 # get associated VIM
1062 if associated_vim not in self.db_vims:
1063 self.db_vims[associated_vim] = self.db.get_one(
1064 "vim_accounts", {"_id": vim_account_id}
1065 )
1066
1067 db_vim = self.db_vims[associated_vim]
1068
1069 # look for ports to connect
1070 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1071 # print(ports)
1072
1073 sdn_ports = []
1074 pending_ports = error_ports = 0
1075 vlan_used = None
1076 sdn_need_update = False
1077
1078 for port in ports:
1079 vlan_used = port.get("vlan") or vlan_used
1080
1081 # TODO. Do not connect if already done
1082 if not port.get("compute_node") or not port.get("pci"):
1083 if port.get("status") == "ERROR":
1084 error_ports += 1
1085 else:
1086 pending_ports += 1
1087 continue
1088
1089 pmap = None
1090 compute_node_mappings = next(
1091 (
1092 c
1093 for c in db_vim["config"].get("sdn-port-mapping", ())
1094 if c and c["compute_node"] == port["compute_node"]
1095 ),
1096 None,
1097 )
1098
1099 if compute_node_mappings:
1100 # process port_mapping pci of type 0000:af:1[01].[1357]
1101 pmap = next(
1102 (
1103 p
1104 for p in compute_node_mappings["ports"]
1105 if self._match_pci(port["pci"], p.get("pci"))
1106 ),
1107 None,
1108 )
1109
1110 if not pmap:
1111 if not db_vim["config"].get("mapping_not_needed"):
1112 error_list.append(
1113 "Port mapping not found for compute_node={} pci={}".format(
1114 port["compute_node"], port["pci"]
1115 )
1116 )
1117 continue
1118
1119 pmap = {}
1120
1121 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1122 new_port = {
1123 "service_endpoint_id": pmap.get("service_endpoint_id")
1124 or service_endpoint_id,
1125 "service_endpoint_encapsulation_type": "dot1q"
1126 if port["type"] == "SR-IOV"
1127 else None,
1128 "service_endpoint_encapsulation_info": {
1129 "vlan": port.get("vlan"),
1130 "mac": port.get("mac-address"),
1131 "device_id": pmap.get("device_id") or port["compute_node"],
1132 "device_interface_id": pmap.get("device_interface_id")
1133 or port["pci"],
1134 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1135 "switch_port": pmap.get("switch_port"),
1136 "service_mapping_info": pmap.get("service_mapping_info"),
1137 },
1138 }
1139
1140 # TODO
1141 # if port["modified_at"] > last_update:
1142 # sdn_need_update = True
1143 new_connected_ports.append(port["id"]) # TODO
1144 sdn_ports.append(new_port)
1145
1146 if error_ports:
1147 error_list.append(
1148 "{} interfaces have not been created as VDU is on ERROR status".format(
1149 error_ports
1150 )
1151 )
1152
1153 # connect external ports
1154 for index, additional_port in enumerate(additional_ports):
1155 additional_port_id = additional_port.get(
1156 "service_endpoint_id"
1157 ) or "external-{}".format(index)
1158 sdn_ports.append(
1159 {
1160 "service_endpoint_id": additional_port_id,
1161 "service_endpoint_encapsulation_type": additional_port.get(
1162 "service_endpoint_encapsulation_type", "dot1q"
1163 ),
1164 "service_endpoint_encapsulation_info": {
1165 "vlan": additional_port.get("vlan") or vlan_used,
1166 "mac": additional_port.get("mac_address"),
1167 "device_id": additional_port.get("device_id"),
1168 "device_interface_id": additional_port.get(
1169 "device_interface_id"
1170 ),
1171 "switch_dpid": additional_port.get("switch_dpid")
1172 or additional_port.get("switch_id"),
1173 "switch_port": additional_port.get("switch_port"),
1174 "service_mapping_info": additional_port.get(
1175 "service_mapping_info"
1176 ),
1177 },
1178 }
1179 )
1180 new_connected_ports.append(additional_port_id)
1181 sdn_info = ""
1182
1183 # if there are more ports to connect or they have been modified, call create/update
1184 if error_list:
1185 sdn_status = "ERROR"
1186 sdn_info = "; ".join(error_list)
1187 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1188 last_update = time.time()
1189
1190 if not sdn_net_id:
1191 if len(sdn_ports) < 2:
1192 sdn_status = "ACTIVE"
1193
1194 if not pending_ports:
1195 self.logger.debug(
1196 "task={} {} new-sdn-net done, less than 2 ports".format(
1197 task_id, ro_task["target_id"]
1198 )
1199 )
1200 else:
1201 net_type = params.get("type") or "ELAN"
1202 (
1203 sdn_net_id,
1204 created_items,
1205 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1206 created = True
1207 self.logger.debug(
1208 "task={} {} new-sdn-net={} created={}".format(
1209 task_id, ro_task["target_id"], sdn_net_id, created
1210 )
1211 )
1212 else:
1213 created_items = target_vim.edit_connectivity_service(
1214 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1215 )
1216 created = True
1217 self.logger.debug(
1218 "task={} {} update-sdn-net={} created={}".format(
1219 task_id, ro_task["target_id"], sdn_net_id, created
1220 )
1221 )
1222
1223 connected_ports = new_connected_ports
1224 elif sdn_net_id:
1225 wim_status_dict = target_vim.get_connectivity_service_status(
1226 sdn_net_id, conn_info=created_items
1227 )
1228 sdn_status = wim_status_dict["sdn_status"]
1229
1230 if wim_status_dict.get("sdn_info"):
1231 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1232
1233 if wim_status_dict.get("error_msg"):
1234 sdn_info = wim_status_dict.get("error_msg") or ""
1235
1236 if pending_ports:
1237 if sdn_status != "ERROR":
1238 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1239 len(ports) - pending_ports, len(ports)
1240 )
1241
1242 if sdn_status == "ACTIVE":
1243 sdn_status = "BUILD"
1244
1245 ro_vim_item_update = {
1246 "vim_id": sdn_net_id,
1247 "vim_status": sdn_status,
1248 "created": created,
1249 "created_items": created_items,
1250 "connected_ports": connected_ports,
1251 "vim_details": sdn_info,
1252 "vim_message": None,
1253 "last_update": last_update,
1254 }
1255
1256 return sdn_status, ro_vim_item_update
1257 except Exception as e:
1258 self.logger.error(
1259 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1260 exc_info=not isinstance(
1261 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1262 ),
1263 )
1264 ro_vim_item_update = {
1265 "vim_status": "VIM_ERROR",
1266 "created": created,
1267 "vim_message": str(e),
1268 }
1269
1270 return "FAILED", ro_vim_item_update
1271
1272 def delete(self, ro_task, task_index):
1273 task = ro_task["tasks"][task_index]
1274 task_id = task["task_id"]
1275 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1276 ro_vim_item_update_ok = {
1277 "vim_status": "DELETED",
1278 "created": False,
1279 "vim_message": "DELETED",
1280 "vim_id": None,
1281 }
1282
1283 try:
1284 if sdn_vim_id:
1285 target_vim = self.my_vims[ro_task["target_id"]]
1286 target_vim.delete_connectivity_service(
1287 sdn_vim_id, ro_task["vim_info"].get("created_items")
1288 )
1289
1290 except Exception as e:
1291 if (
1292 isinstance(e, sdnconn.SdnConnectorError)
1293 and e.http_code == HTTPStatus.NOT_FOUND.value
1294 ):
1295 ro_vim_item_update_ok["vim_message"] = "already deleted"
1296 else:
1297 self.logger.error(
1298 "ro_task={} vim={} del-sdn-net={}: {}".format(
1299 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1300 ),
1301 exc_info=not isinstance(
1302 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1303 ),
1304 )
1305 ro_vim_item_update = {
1306 "vim_status": "VIM_ERROR",
1307 "vim_message": "Error while deleting: {}".format(e),
1308 }
1309
1310 return "FAILED", ro_vim_item_update
1311
1312 self.logger.debug(
1313 "task={} {} del-sdn-net={} {}".format(
1314 task_id,
1315 ro_task["target_id"],
1316 sdn_vim_id,
1317 ro_vim_item_update_ok.get("vim_message", ""),
1318 )
1319 )
1320
1321 return "DONE", ro_vim_item_update_ok
1322
1323
1324 class VimInteractionMigration(VimInteractionBase):
1325 def exec(self, ro_task, task_index, task_depends):
1326 task = ro_task["tasks"][task_index]
1327 task_id = task["task_id"]
1328 db_task_update = {"retries": 0}
1329 target_vim = self.my_vims[ro_task["target_id"]]
1330 vim_interfaces = []
1331 created = False
1332 created_items = {}
1333 refreshed_vim_info = {}
1334
1335 try:
1336 if task.get("params"):
1337 vim_vm_id = task["params"].get("vim_vm_id")
1338 migrate_host = task["params"].get("migrate_host")
1339 _, migrated_compute_node = target_vim.migrate_instance(
1340 vim_vm_id, migrate_host
1341 )
1342
1343 if migrated_compute_node:
1344 # When VM is migrated, vdu["vim_info"] needs to be updated
1345 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1346 ro_task["target_id"]
1347 )
1348
1349 # Refresh VM to get new vim_info
1350 vm_to_refresh_list = [vim_vm_id]
1351 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1352 refreshed_vim_info = vim_dict[vim_vm_id]
1353
1354 if refreshed_vim_info.get("interfaces"):
1355 for old_iface in vdu_old_vim_info.get("interfaces"):
1356 iface = next(
1357 (
1358 iface
1359 for iface in refreshed_vim_info["interfaces"]
1360 if old_iface["vim_interface_id"]
1361 == iface["vim_interface_id"]
1362 ),
1363 None,
1364 )
1365 vim_interfaces.append(iface)
1366
1367 ro_vim_item_update = {
1368 "vim_id": vim_vm_id,
1369 "vim_status": "ACTIVE",
1370 "created": created,
1371 "created_items": created_items,
1372 "vim_details": None,
1373 "vim_message": None,
1374 }
1375
1376 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1377 "ERROR",
1378 "VIM_ERROR",
1379 ):
1380 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1381
1382 if vim_interfaces:
1383 ro_vim_item_update["interfaces"] = vim_interfaces
1384
1385 self.logger.debug(
1386 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1387 )
1388
1389 return "DONE", ro_vim_item_update, db_task_update
1390
1391 except (vimconn.VimConnException, NsWorkerException) as e:
1392 self.logger.error(
1393 "task={} vim={} VM Migration:"
1394 " {}".format(task_id, ro_task["target_id"], e)
1395 )
1396 ro_vim_item_update = {
1397 "vim_status": "VIM_ERROR",
1398 "created": created,
1399 "vim_message": str(e),
1400 }
1401
1402 return "FAILED", ro_vim_item_update, db_task_update
1403
1404
1405 class NsWorker(threading.Thread):
1406 REFRESH_BUILD = 5 # 5 seconds
1407 REFRESH_ACTIVE = 60 # 1 minute
1408 REFRESH_ERROR = 600
1409 REFRESH_IMAGE = 3600 * 10
1410 REFRESH_DELETE = 3600 * 10
1411 QUEUE_SIZE = 100
1412 terminate = False
1413
1414 def __init__(self, worker_index, config, plugins, db):
1415 """
1416
1417 :param worker_index: thread index
1418 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1419 :param plugins: global shared dict with the loaded plugins
1420 :param db: database class instance to use
1421 """
1422 threading.Thread.__init__(self)
1423 self.config = config
1424 self.plugins = plugins
1425 self.plugin_name = "unknown"
1426 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1427 self.worker_index = worker_index
1428 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1429 # targetvim: vimplugin class
1430 self.my_vims = {}
1431 # targetvim: vim information from database
1432 self.db_vims = {}
1433 # targetvim list
1434 self.vim_targets = []
1435 self.my_id = config["process_id"] + ":" + str(worker_index)
1436 self.db = db
1437 self.item2class = {
1438 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1439 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1440 "image": VimInteractionImage(
1441 self.db, self.my_vims, self.db_vims, self.logger
1442 ),
1443 "flavor": VimInteractionFlavor(
1444 self.db, self.my_vims, self.db_vims, self.logger
1445 ),
1446 "sdn_net": VimInteractionSdnNet(
1447 self.db, self.my_vims, self.db_vims, self.logger
1448 ),
1449 "update": VimInteractionUpdateVdu(
1450 self.db, self.my_vims, self.db_vims, self.logger
1451 ),
1452 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1453 self.db, self.my_vims, self.db_vims, self.logger
1454 ),
1455 "migrate": VimInteractionMigration(
1456 self.db, self.my_vims, self.db_vims, self.logger
1457 ),
1458 }
1459 self.time_last_task_processed = None
1460 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1461 self.tasks_to_delete = []
1462 # it is idle when there are not vim_targets associated
1463 self.idle = True
1464 self.task_locked_time = config["global"]["task_locked_time"]
1465
1466 def insert_task(self, task):
1467 try:
1468 self.task_queue.put(task, False)
1469 return None
1470 except queue.Full:
1471 raise NsWorkerException("timeout inserting a task")
1472
1473 def terminate(self):
1474 self.insert_task("exit")
1475
1476 def del_task(self, task):
1477 with self.task_lock:
1478 if task["status"] == "SCHEDULED":
1479 task["status"] = "SUPERSEDED"
1480 return True
1481 else: # task["status"] == "processing"
1482 self.task_lock.release()
1483 return False
1484
1485 def _process_vim_config(self, target_id, db_vim):
1486 """
1487 Process vim config, creating vim configuration files as ca_cert
1488 :param target_id: vim/sdn/wim + id
1489 :param db_vim: Vim dictionary obtained from database
1490 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1491 """
1492 if not db_vim.get("config"):
1493 return
1494
1495 file_name = ""
1496
1497 try:
1498 if db_vim["config"].get("ca_cert_content"):
1499 file_name = "{}:{}".format(target_id, self.worker_index)
1500
1501 try:
1502 mkdir(file_name)
1503 except FileExistsError:
1504 pass
1505
1506 file_name = file_name + "/ca_cert"
1507
1508 with open(file_name, "w") as f:
1509 f.write(db_vim["config"]["ca_cert_content"])
1510 del db_vim["config"]["ca_cert_content"]
1511 db_vim["config"]["ca_cert"] = file_name
1512 except Exception as e:
1513 raise NsWorkerException(
1514 "Error writing to file '{}': {}".format(file_name, e)
1515 )
1516
1517 def _load_plugin(self, name, type="vim"):
1518 # type can be vim or sdn
1519 if "rovim_dummy" not in self.plugins:
1520 self.plugins["rovim_dummy"] = VimDummyConnector
1521
1522 if "rosdn_dummy" not in self.plugins:
1523 self.plugins["rosdn_dummy"] = SdnDummyConnector
1524
1525 if name in self.plugins:
1526 return self.plugins[name]
1527
1528 try:
1529 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1530 self.plugins[name] = ep.load()
1531 except Exception as e:
1532 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1533
1534 if name and name not in self.plugins:
1535 raise NsWorkerException(
1536 "Plugin 'osm_{n}' has not been installed".format(n=name)
1537 )
1538
1539 return self.plugins[name]
1540
1541 def _unload_vim(self, target_id):
1542 """
1543 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1544 :param target_id: Contains type:_id; where type can be 'vim', ...
1545 :return: None.
1546 """
1547 try:
1548 self.db_vims.pop(target_id, None)
1549 self.my_vims.pop(target_id, None)
1550
1551 if target_id in self.vim_targets:
1552 self.vim_targets.remove(target_id)
1553
1554 self.logger.info("Unloaded {}".format(target_id))
1555 rmtree("{}:{}".format(target_id, self.worker_index))
1556 except FileNotFoundError:
1557 pass # this is raised by rmtree if folder does not exist
1558 except Exception as e:
1559 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1560
1561 def _check_vim(self, target_id):
1562 """
1563 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1564 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1565 :return: None.
1566 """
1567 target, _, _id = target_id.partition(":")
1568 now = time.time()
1569 update_dict = {}
1570 unset_dict = {}
1571 op_text = ""
1572 step = ""
1573 loaded = target_id in self.vim_targets
1574 target_database = (
1575 "vim_accounts"
1576 if target == "vim"
1577 else "wim_accounts"
1578 if target == "wim"
1579 else "sdns"
1580 )
1581
1582 try:
1583 step = "Getting {} from db".format(target_id)
1584 db_vim = self.db.get_one(target_database, {"_id": _id})
1585
1586 for op_index, operation in enumerate(
1587 db_vim["_admin"].get("operations", ())
1588 ):
1589 if operation["operationState"] != "PROCESSING":
1590 continue
1591
1592 locked_at = operation.get("locked_at")
1593
1594 if locked_at is not None and locked_at >= now - self.task_locked_time:
1595 # some other thread is doing this operation
1596 return
1597
1598 # lock
1599 op_text = "_admin.operations.{}.".format(op_index)
1600
1601 if not self.db.set_one(
1602 target_database,
1603 q_filter={
1604 "_id": _id,
1605 op_text + "operationState": "PROCESSING",
1606 op_text + "locked_at": locked_at,
1607 },
1608 update_dict={
1609 op_text + "locked_at": now,
1610 "admin.current_operation": op_index,
1611 },
1612 fail_on_empty=False,
1613 ):
1614 return
1615
1616 unset_dict[op_text + "locked_at"] = None
1617 unset_dict["current_operation"] = None
1618 step = "Loading " + target_id
1619 error_text = self._load_vim(target_id)
1620
1621 if not error_text:
1622 step = "Checking connectivity"
1623
1624 if target == "vim":
1625 self.my_vims[target_id].check_vim_connectivity()
1626 else:
1627 self.my_vims[target_id].check_credentials()
1628
1629 update_dict["_admin.operationalState"] = "ENABLED"
1630 update_dict["_admin.detailed-status"] = ""
1631 unset_dict[op_text + "detailed-status"] = None
1632 update_dict[op_text + "operationState"] = "COMPLETED"
1633
1634 return
1635
1636 except Exception as e:
1637 error_text = "{}: {}".format(step, e)
1638 self.logger.error("{} for {}: {}".format(step, target_id, e))
1639
1640 finally:
1641 if update_dict or unset_dict:
1642 if error_text:
1643 update_dict[op_text + "operationState"] = "FAILED"
1644 update_dict[op_text + "detailed-status"] = error_text
1645 unset_dict.pop(op_text + "detailed-status", None)
1646 update_dict["_admin.operationalState"] = "ERROR"
1647 update_dict["_admin.detailed-status"] = error_text
1648
1649 if op_text:
1650 update_dict[op_text + "statusEnteredTime"] = now
1651
1652 self.db.set_one(
1653 target_database,
1654 q_filter={"_id": _id},
1655 update_dict=update_dict,
1656 unset=unset_dict,
1657 fail_on_empty=False,
1658 )
1659
1660 if not loaded:
1661 self._unload_vim(target_id)
1662
1663 def _reload_vim(self, target_id):
1664 if target_id in self.vim_targets:
1665 self._load_vim(target_id)
1666 else:
1667 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1668 # just remove it to force load again next time it is needed
1669 self.db_vims.pop(target_id, None)
1670
1671 def _load_vim(self, target_id):
1672 """
1673 Load or reload a vim_account, sdn_controller or wim_account.
1674 Read content from database, load the plugin if not loaded.
1675 In case of error loading the plugin, it load a failing VIM_connector
1676 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1677 :param target_id: Contains type:_id; where type can be 'vim', ...
1678 :return: None if ok, descriptive text if error
1679 """
1680 target, _, _id = target_id.partition(":")
1681 target_database = (
1682 "vim_accounts"
1683 if target == "vim"
1684 else "wim_accounts"
1685 if target == "wim"
1686 else "sdns"
1687 )
1688 plugin_name = ""
1689 vim = None
1690
1691 try:
1692 step = "Getting {}={} from db".format(target, _id)
1693 # TODO process for wim, sdnc, ...
1694 vim = self.db.get_one(target_database, {"_id": _id})
1695
1696 # if deep_get(vim, "config", "sdn-controller"):
1697 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1698 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1699
1700 step = "Decrypting password"
1701 schema_version = vim.get("schema_version")
1702 self.db.encrypt_decrypt_fields(
1703 vim,
1704 "decrypt",
1705 fields=("password", "secret"),
1706 schema_version=schema_version,
1707 salt=_id,
1708 )
1709 self._process_vim_config(target_id, vim)
1710
1711 if target == "vim":
1712 plugin_name = "rovim_" + vim["vim_type"]
1713 step = "Loading plugin '{}'".format(plugin_name)
1714 vim_module_conn = self._load_plugin(plugin_name)
1715 step = "Loading {}'".format(target_id)
1716 self.my_vims[target_id] = vim_module_conn(
1717 uuid=vim["_id"],
1718 name=vim["name"],
1719 tenant_id=vim.get("vim_tenant_id"),
1720 tenant_name=vim.get("vim_tenant_name"),
1721 url=vim["vim_url"],
1722 url_admin=None,
1723 user=vim["vim_user"],
1724 passwd=vim["vim_password"],
1725 config=vim.get("config") or {},
1726 persistent_info={},
1727 )
1728 else: # sdn
1729 plugin_name = "rosdn_" + vim["type"]
1730 step = "Loading plugin '{}'".format(plugin_name)
1731 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1732 step = "Loading {}'".format(target_id)
1733 wim = deepcopy(vim)
1734 wim_config = wim.pop("config", {}) or {}
1735 wim["uuid"] = wim["_id"]
1736 wim["wim_url"] = wim["url"]
1737
1738 if wim.get("dpid"):
1739 wim_config["dpid"] = wim.pop("dpid")
1740
1741 if wim.get("switch_id"):
1742 wim_config["switch_id"] = wim.pop("switch_id")
1743
1744 # wim, wim_account, config
1745 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1746 self.db_vims[target_id] = vim
1747 self.error_status = None
1748
1749 self.logger.info(
1750 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1751 )
1752 except Exception as e:
1753 self.logger.error(
1754 "Cannot load {} plugin={}: {} {}".format(
1755 target_id, plugin_name, step, e
1756 )
1757 )
1758
1759 self.db_vims[target_id] = vim or {}
1760 self.db_vims[target_id] = FailingConnector(str(e))
1761 error_status = "{} Error: {}".format(step, e)
1762
1763 return error_status
1764 finally:
1765 if target_id not in self.vim_targets:
1766 self.vim_targets.append(target_id)
1767
1768 def _get_db_task(self):
1769 """
1770 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1771 :return: None
1772 """
1773 now = time.time()
1774
1775 if not self.time_last_task_processed:
1776 self.time_last_task_processed = now
1777
1778 try:
1779 while True:
1780 """
1781 # Log RO tasks only when loglevel is DEBUG
1782 if self.logger.getEffectiveLevel() == logging.DEBUG:
1783 self._log_ro_task(
1784 None,
1785 None,
1786 None,
1787 "TASK_WF",
1788 "task_locked_time="
1789 + str(self.task_locked_time)
1790 + " "
1791 + "time_last_task_processed="
1792 + str(self.time_last_task_processed)
1793 + " "
1794 + "now="
1795 + str(now),
1796 )
1797 """
1798 locked = self.db.set_one(
1799 "ro_tasks",
1800 q_filter={
1801 "target_id": self.vim_targets,
1802 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1803 "locked_at.lt": now - self.task_locked_time,
1804 "to_check_at.lt": self.time_last_task_processed,
1805 },
1806 update_dict={"locked_by": self.my_id, "locked_at": now},
1807 fail_on_empty=False,
1808 )
1809
1810 if locked:
1811 # read and return
1812 ro_task = self.db.get_one(
1813 "ro_tasks",
1814 q_filter={
1815 "target_id": self.vim_targets,
1816 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1817 "locked_at": now,
1818 },
1819 )
1820 return ro_task
1821
1822 if self.time_last_task_processed == now:
1823 self.time_last_task_processed = None
1824 return None
1825 else:
1826 self.time_last_task_processed = now
1827 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1828
1829 except DbException as e:
1830 self.logger.error("Database exception at _get_db_task: {}".format(e))
1831 except Exception as e:
1832 self.logger.critical(
1833 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1834 )
1835
1836 return None
1837
1838 def _get_db_all_tasks(self):
1839 """
1840 Read all content of table ro_tasks to log it
1841 :return: None
1842 """
1843 try:
1844 # Checking the content of the BD:
1845
1846 # read and return
1847 ro_task = self.db.get_list("ro_tasks")
1848 for rt in ro_task:
1849 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1850 return ro_task
1851
1852 except DbException as e:
1853 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1854 except Exception as e:
1855 self.logger.critical(
1856 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1857 )
1858
1859 return None
1860
1861 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1862 """
1863 Generate a log with the following format:
1864
1865 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1866 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1867 task_array_index;task_id;task_action;task_item;task_args
1868
1869 Example:
1870
1871 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1872 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1873 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1874 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1875 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1876 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1877 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1878 """
1879 try:
1880 line = []
1881 i = 0
1882 if ro_task is not None and isinstance(ro_task, dict):
1883 for t in ro_task["tasks"]:
1884 line.clear()
1885 line.append(mark)
1886 line.append(event)
1887 line.append(ro_task.get("_id", ""))
1888 line.append(str(ro_task.get("locked_at", "")))
1889 line.append(str(ro_task.get("modified_at", "")))
1890 line.append(str(ro_task.get("created_at", "")))
1891 line.append(str(ro_task.get("to_check_at", "")))
1892 line.append(str(ro_task.get("locked_by", "")))
1893 line.append(str(ro_task.get("target_id", "")))
1894 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1895 line.append(str(ro_task.get("vim_info", "")))
1896 line.append(str(ro_task.get("tasks", "")))
1897 if isinstance(t, dict):
1898 line.append(str(t.get("status", "")))
1899 line.append(str(t.get("action_id", "")))
1900 line.append(str(i))
1901 line.append(str(t.get("task_id", "")))
1902 line.append(str(t.get("action", "")))
1903 line.append(str(t.get("item", "")))
1904 line.append(str(t.get("find_params", "")))
1905 line.append(str(t.get("params", "")))
1906 else:
1907 line.extend([""] * 2)
1908 line.append(str(i))
1909 line.extend([""] * 5)
1910
1911 i += 1
1912 self.logger.debug(";".join(line))
1913 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1914 i = 0
1915 while True:
1916 st = "tasks.{}.status".format(i)
1917 if st not in db_ro_task_update:
1918 break
1919 line.clear()
1920 line.append(mark)
1921 line.append(event)
1922 line.append(db_ro_task_update.get("_id", ""))
1923 line.append(str(db_ro_task_update.get("locked_at", "")))
1924 line.append(str(db_ro_task_update.get("modified_at", "")))
1925 line.append("")
1926 line.append(str(db_ro_task_update.get("to_check_at", "")))
1927 line.append(str(db_ro_task_update.get("locked_by", "")))
1928 line.append("")
1929 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1930 line.append("")
1931 line.append(str(db_ro_task_update.get("vim_info", "")))
1932 line.append(str(str(db_ro_task_update).count(".status")))
1933 line.append(db_ro_task_update.get(st, ""))
1934 line.append("")
1935 line.append(str(i))
1936 line.extend([""] * 3)
1937 i += 1
1938 self.logger.debug(";".join(line))
1939
1940 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1941 line.clear()
1942 line.append(mark)
1943 line.append(event)
1944 line.append(db_ro_task_delete.get("_id", ""))
1945 line.append("")
1946 line.append(db_ro_task_delete.get("modified_at", ""))
1947 line.extend([""] * 13)
1948 self.logger.debug(";".join(line))
1949
1950 else:
1951 line.clear()
1952 line.append(mark)
1953 line.append(event)
1954 line.extend([""] * 16)
1955 self.logger.debug(";".join(line))
1956
1957 except Exception as e:
1958 self.logger.error("Error logging ro_task: {}".format(e))
1959
1960 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1961 """
1962 Determine if this task need to be done or superseded
1963 :return: None
1964 """
1965 my_task = ro_task["tasks"][task_index]
1966 task_id = my_task["task_id"]
1967 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1968 "created_items", False
1969 )
1970
1971 self.logger.warning("Needed delete: {}".format(needed_delete))
1972 if my_task["status"] == "FAILED":
1973 return None, None # TODO need to be retry??
1974
1975 try:
1976 for index, task in enumerate(ro_task["tasks"]):
1977 if index == task_index or not task:
1978 continue # own task
1979
1980 if (
1981 my_task["target_record"] == task["target_record"]
1982 and task["action"] == "CREATE"
1983 ):
1984 # set to finished
1985 db_update["tasks.{}.status".format(index)] = task[
1986 "status"
1987 ] = "FINISHED"
1988 elif task["action"] == "CREATE" and task["status"] not in (
1989 "FINISHED",
1990 "SUPERSEDED",
1991 ):
1992 needed_delete = False
1993
1994 if needed_delete:
1995 self.logger.warning(
1996 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
1997 )
1998 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1999 else:
2000 return "SUPERSEDED", None
2001 except Exception as e:
2002 if not isinstance(e, NsWorkerException):
2003 self.logger.critical(
2004 "Unexpected exception at _delete_task task={}: {}".format(
2005 task_id, e
2006 ),
2007 exc_info=True,
2008 )
2009
2010 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2011
2012 def _create_task(self, ro_task, task_index, task_depends, db_update):
2013 """
2014 Determine if this task need to create something at VIM
2015 :return: None
2016 """
2017 my_task = ro_task["tasks"][task_index]
2018 task_id = my_task["task_id"]
2019 task_status = None
2020
2021 if my_task["status"] == "FAILED":
2022 return None, None # TODO need to be retry??
2023 elif my_task["status"] == "SCHEDULED":
2024 # check if already created by another task
2025 for index, task in enumerate(ro_task["tasks"]):
2026 if index == task_index or not task:
2027 continue # own task
2028
2029 if task["action"] == "CREATE" and task["status"] not in (
2030 "SCHEDULED",
2031 "FINISHED",
2032 "SUPERSEDED",
2033 ):
2034 return task["status"], "COPY_VIM_INFO"
2035
2036 try:
2037 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2038 ro_task, task_index, task_depends
2039 )
2040 # TODO update other CREATE tasks
2041 except Exception as e:
2042 if not isinstance(e, NsWorkerException):
2043 self.logger.error(
2044 "Error executing task={}: {}".format(task_id, e), exc_info=True
2045 )
2046
2047 task_status = "FAILED"
2048 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2049 # TODO update ro_vim_item_update
2050
2051 return task_status, ro_vim_item_update
2052 else:
2053 return None, None
2054
2055 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2056 """
2057 Look for dependency task
2058 :param task_id: Can be one of
2059 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2060 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2061 3. task.task_id: "<action_id>:number"
2062 :param ro_task:
2063 :param target_id:
2064 :return: database ro_task plus index of task
2065 """
2066 if (
2067 task_id.startswith("vim:")
2068 or task_id.startswith("sdn:")
2069 or task_id.startswith("wim:")
2070 ):
2071 target_id, _, task_id = task_id.partition(" ")
2072
2073 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2074 ro_task_dependency = self.db.get_one(
2075 "ro_tasks",
2076 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2077 fail_on_empty=False,
2078 )
2079
2080 if ro_task_dependency:
2081 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2082 if task["target_record_id"] == task_id:
2083 return ro_task_dependency, task_index
2084
2085 else:
2086 if ro_task:
2087 for task_index, task in enumerate(ro_task["tasks"]):
2088 if task and task["task_id"] == task_id:
2089 return ro_task, task_index
2090
2091 ro_task_dependency = self.db.get_one(
2092 "ro_tasks",
2093 q_filter={
2094 "tasks.ANYINDEX.task_id": task_id,
2095 "tasks.ANYINDEX.target_record.ne": None,
2096 },
2097 fail_on_empty=False,
2098 )
2099
2100 self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
2101 if ro_task_dependency:
2102 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2103 if task["task_id"] == task_id:
2104 return ro_task_dependency, task_index
2105 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2106
2107 def _process_pending_tasks(self, ro_task):
2108 ro_task_id = ro_task["_id"]
2109 now = time.time()
2110 # one day
2111 next_check_at = now + (24 * 60 * 60)
2112 db_ro_task_update = {}
2113
2114 def _update_refresh(new_status):
2115 # compute next_refresh
2116 nonlocal task
2117 nonlocal next_check_at
2118 nonlocal db_ro_task_update
2119 nonlocal ro_task
2120
2121 next_refresh = time.time()
2122
2123 if task["item"] in ("image", "flavor"):
2124 next_refresh += self.REFRESH_IMAGE
2125 elif new_status == "BUILD":
2126 next_refresh += self.REFRESH_BUILD
2127 elif new_status == "DONE":
2128 next_refresh += self.REFRESH_ACTIVE
2129 else:
2130 next_refresh += self.REFRESH_ERROR
2131
2132 next_check_at = min(next_check_at, next_refresh)
2133 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2134 ro_task["vim_info"]["refresh_at"] = next_refresh
2135
2136 try:
2137 """
2138 # Log RO tasks only when loglevel is DEBUG
2139 if self.logger.getEffectiveLevel() == logging.DEBUG:
2140 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2141 """
2142 # 0: get task_status_create
2143 lock_object = None
2144 task_status_create = None
2145 task_create = next(
2146 (
2147 t
2148 for t in ro_task["tasks"]
2149 if t
2150 and t["action"] == "CREATE"
2151 and t["status"] in ("BUILD", "DONE")
2152 ),
2153 None,
2154 )
2155
2156 if task_create:
2157 task_status_create = task_create["status"]
2158
2159 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2160 for task_action in ("DELETE", "CREATE", "EXEC"):
2161 db_vim_update = None
2162 new_status = None
2163
2164 for task_index, task in enumerate(ro_task["tasks"]):
2165 if not task:
2166 continue # task deleted
2167
2168 task_depends = {}
2169 target_update = None
2170
2171 if (
2172 (
2173 task_action in ("DELETE", "EXEC")
2174 and task["status"] not in ("SCHEDULED", "BUILD")
2175 )
2176 or task["action"] != task_action
2177 or (
2178 task_action == "CREATE"
2179 and task["status"] in ("FINISHED", "SUPERSEDED")
2180 )
2181 ):
2182 continue
2183
2184 task_path = "tasks.{}.status".format(task_index)
2185 try:
2186 db_vim_info_update = None
2187
2188 if task["status"] == "SCHEDULED":
2189 # check if tasks that this depends on have been completed
2190 dependency_not_completed = False
2191
2192 for dependency_task_id in task.get("depends_on") or ():
2193 (
2194 dependency_ro_task,
2195 dependency_task_index,
2196 ) = self._get_dependency(
2197 dependency_task_id, target_id=ro_task["target_id"]
2198 )
2199 dependency_task = dependency_ro_task["tasks"][
2200 dependency_task_index
2201 ]
2202 self.logger.warning(
2203 "dependency_ro_task={} dependency_task_index={}".format(
2204 dependency_ro_task, dependency_task_index
2205 )
2206 )
2207
2208 if dependency_task["status"] == "SCHEDULED":
2209 dependency_not_completed = True
2210 next_check_at = min(
2211 next_check_at, dependency_ro_task["to_check_at"]
2212 )
2213 # must allow dependent task to be processed first
2214 # to do this set time after last_task_processed
2215 next_check_at = max(
2216 self.time_last_task_processed, next_check_at
2217 )
2218 break
2219 elif dependency_task["status"] == "FAILED":
2220 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2221 task["action"],
2222 task["item"],
2223 dependency_task["action"],
2224 dependency_task["item"],
2225 dependency_task_id,
2226 dependency_ro_task["vim_info"].get(
2227 "vim_message"
2228 ),
2229 )
2230 self.logger.error(
2231 "task={} {}".format(task["task_id"], error_text)
2232 )
2233 raise NsWorkerException(error_text)
2234
2235 task_depends[dependency_task_id] = dependency_ro_task[
2236 "vim_info"
2237 ]["vim_id"]
2238 task_depends[
2239 "TASK-{}".format(dependency_task_id)
2240 ] = dependency_ro_task["vim_info"]["vim_id"]
2241
2242 if dependency_not_completed:
2243 self.logger.warning(
2244 "DEPENDENCY NOT COMPLETED {}".format(
2245 dependency_ro_task["vim_info"]["vim_id"]
2246 )
2247 )
2248 # TODO set at vim_info.vim_details that it is waiting
2249 continue
2250
2251 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2252 # the task of renew this locking. It will update database locket_at periodically
2253 if not lock_object:
2254 lock_object = LockRenew.add_lock_object(
2255 "ro_tasks", ro_task, self
2256 )
2257
2258 if task["action"] == "DELETE":
2259 (new_status, db_vim_info_update,) = self._delete_task(
2260 ro_task, task_index, task_depends, db_ro_task_update
2261 )
2262 new_status = (
2263 "FINISHED" if new_status == "DONE" else new_status
2264 )
2265 # ^with FINISHED instead of DONE it will not be refreshing
2266
2267 if new_status in ("FINISHED", "SUPERSEDED"):
2268 target_update = "DELETE"
2269 elif task["action"] == "EXEC":
2270 (
2271 new_status,
2272 db_vim_info_update,
2273 db_task_update,
2274 ) = self.item2class[task["item"]].exec(
2275 ro_task, task_index, task_depends
2276 )
2277 new_status = (
2278 "FINISHED" if new_status == "DONE" else new_status
2279 )
2280 # ^with FINISHED instead of DONE it will not be refreshing
2281
2282 if db_task_update:
2283 # load into database the modified db_task_update "retries" and "next_retry"
2284 if db_task_update.get("retries"):
2285 db_ro_task_update[
2286 "tasks.{}.retries".format(task_index)
2287 ] = db_task_update["retries"]
2288
2289 next_check_at = time.time() + db_task_update.get(
2290 "next_retry", 60
2291 )
2292 target_update = None
2293 elif task["action"] == "CREATE":
2294 if task["status"] == "SCHEDULED":
2295 if task_status_create:
2296 new_status = task_status_create
2297 target_update = "COPY_VIM_INFO"
2298 else:
2299 new_status, db_vim_info_update = self.item2class[
2300 task["item"]
2301 ].new(ro_task, task_index, task_depends)
2302 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2303 _update_refresh(new_status)
2304 else:
2305 if (
2306 ro_task["vim_info"]["refresh_at"]
2307 and now > ro_task["vim_info"]["refresh_at"]
2308 ):
2309 new_status, db_vim_info_update = self.item2class[
2310 task["item"]
2311 ].refresh(ro_task)
2312 _update_refresh(new_status)
2313 else:
2314 # The refresh is updated to avoid set the value of "refresh_at" to
2315 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2316 # because it can happen that in this case the task is never processed
2317 _update_refresh(task["status"])
2318
2319 except Exception as e:
2320 new_status = "FAILED"
2321 db_vim_info_update = {
2322 "vim_status": "VIM_ERROR",
2323 "vim_message": str(e),
2324 }
2325
2326 if not isinstance(
2327 e, (NsWorkerException, vimconn.VimConnException)
2328 ):
2329 self.logger.error(
2330 "Unexpected exception at _delete_task task={}: {}".format(
2331 task["task_id"], e
2332 ),
2333 exc_info=True,
2334 )
2335
2336 try:
2337 if db_vim_info_update:
2338 db_vim_update = db_vim_info_update.copy()
2339 db_ro_task_update.update(
2340 {
2341 "vim_info." + k: v
2342 for k, v in db_vim_info_update.items()
2343 }
2344 )
2345 ro_task["vim_info"].update(db_vim_info_update)
2346
2347 if new_status:
2348 if task_action == "CREATE":
2349 task_status_create = new_status
2350 db_ro_task_update[task_path] = new_status
2351
2352 if target_update or db_vim_update:
2353 if target_update == "DELETE":
2354 self._update_target(task, None)
2355 elif target_update == "COPY_VIM_INFO":
2356 self._update_target(task, ro_task["vim_info"])
2357 else:
2358 self._update_target(task, db_vim_update)
2359
2360 except Exception as e:
2361 if (
2362 isinstance(e, DbException)
2363 and e.http_code == HTTPStatus.NOT_FOUND
2364 ):
2365 # if the vnfrs or nsrs has been removed from database, this task must be removed
2366 self.logger.debug(
2367 "marking to delete task={}".format(task["task_id"])
2368 )
2369 self.tasks_to_delete.append(task)
2370 else:
2371 self.logger.error(
2372 "Unexpected exception at _update_target task={}: {}".format(
2373 task["task_id"], e
2374 ),
2375 exc_info=True,
2376 )
2377
2378 locked_at = ro_task["locked_at"]
2379
2380 if lock_object:
2381 locked_at = [
2382 lock_object["locked_at"],
2383 lock_object["locked_at"] + self.task_locked_time,
2384 ]
2385 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2386 # contain exactly locked_at + self.task_locked_time
2387 LockRenew.remove_lock_object(lock_object)
2388
2389 q_filter = {
2390 "_id": ro_task["_id"],
2391 "to_check_at": ro_task["to_check_at"],
2392 "locked_at": locked_at,
2393 }
2394 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2395 # outside this task (by ro_nbi) do not update it
2396 db_ro_task_update["locked_by"] = None
2397 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2398 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2399 db_ro_task_update["modified_at"] = now
2400 db_ro_task_update["to_check_at"] = next_check_at
2401
2402 """
2403 # Log RO tasks only when loglevel is DEBUG
2404 if self.logger.getEffectiveLevel() == logging.DEBUG:
2405 db_ro_task_update_log = db_ro_task_update.copy()
2406 db_ro_task_update_log["_id"] = q_filter["_id"]
2407 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2408 """
2409
2410 if not self.db.set_one(
2411 "ro_tasks",
2412 update_dict=db_ro_task_update,
2413 q_filter=q_filter,
2414 fail_on_empty=False,
2415 ):
2416 del db_ro_task_update["to_check_at"]
2417 del q_filter["to_check_at"]
2418 """
2419 # Log RO tasks only when loglevel is DEBUG
2420 if self.logger.getEffectiveLevel() == logging.DEBUG:
2421 self._log_ro_task(
2422 None,
2423 db_ro_task_update_log,
2424 None,
2425 "TASK_WF",
2426 "SET_TASK " + str(q_filter),
2427 )
2428 """
2429 self.db.set_one(
2430 "ro_tasks",
2431 q_filter=q_filter,
2432 update_dict=db_ro_task_update,
2433 fail_on_empty=True,
2434 )
2435 except DbException as e:
2436 self.logger.error(
2437 "ro_task={} Error updating database {}".format(ro_task_id, e)
2438 )
2439 except Exception as e:
2440 self.logger.error(
2441 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2442 )
2443
2444 def _update_target(self, task, ro_vim_item_update):
2445 table, _, temp = task["target_record"].partition(":")
2446 _id, _, path_vim_status = temp.partition(":")
2447 path_item = path_vim_status[: path_vim_status.rfind(".")]
2448 path_item = path_item[: path_item.rfind(".")]
2449 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2450 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2451
2452 if ro_vim_item_update:
2453 update_dict = {
2454 path_vim_status + "." + k: v
2455 for k, v in ro_vim_item_update.items()
2456 if k
2457 in (
2458 "vim_id",
2459 "vim_details",
2460 "vim_message",
2461 "vim_name",
2462 "vim_status",
2463 "interfaces",
2464 "interfaces_backup",
2465 )
2466 }
2467
2468 if path_vim_status.startswith("vdur."):
2469 # for backward compatibility, add vdur.name apart from vdur.vim_name
2470 if ro_vim_item_update.get("vim_name"):
2471 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2472
2473 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2474 if ro_vim_item_update.get("vim_id"):
2475 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2476
2477 # update general status
2478 if ro_vim_item_update.get("vim_status"):
2479 update_dict[path_item + ".status"] = ro_vim_item_update[
2480 "vim_status"
2481 ]
2482
2483 if ro_vim_item_update.get("interfaces"):
2484 path_interfaces = path_item + ".interfaces"
2485
2486 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2487 if iface:
2488 update_dict.update(
2489 {
2490 path_interfaces + ".{}.".format(i) + k: v
2491 for k, v in iface.items()
2492 if k in ("vlan", "compute_node", "pci")
2493 }
2494 )
2495
2496 # put ip_address and mac_address with ip-address and mac-address
2497 if iface.get("ip_address"):
2498 update_dict[
2499 path_interfaces + ".{}.".format(i) + "ip-address"
2500 ] = iface["ip_address"]
2501
2502 if iface.get("mac_address"):
2503 update_dict[
2504 path_interfaces + ".{}.".format(i) + "mac-address"
2505 ] = iface["mac_address"]
2506
2507 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2508 update_dict["ip-address"] = iface.get("ip_address").split(
2509 ";"
2510 )[0]
2511
2512 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2513 update_dict[path_item + ".ip-address"] = iface.get(
2514 "ip_address"
2515 ).split(";")[0]
2516
2517 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2518
2519 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2520 if ro_vim_item_update.get("interfaces"):
2521 search_key = path_vim_status + ".interfaces"
2522 if update_dict.get(search_key):
2523 interfaces_backup_update = {
2524 path_vim_status + ".interfaces_backup": update_dict[search_key]
2525 }
2526
2527 self.db.set_one(
2528 table,
2529 q_filter={"_id": _id},
2530 update_dict=interfaces_backup_update,
2531 )
2532
2533 else:
2534 update_dict = {path_item + ".status": "DELETED"}
2535 self.db.set_one(
2536 table,
2537 q_filter={"_id": _id},
2538 update_dict=update_dict,
2539 unset={path_vim_status: None},
2540 )
2541
2542 def _process_delete_db_tasks(self):
2543 """
2544 Delete task from database because vnfrs or nsrs or both have been deleted
2545 :return: None. Uses and modify self.tasks_to_delete
2546 """
2547 while self.tasks_to_delete:
2548 task = self.tasks_to_delete[0]
2549 vnfrs_deleted = None
2550 nsr_id = task["nsr_id"]
2551
2552 if task["target_record"].startswith("vnfrs:"):
2553 # check if nsrs is present
2554 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2555 vnfrs_deleted = task["target_record"].split(":")[1]
2556
2557 try:
2558 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2559 except Exception as e:
2560 self.logger.error(
2561 "Error deleting task={}: {}".format(task["task_id"], e)
2562 )
2563 self.tasks_to_delete.pop(0)
2564
2565 @staticmethod
2566 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2567 """
2568 Static method because it is called from osm_ng_ro.ns
2569 :param db: instance of database to use
2570 :param nsr_id: affected nsrs id
2571 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2572 :return: None, exception is fails
2573 """
2574 retries = 5
2575 for retry in range(retries):
2576 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2577 now = time.time()
2578 conflict = False
2579
2580 for ro_task in ro_tasks:
2581 db_update = {}
2582 to_delete_ro_task = True
2583
2584 for index, task in enumerate(ro_task["tasks"]):
2585 if not task:
2586 pass
2587 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2588 vnfrs_deleted
2589 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2590 ):
2591 db_update["tasks.{}".format(index)] = None
2592 else:
2593 # used by other nsr, ro_task cannot be deleted
2594 to_delete_ro_task = False
2595
2596 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2597 if to_delete_ro_task:
2598 if not db.del_one(
2599 "ro_tasks",
2600 q_filter={
2601 "_id": ro_task["_id"],
2602 "modified_at": ro_task["modified_at"],
2603 },
2604 fail_on_empty=False,
2605 ):
2606 conflict = True
2607 elif db_update:
2608 db_update["modified_at"] = now
2609 if not db.set_one(
2610 "ro_tasks",
2611 q_filter={
2612 "_id": ro_task["_id"],
2613 "modified_at": ro_task["modified_at"],
2614 },
2615 update_dict=db_update,
2616 fail_on_empty=False,
2617 ):
2618 conflict = True
2619 if not conflict:
2620 return
2621 else:
2622 raise NsWorkerException("Exceeded {} retries".format(retries))
2623
2624 def run(self):
2625 # load database
2626 self.logger.info("Starting")
2627 while True:
2628 # step 1: get commands from queue
2629 try:
2630 if self.vim_targets:
2631 task = self.task_queue.get(block=False)
2632 else:
2633 if not self.idle:
2634 self.logger.debug("enters in idle state")
2635 self.idle = True
2636 task = self.task_queue.get(block=True)
2637 self.idle = False
2638
2639 if task[0] == "terminate":
2640 break
2641 elif task[0] == "load_vim":
2642 self.logger.info("order to load vim {}".format(task[1]))
2643 self._load_vim(task[1])
2644 elif task[0] == "unload_vim":
2645 self.logger.info("order to unload vim {}".format(task[1]))
2646 self._unload_vim(task[1])
2647 elif task[0] == "reload_vim":
2648 self._reload_vim(task[1])
2649 elif task[0] == "check_vim":
2650 self.logger.info("order to check vim {}".format(task[1]))
2651 self._check_vim(task[1])
2652 continue
2653 except Exception as e:
2654 if isinstance(e, queue.Empty):
2655 pass
2656 else:
2657 self.logger.critical(
2658 "Error processing task: {}".format(e), exc_info=True
2659 )
2660
2661 # step 2: process pending_tasks, delete not needed tasks
2662 try:
2663 if self.tasks_to_delete:
2664 self._process_delete_db_tasks()
2665 busy = False
2666 """
2667 # Log RO tasks only when loglevel is DEBUG
2668 if self.logger.getEffectiveLevel() == logging.DEBUG:
2669 _ = self._get_db_all_tasks()
2670 """
2671 ro_task = self._get_db_task()
2672 if ro_task:
2673 self.logger.warning("Task to process: {}".format(ro_task))
2674 time.sleep(1)
2675 self._process_pending_tasks(ro_task)
2676 busy = True
2677 if not busy:
2678 time.sleep(5)
2679 except Exception as e:
2680 self.logger.critical(
2681 "Unexpected exception at run: " + str(e), exc_info=True
2682 )
2683
2684 self.logger.info("Finishing")