617f58bb8d70db21063ca8c7c068bb1022875198
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target where the results are stored.
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from unittest.mock import Mock
37
38 from importlib_metadata import entry_points
39 from osm_common.dbbase import DbException
40 from osm_ng_ro.vim_admin import LockRenew
41 from osm_ro_plugin import sdnconn, vimconn
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin.vim_dummy import VimDummyConnector
44 import yaml
45
# Module metadata: original author and creation date stamp.
__author__ = "Alfonso Tierno"
__date__ = "$28-Sep-2017 12:07:15$"
48
49
def deep_get(target_dict, *args, **kwargs):
    """
    Read a value from nested dictionaries by following a chain of keys.
    Example target_dict={a: {b: 5}}; keys a,b return 5; keys a,b,c and keys f,h return the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, one nesting level per key
    :param kwargs: may only contain default=value, returned when the chain cannot be followed
    :return: the value found; otherwise the provided default, or None when no default is given
    """
    current = target_dict

    for step in args:
        # the chain is broken when we hit a non-dict or a missing key
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            return kwargs.get("default")

    return current
64
65
class NsWorkerException(Exception):
    """Generic error raised by the VIM interaction classes of this module."""
68
69
class FailingConnector:
    """Stand-in connector whose public VIM/SDN methods always raise.

    Every public attribute name of vimconn.VimConnector and sdnconn.SdnConnectorBase
    is set on the instance as a Mock that raises the matching connector exception
    carrying error_msg.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text stored on the instance and carried by every raised exception
        """
        self.error_msg = error_msg

        # VIM names are installed first and SDN names second, so a name present
        # in both interfaces ends up raising the SDN error
        for connector_class, error_class in (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        ):
            for attr_name in dir(connector_class):
                if attr_name.startswith("_"):
                    continue

                setattr(self, attr_name, Mock(side_effect=error_class(error_msg)))
85
86
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException variant used in this module when a network or image lookup
    finds nothing, or when the find_params of a network task are invalid."""
89
90
class VimInteractionBase:
    """Common parent of the classes that create, delete and refresh items at VIM/SDN.
    The methods here do not contact any VIM; they just report a successful result."""

    def __init__(self, db, my_vims, db_vims, logger):
        """Keep the shared database handle, connector map, VIM record map and logger."""
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created here; report BUILD with no item update."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Do not ask the VIM for status; only a stored VIM_ERROR is reported as FAILED."""
        vim_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if vim_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Nothing is deleted at the VIM; report DONE."""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """No action is executed; report DONE with neither item nor task update."""
        return "DONE", None, None
117
118
class VimInteractionNet(VimInteractionBase):
    """Find, create, refresh and delete networks through the connector stored in
    self.my_vims for ro_task["target_id"]."""

    def new(self, ro_task, task_index, task_depends):
        """Locate an existing VIM network or create a new one.

        When the task carries "find_params" the network is searched with
        get_network_list(); a management network that is not found and is not
        configured at the VIM record is created on the fly. When the task has
        no "find_params" the network is created from task["params"].

        :param ro_task: ro_task record; "tasks" and "target_id" are read here
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", item update) on success; ("FAILED", item update) when a
            VimConnException or NsWorkerException is raised
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    # the VIM record configuration wins: first by id, then by name
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        # nothing configured at the VIM record: search by the name in find_params
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        # NOTE(review): in this branch vim_filter only holds "name", so an empty
                        # name makes vim_filter.get("id") None and the slice raises TypeError,
                        # which is not caught below - confirm find_params always carries a name
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                # NOTE(review): with find_params and params both present and no match,
                # nothing is created here and vim_net_id stays None - verify against caller
                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status.

        :param ro_task: ro_task record; "_id", "target_id" and "vim_info" are read
        :return: (task_status, item update). task_status is "DONE" for a VIM status of
            ACTIVE, "BUILD" for BUILD and "FAILED" otherwise (including a
            VimConnException). The update only holds the fields that differ from
            what is stored at ro_task["vim_info"].
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # collect only the fields that changed with respect to the stored vim_info
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            # the message is left out of the log line when the new status is ACTIVE
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network at the VIM.

        The VIM is only called when there is a vim_id or some created_items stored.
        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the delete task inside ro_task["tasks"]
        :return: ("DONE", item update) or ("FAILED", item update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
336
337
class VimInteractionVdu(VimInteractionBase):
    """Create, refresh and delete VMs, and inject ssh keys into them, through the
    connector stored in self.my_vims for ro_task["target_id"]."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM.

        References of the form "TASK-..." found in the network list, image,
        flavor and affinity groups are replaced by the identifiers provided in
        task_depends before calling new_vminstance().

        :param ro_task: ro_task record; "tasks" and "target_id" are read here
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary mapping "TASK-..." references to VIM identifiers
        :return: ("BUILD", item update) on success; ("FAILED", item update) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # flagged before calling the VIM, so a failure below is also reported
            # with created=True (presumably to allow later clean up - TODO confirm)
            created = True
            params = task["params"]
            # work on a copy: the stored task parameters must keep the TASK- references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            # tolerate tasks without affinity groups (missing key or None)
            affinity_group_list = params_copy.get("affinity_group_list") or ()
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created or "
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # each net_list entry is expected to hold "vim_id" after new_vminstance()
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM at the VIM.

        The VIM is only called when there is a vim_id or some created_items stored.
        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the delete task inside ro_task["tasks"]
        :return: ("DONE", item update) or ("FAILED", item update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status.

        :param ro_task: ro_task record; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no vim_id stored. Otherwise
            (task_status, item update): task_status is "DONE" for a VIM status of ACTIVE,
            "BUILD" for BUILD and "FAILED" otherwise (including a VimConnException).
            The update only holds the fields that differ from ro_task["vim_info"].
        :raises StopIteration: when ro_task has no CREATE task pending to be finished
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                # best effort: the name is optional, any parsing problem is ignored
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of interfaces_vim_ids; an interface not reported by the
            # VIM is stored as None in its position
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                vim_interfaces.append(iface)

        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        # NOTE(review): the two flag assignments below fail with TypeError when the
        # selected position holds None (interface not reported by the VIM) - confirm
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the VDU management interface falls back to the VNF one, and then to the first
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            # the message is left out of the log line when the new status is ACTIVE
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject a ssh key into the VM by calling inject_user_key() at the VIM connector.

        The private key found at the task parameters is decrypted with self.db.decrypt()
        and passed as "ro_key"; "ip_address" is passed as "ip_addr".

        :param ro_task: ro_task record; "tasks" and "target_id" are read here
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task_status, item update, task update):
            ("DONE", None, {"retries": 0}) on success;
            ("BUILD", None, {"retries": n, "next_retry": seconds}) after a failure while
            the number of attempts is below max_retries_inject_ssh_key;
            ("FAILED", {"vim_message": error}, {"retries": 0}) otherwise
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                # keep the task alive and ask for a new attempt later
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
625
626
class VimInteractionImage(VimInteractionBase):
    """Look for images through the connector stored in self.my_vims for
    ro_task["target_id"]. Images are only searched, never created."""

    def new(self, ro_task, task_index, task_depends):
        """Find the image that matches task["find_params"] with get_image_list().

        :param ro_task: ro_task record; "tasks" and "target_id" are read here
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item update) when exactly one image matches;
            ("FAILED", item update) when none or several images match, when the task
            has no find_params, or when the VIM raises a VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            if not task.get("find_params"):
                # without search criteria there is no image to report; fail in a
                # controlled way instead of reading a variable that was never set
                raise NsWorkerExceptionNotFound(
                    "Image cannot be searched because the task has no find_params"
                )

            vim_images = target_vim.get_image_list(**task["find_params"])

            if not vim_images:
                raise NsWorkerExceptionNotFound(
                    "Image not found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )
            elif len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )

            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
681
682
class VimInteractionFlavor(VimInteractionBase):
    """Find, create and delete flavors through the connector stored in
    self.my_vims for ro_task["target_id"]."""

    def delete(self, ro_task, task_index):
        """Remove the flavor from the VIM when a vim_id is stored.

        A VimConnNotFoundException counts as success ("already deleted").

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the delete task inside ro_task["tasks"]
        :return: ("DONE", item update) or ("FAILED", item update) on any other VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        vim_flavor = ro_task["vim_info"]["vim_id"]
        deleted_info = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if vim_flavor:
                self.my_vims[ro_task["target_id"]].delete_flavor(vim_flavor)
        except vimconn.VimConnNotFoundException:
            deleted_info["vim_message"] = "already deleted"
        except vimconn.VimConnException as error:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vim_flavor, error
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(error),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                vim_flavor,
                deleted_info["vim_message"],
            )
        )

        return "DONE", deleted_info

    def new(self, ro_task, task_index, task_depends):
        """Reuse a flavor that matches task["find_params"] or create one from task["params"].

        :param ro_task: ro_task record; "tasks" and "target_id" are read here
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item update) on success; ("FAILED", item update) when a
            VimConnException or NsWorkerException is raised
        """
        current_task = ro_task["tasks"][task_index]
        task_id = current_task["task_id"]
        was_created = False
        vim_connector = self.my_vims[ro_task["target_id"]]

        try:
            flavor_id = None

            # first try to reuse an existing flavor; not finding one is not an error
            if current_task.get("find_params"):
                try:
                    wanted = current_task["find_params"]["flavor_data"]
                    flavor_id = vim_connector.get_flavor_id_from_data(wanted)
                except vimconn.VimConnNotFoundException:
                    pass

            # nothing reusable: create it when creation parameters are provided
            if not flavor_id and current_task.get("params"):
                wanted = current_task["params"]["flavor_data"]
                flavor_id = vim_connector.new_flavor(wanted)
                was_created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], flavor_id, was_created
                )
            )

            return "DONE", {
                "vim_id": flavor_id,
                "vim_status": "DONE",
                "created": was_created,
                "created_items": {},
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as error:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(
                    task_id, ro_task["target_id"], error
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(error),
            }
775
776
class VimInteractionAffinityGroup(VimInteractionBase):
    """Find, create and delete affinity/anti-affinity groups through the connector
    stored in self.my_vims for ro_task["target_id"]."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group at the VIM when a vim_id is stored.

        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task record; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the delete task inside ro_task["tasks"]
        :return: ("DONE", item update) or ("FAILED", item update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group given by "vim-affinity-group-id" or create a new one.

        When task["params"]["affinity_group_data"] carries "vim-affinity-group-id" the
        group is looked up with get_affinity_group(); if the VIM does not find it, or
        no identifier is given, a new group is created with new_affinity_group().

        :param ro_task: ro_task record; "tasks" and "target_id" are read here
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item update) on success; ("FAILED", item update) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # not fatal: fall through and create a new group below
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
882
883
884 class VimInteractionSdnNet(VimInteractionBase):
885 @staticmethod
886 def _match_pci(port_pci, mapping):
887 """
888 Check if port_pci matches with mapping
889 mapping can have brackets to indicate that several chars are accepted. e.g
890 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
891 :param port_pci: text
892 :param mapping: text, can contain brackets to indicate several chars are available
893 :return: True if matches, False otherwise
894 """
895 if not port_pci or not mapping:
896 return False
897 if port_pci == mapping:
898 return True
899
900 mapping_index = 0
901 pci_index = 0
902 while True:
903 bracket_start = mapping.find("[", mapping_index)
904
905 if bracket_start == -1:
906 break
907
908 bracket_end = mapping.find("]", bracket_start)
909 if bracket_end == -1:
910 break
911
912 length = bracket_start - mapping_index
913 if (
914 length
915 and port_pci[pci_index : pci_index + length]
916 != mapping[mapping_index:bracket_start]
917 ):
918 return False
919
920 if (
921 port_pci[pci_index + length]
922 not in mapping[bracket_start + 1 : bracket_end]
923 ):
924 return False
925
926 pci_index += length + 1
927 mapping_index = bracket_end + 1
928
929 if port_pci[pci_index:] != mapping[mapping_index:]:
930 return False
931
932 return True
933
934 def _get_interfaces(self, vlds_to_connect, vim_account_id):
935 """
936 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
937 :param vim_account_id:
938 :return:
939 """
940 interfaces = []
941
942 for vld in vlds_to_connect:
943 table, _, db_id = vld.partition(":")
944 db_id, _, vld = db_id.partition(":")
945 _, _, vld_id = vld.partition(".")
946
947 if table == "vnfrs":
948 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
949 iface_key = "vnf-vld-id"
950 else: # table == "nsrs"
951 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
952 iface_key = "ns-vld-id"
953
954 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
955
956 for db_vnfr in db_vnfrs:
957 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
958 for iface_index, interface in enumerate(vdur["interfaces"]):
959 if interface.get(iface_key) == vld_id and interface.get(
960 "type"
961 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
962 # only SR-IOV o PT
963 interface_ = interface.copy()
964 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
965 db_vnfr["_id"], vdu_index, iface_index
966 )
967
968 if vdur.get("status") == "ERROR":
969 interface_["status"] = "ERROR"
970
971 interfaces.append(interface_)
972
973 return interfaces
974
975 def refresh(self, ro_task):
976 # look for task create
977 task_create_index, _ = next(
978 i_t
979 for i_t in enumerate(ro_task["tasks"])
980 if i_t[1]
981 and i_t[1]["action"] == "CREATE"
982 and i_t[1]["status"] != "FINISHED"
983 )
984
985 return self.new(ro_task, task_create_index, None)
986
987 def new(self, ro_task, task_index, task_depends):
988
989 task = ro_task["tasks"][task_index]
990 task_id = task["task_id"]
991 target_vim = self.my_vims[ro_task["target_id"]]
992
993 sdn_net_id = ro_task["vim_info"]["vim_id"]
994
995 created_items = ro_task["vim_info"].get("created_items")
996 connected_ports = ro_task["vim_info"].get("connected_ports", [])
997 new_connected_ports = []
998 last_update = ro_task["vim_info"].get("last_update", 0)
999 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1000 error_list = []
1001 created = ro_task["vim_info"].get("created", False)
1002
1003 try:
1004 # CREATE
1005 params = task["params"]
1006 vlds_to_connect = params["vlds"]
1007 associated_vim = params["target_vim"]
1008 # external additional ports
1009 additional_ports = params.get("sdn-ports") or ()
1010 _, _, vim_account_id = associated_vim.partition(":")
1011
1012 if associated_vim:
1013 # get associated VIM
1014 if associated_vim not in self.db_vims:
1015 self.db_vims[associated_vim] = self.db.get_one(
1016 "vim_accounts", {"_id": vim_account_id}
1017 )
1018
1019 db_vim = self.db_vims[associated_vim]
1020
1021 # look for ports to connect
1022 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1023 # print(ports)
1024
1025 sdn_ports = []
1026 pending_ports = error_ports = 0
1027 vlan_used = None
1028 sdn_need_update = False
1029
1030 for port in ports:
1031 vlan_used = port.get("vlan") or vlan_used
1032
1033 # TODO. Do not connect if already done
1034 if not port.get("compute_node") or not port.get("pci"):
1035 if port.get("status") == "ERROR":
1036 error_ports += 1
1037 else:
1038 pending_ports += 1
1039 continue
1040
1041 pmap = None
1042 compute_node_mappings = next(
1043 (
1044 c
1045 for c in db_vim["config"].get("sdn-port-mapping", ())
1046 if c and c["compute_node"] == port["compute_node"]
1047 ),
1048 None,
1049 )
1050
1051 if compute_node_mappings:
1052 # process port_mapping pci of type 0000:af:1[01].[1357]
1053 pmap = next(
1054 (
1055 p
1056 for p in compute_node_mappings["ports"]
1057 if self._match_pci(port["pci"], p.get("pci"))
1058 ),
1059 None,
1060 )
1061
1062 if not pmap:
1063 if not db_vim["config"].get("mapping_not_needed"):
1064 error_list.append(
1065 "Port mapping not found for compute_node={} pci={}".format(
1066 port["compute_node"], port["pci"]
1067 )
1068 )
1069 continue
1070
1071 pmap = {}
1072
1073 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1074 new_port = {
1075 "service_endpoint_id": pmap.get("service_endpoint_id")
1076 or service_endpoint_id,
1077 "service_endpoint_encapsulation_type": "dot1q"
1078 if port["type"] == "SR-IOV"
1079 else None,
1080 "service_endpoint_encapsulation_info": {
1081 "vlan": port.get("vlan"),
1082 "mac": port.get("mac-address"),
1083 "device_id": pmap.get("device_id") or port["compute_node"],
1084 "device_interface_id": pmap.get("device_interface_id")
1085 or port["pci"],
1086 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1087 "switch_port": pmap.get("switch_port"),
1088 "service_mapping_info": pmap.get("service_mapping_info"),
1089 },
1090 }
1091
1092 # TODO
1093 # if port["modified_at"] > last_update:
1094 # sdn_need_update = True
1095 new_connected_ports.append(port["id"]) # TODO
1096 sdn_ports.append(new_port)
1097
1098 if error_ports:
1099 error_list.append(
1100 "{} interfaces have not been created as VDU is on ERROR status".format(
1101 error_ports
1102 )
1103 )
1104
1105 # connect external ports
1106 for index, additional_port in enumerate(additional_ports):
1107 additional_port_id = additional_port.get(
1108 "service_endpoint_id"
1109 ) or "external-{}".format(index)
1110 sdn_ports.append(
1111 {
1112 "service_endpoint_id": additional_port_id,
1113 "service_endpoint_encapsulation_type": additional_port.get(
1114 "service_endpoint_encapsulation_type", "dot1q"
1115 ),
1116 "service_endpoint_encapsulation_info": {
1117 "vlan": additional_port.get("vlan") or vlan_used,
1118 "mac": additional_port.get("mac_address"),
1119 "device_id": additional_port.get("device_id"),
1120 "device_interface_id": additional_port.get(
1121 "device_interface_id"
1122 ),
1123 "switch_dpid": additional_port.get("switch_dpid")
1124 or additional_port.get("switch_id"),
1125 "switch_port": additional_port.get("switch_port"),
1126 "service_mapping_info": additional_port.get(
1127 "service_mapping_info"
1128 ),
1129 },
1130 }
1131 )
1132 new_connected_ports.append(additional_port_id)
1133 sdn_info = ""
1134
1135 # if there are more ports to connect or they have been modified, call create/update
1136 if error_list:
1137 sdn_status = "ERROR"
1138 sdn_info = "; ".join(error_list)
1139 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1140 last_update = time.time()
1141
1142 if not sdn_net_id:
1143 if len(sdn_ports) < 2:
1144 sdn_status = "ACTIVE"
1145
1146 if not pending_ports:
1147 self.logger.debug(
1148 "task={} {} new-sdn-net done, less than 2 ports".format(
1149 task_id, ro_task["target_id"]
1150 )
1151 )
1152 else:
1153 net_type = params.get("type") or "ELAN"
1154 (
1155 sdn_net_id,
1156 created_items,
1157 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1158 created = True
1159 self.logger.debug(
1160 "task={} {} new-sdn-net={} created={}".format(
1161 task_id, ro_task["target_id"], sdn_net_id, created
1162 )
1163 )
1164 else:
1165 created_items = target_vim.edit_connectivity_service(
1166 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1167 )
1168 created = True
1169 self.logger.debug(
1170 "task={} {} update-sdn-net={} created={}".format(
1171 task_id, ro_task["target_id"], sdn_net_id, created
1172 )
1173 )
1174
1175 connected_ports = new_connected_ports
1176 elif sdn_net_id:
1177 wim_status_dict = target_vim.get_connectivity_service_status(
1178 sdn_net_id, conn_info=created_items
1179 )
1180 sdn_status = wim_status_dict["sdn_status"]
1181
1182 if wim_status_dict.get("sdn_info"):
1183 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1184
1185 if wim_status_dict.get("error_msg"):
1186 sdn_info = wim_status_dict.get("error_msg") or ""
1187
1188 if pending_ports:
1189 if sdn_status != "ERROR":
1190 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1191 len(ports) - pending_ports, len(ports)
1192 )
1193
1194 if sdn_status == "ACTIVE":
1195 sdn_status = "BUILD"
1196
1197 ro_vim_item_update = {
1198 "vim_id": sdn_net_id,
1199 "vim_status": sdn_status,
1200 "created": created,
1201 "created_items": created_items,
1202 "connected_ports": connected_ports,
1203 "vim_details": sdn_info,
1204 "vim_message": None,
1205 "last_update": last_update,
1206 }
1207
1208 return sdn_status, ro_vim_item_update
1209 except Exception as e:
1210 self.logger.error(
1211 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1212 exc_info=not isinstance(
1213 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1214 ),
1215 )
1216 ro_vim_item_update = {
1217 "vim_status": "VIM_ERROR",
1218 "created": created,
1219 "vim_message": str(e),
1220 }
1221
1222 return "FAILED", ro_vim_item_update
1223
def delete(self, ro_task, task_index):
    """
    Remove from the SDN connector the connectivity service recorded at this ro_task
    :param ro_task: ro_task dictionary. Uses "vim_info" ("vim_id", "created_items"), "target_id", "_id", "tasks"
    :param task_index: position at ro_task["tasks"] of the task being processed
    :return: tuple ("DONE", vim_info update marking the item DELETED) or, if the connector fails,
        ("FAILED", vim_info update with VIM_ERROR). A 'not found' connector error counts as already deleted
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    service_id = ro_task["vim_info"].get("vim_id")
    deleted_update = dict(
        vim_status="DELETED",
        created=False,
        vim_message="DELETED",
        vim_id=None,
    )

    try:
        # nothing to remove when no service id was ever stored
        if service_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                service_id, ro_task["vim_info"].get("created_items")
            )
    except Exception as exc:
        not_found = (
            isinstance(exc, sdnconn.SdnConnectorError)
            and exc.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not not_found:
            # traceback is only logged for exceptions that are not connector ones
            connector_error = isinstance(
                exc, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], service_id, exc
                ),
                exc_info=not connector_error,
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(exc),
            }

        deleted_update["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            service_id,
            deleted_update.get("vim_message", ""),
        )
    )

    return "DONE", deleted_update
1274
1275
class VimInteractionMigration(VimInteractionBase):
    """Task handler for the "migrate" item: moves a VM to another compute host through the VIM connector"""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM indicated at the task params and collect its refreshed information
        :param ro_task: ro_task dictionary; "target_id" selects the connector at self.my_vims
        :param task_index: position at ro_task["tasks"] of the task being processed
        :param task_depends: not used by this handler
        :return: tuple (status, ro_vim_item_update, db_task_update). status is "DONE", or "FAILED" when the
            connector raises VimConnException or the task is not valid (NsWorkerException)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params")

            if not params:
                # without params there is no VM to act on. Fail the task explicitly instead of
                # crashing later on with an unbound 'vim_vm_id'
                raise NsWorkerException("migrate task without params")

            vim_vm_id = params.get("vim_vm_id")
            migrate_host = params.get("migrate_host")
            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                vdu_old_vim_info = (
                    params["vdu_vim_info"].get(ro_task["target_id"]) or {}
                )

                # Refresh VM to get new vim_info
                vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    # keep the order of the previously known interfaces; an interface that is
                    # not reported any more is stored as None
                    for old_iface in vdu_old_vim_info.get("interfaces") or ():
                        iface = next(
                            (
                                iface
                                for iface in refreshed_vim_info["interfaces"]
                                if old_iface["vim_interface_id"]
                                == iface["vim_interface_id"]
                            ),
                            None,
                        )
                        vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1355
1356
class NsWorker(threading.Thread):
    # Thread that processes the ro_tasks of the targets (vim, sdn, wim) loaded at this worker.

    # Delays, in seconds, added to the current time to compute the next refresh of a ro_task
    # (see _update_refresh inside _process_pending_tasks)
    REFRESH_BUILD = 5  # 5 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    REFRESH_ERROR = 600
    REFRESH_IMAGE = 3600 * 10
    # NOTE(review): REFRESH_DELETE is not referenced in this part of the file; confirm where it is used
    REFRESH_DELETE = 3600 * 10
    # maximum number of elements of the worker task_queue
    QUEUE_SIZE = 100
    # NOTE(review): this name is rebound by the method terminate() defined later in this class body,
    # so the value False is not what NsWorker.terminate ends up being
    terminate = False
1365
def __init__(self, worker_index, config, plugins, db):
    """
    Prepare the worker: logger, task queue, caches of loaded targets and one handler per item type.
    No target is loaded here, so the worker starts idle

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.worker_index = worker_index
    self.config = config
    self.plugins = plugins
    self.db = db
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.task_queue = queue.Queue(self.QUEUE_SIZE)

    # The three containers below are handed to every handler, which keeps a reference to them
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)

    # item type of a task: class that knows how to handle it
    handler_classes = (
        ("net", VimInteractionNet),
        ("vdu", VimInteractionVdu),
        ("image", VimInteractionImage),
        ("flavor", VimInteractionFlavor),
        ("sdn_net", VimInteractionSdnNet),
        ("affinity-or-anti-affinity-group", VimInteractionAffinityGroup),
        ("migrate", VimInteractionMigration),
    )
    self.item2class = {
        item: handler_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, handler_class in handler_classes
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1414
def insert_task(self, task):
    """
    Add a task to the worker queue without waiting for free room
    :param task: element to be queued
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1421
def terminate(self):
    """
    Queue the special task "exit" at this worker
    (presumably the order for the thread main loop to finish; that loop is not in this part of the file)
    :return: None
    """
    self.insert_task(task="exit")
1424
def del_task(self, task):
    """
    Mark a task as superseded if it has not started to be processed
    :param task: task dictionary; its "status" is modified in place
    :return: True if the task was "SCHEDULED" and has been set to "SUPERSEDED", False otherwise
    """
    # NOTE(review): self.task_lock is not created at __init__ in this part of the file; confirm where
    # it is initialised
    with self.task_lock:
        if task["status"] != "SCHEDULED":
            # task["status"] == "processing"
            # The lock must not be released by hand here: leaving the 'with' block already releases
            # it, and a second release raises RuntimeError
            return False

        task["status"] = "SUPERSEDED"
        return True
1433
def _process_vim_config(self, target_id, db_vim):
    """
    Write to disk the vim config entries that must be provided as files (the ca_cert content)
    :param target_id: vim/sdn/wim + id
    :param db_vim: Vim dictionary obtained from database
    :return: None. Modifies vim: "ca_cert_content" is replaced by "ca_cert" with the path of the written file.
        Creates a folder target_id:worker_index and the file ca_cert inside
    :raises NsWorkerException: on any error handling the folder or the file
    """
    vim_config = db_vim.get("config")

    if not vim_config:
        return

    # path being handled at each moment; used for the error message
    current_path = ""

    try:
        if not vim_config.get("ca_cert_content"):
            return

        current_path = "{}:{}".format(target_id, self.worker_index)

        try:
            mkdir(current_path)
        except FileExistsError:
            pass  # the folder already exists

        current_path += "/ca_cert"

        with open(current_path, "w") as cert_file:
            cert_file.write(vim_config["ca_cert_content"])
            del vim_config["ca_cert_content"]
            vim_config["ca_cert"] = current_path
    except Exception as e:
        raise NsWorkerException(
            "Error writing to file '{}': {}".format(current_path, e)
        )
1465
def _load_plugin(self, name, type="vim"):
    """
    Provide the plugin registered under a name, loading it the first time from the entry points of
    group "osm_ro<type>.plugins". Loaded plugins are stored at the shared dict self.plugins
    :param name: plugin name, used as key of self.plugins and as entry point name
    :param type: can be vim or sdn
    :return: the plugin stored at self.plugins[name]
    :raises NsWorkerException: if the entry point fails to load or no plugin with this name is installed
    """
    known = self.plugins

    # the dummy connectors are always made available
    if "rovim_dummy" not in known:
        known["rovim_dummy"] = VimDummyConnector

    if "rosdn_dummy" not in known:
        known["rosdn_dummy"] = SdnDummyConnector

    if name not in known:
        try:
            for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
                known[name] = ep.load()
        except Exception as e:
            raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

        if name and name not in known:
            raise NsWorkerException(
                "Plugin 'osm_{n}' has not been installed".format(n=name)
            )

    return known[name]
1489
def _unload_vim(self, target_id):
    """
    Forget a target: drops it from the db_vims and my_vims dictionaries and from the vim_targets list, and
    removes from disk its folder target_id:worker_index. Errors are logged, never raised
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None.
    """
    try:
        self.db_vims.pop(target_id, None)
        self.my_vims.pop(target_id, None)

        try:
            self.vim_targets.remove(target_id)
        except ValueError:
            pass  # it was not at the list

        self.logger.info("Unloaded {}".format(target_id))
        target_folder = "{}:{}".format(target_id, self.worker_index)
        rmtree(target_folder)
    except FileNotFoundError:
        # this is raised by rmtree if folder does not exist
        pass
    except Exception as e:
        self.logger.error("Cannot unload {}: {}".format(target_id, e))
1509
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR.
    Only the first operation of the target that is in "PROCESSING" state and can be locked is handled.
    If the target was not loaded before this call, it is unloaded again at the end
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    # content to write at database at the 'finally' section; empty means nothing to write
    update_dict = {}
    unset_dict = {}
    # database path prefix of the operation being processed, "_admin.operations.<index>."
    op_text = ""
    # description of the current step, used at error messages
    step = ""
    # remember if the target was already loaded, to leave the worker in the same situation at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value read above, so the update matches nothing
            # if the operation has been modified in the meantime
            # NOTE(review): the key written here is "admin.current_operation" (without leading
            # underscore) while the key unset below is "current_operation"; confirm the intended key
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            # from here unset_dict is not empty, so the 'finally' section writes the result at database
            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # these values are replaced at the 'finally' section when error_text is not empty
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only one operation is processed per call
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # error_text is only read when there is something to write, and in that case it has been
        # assigned either by _load_vim or by the exception handler
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1611
def _reload_vim(self, target_id):
    """
    Refresh the information of a target after it has changed
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None
    """
    if target_id not in self.vim_targets:
        # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
        # just remove it to force load again next time it is needed
        self.db_vims.pop(target_id, None)
        return

    self._load_vim(target_id)
1619
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a FailingConnector at my_vims for this target
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None

    try:
        step = "Getting {}={} from db".format(target, _id)
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + vim["type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # work over a copy, so that the record stored at db_vims is not altered
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            wim["wim_url"] = wim["url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)

        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        # keep whatever was read from database and install a connector that reports the load error.
        # The connector belongs to my_vims: storing it at db_vims would overwrite the database
        # record and leave my_vims without an entry for this target
        self.db_vims[target_id] = vim or {}
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered even when loading fails
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1716
def _get_db_task(self):
    """
    Lock at database one ro_task of the loaded targets that is pending to be checked, and read it.
    A first attempt looks for ro_tasks with "to_check_at" previous to self.time_last_task_processed; if none
    is found a second attempt is done using the current time. When this last attempt finds nothing,
    self.time_last_task_processed is reset to None
    :return: the locked ro_task, or None if there is nothing to process or in case of error (that is logged)
    """
    now = time.time()

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            # A debug trace of task_locked_time, time_last_task_processed and now (by means of
            # self._log_ro_task with mark "TASK_WF") used to be here; it is disabled

            # lock: only matches ro_tasks whose previous lock has expired
            got_lock = self.db.set_one(
                "ro_tasks",
                q_filter={
                    "target_id": self.vim_targets,
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.lt": self.time_last_task_processed,
                },
                update_dict={"locked_by": self.my_id, "locked_at": now},
                fail_on_empty=False,
            )

            if got_lock:
                # read and return the ro_task just locked, identified by the lock time
                return self.db.get_one(
                    "ro_tasks",
                    q_filter={
                        "target_id": self.vim_targets,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at": now,
                    },
                )

            if self.time_last_task_processed == now:
                # the attempt with the current time has also found nothing
                self.time_last_task_processed = None
                return None

            self.time_last_task_processed = now
            # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
1786
def _get_db_all_tasks(self):
    """
    Read all content of table ro_tasks to log it
    :return: the list of ro_tasks read, or None in case of error (that is logged)
    """
    try:
        # Checking the content of the BD: read, log each ro_task, and return
        stored_ro_tasks = self.db.get_list("ro_tasks")

        for stored in stored_ro_tasks:
            self._log_ro_task(stored, None, None, "TASK_WF", "GET_ALL_TASKS")
    except DbException as e:
        self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
        )
    else:
        return stored_ro_tasks

    return None
1809
def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
    """
    Generate a log with the following format:

    Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
    target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
    task_array_index;task_id;task_action;task_item;task_args

    Example:

    TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
    1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
    ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
    'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
    'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
    888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
    CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}

    The content is taken from the first of ro_task, db_ro_task_update, db_ro_task_delete that is a dict:
    one line per task for ro_task, one line per "tasks.<index>.status" key for db_ro_task_update, a single
    line for db_ro_task_delete. If none is a dict a line with only mark and event is generated.
    Lines are logged at debug level. Any error is logged and never raised

    :param ro_task: ro_task dictionary, or None
    :param db_ro_task_update: dictionary with the changes to apply to a ro_task, or None
    :param db_ro_task_delete: dictionary identifying a ro_task to delete, or None
    :param mark: text for the first field of the line
    :param event: text for the second field of the line
    :return: None
    """
    try:
        if ro_task is not None and isinstance(ro_task, dict):
            for index, task in enumerate(ro_task["tasks"]):
                line = [
                    mark,
                    event,
                    str(ro_task.get("_id", "")),
                    str(ro_task.get("locked_at", "")),
                    str(ro_task.get("modified_at", "")),
                    str(ro_task.get("created_at", "")),
                    str(ro_task.get("to_check_at", "")),
                    str(ro_task.get("locked_by", "")),
                    str(ro_task.get("target_id", "")),
                    str(ro_task.get("vim_info", {}).get("refresh_at", "")),
                    str(ro_task.get("vim_info", "")),
                    str(ro_task.get("tasks", "")),
                ]

                if isinstance(task, dict):
                    line += [
                        str(task.get("status", "")),
                        str(task.get("action_id", "")),
                        str(index),
                        str(task.get("task_id", "")),
                        str(task.get("action", "")),
                        str(task.get("item", "")),
                        str(task.get("find_params", "")),
                        str(task.get("params", "")),
                    ]
                else:
                    # deleted task: only its index is known
                    line += ["", "", str(index)] + [""] * 5

                self.logger.debug(";".join(line))
        elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
            index = 0

            while True:
                status_key = "tasks.{}.status".format(index)

                if status_key not in db_ro_task_update:
                    break

                line = [
                    mark,
                    event,
                    str(db_ro_task_update.get("_id", "")),
                    str(db_ro_task_update.get("locked_at", "")),
                    str(db_ro_task_update.get("modified_at", "")),
                    "",
                    str(db_ro_task_update.get("to_check_at", "")),
                    str(db_ro_task_update.get("locked_by", "")),
                    "",
                    str(db_ro_task_update.get("vim_info.refresh_at", "")),
                    "",
                    str(db_ro_task_update.get("vim_info", "")),
                    str(str(db_ro_task_update).count(".status")),
                    str(db_ro_task_update.get(status_key, "")),
                    "",
                    str(index),
                    "",
                    "",
                    "",
                ]
                index += 1
                self.logger.debug(";".join(line))
        elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
            # every value is converted to text: a numeric modified_at would make join() fail
            # and the line would be lost
            line = [
                mark,
                event,
                str(db_ro_task_delete.get("_id", "")),
                "",
                str(db_ro_task_delete.get("modified_at", "")),
            ] + [""] * 13
            self.logger.debug(";".join(line))
        else:
            line = [mark, event] + [""] * 16
            self.logger.debug(";".join(line))

    except Exception as e:
        self.logger.error("Error logging ro_task: {}".format(e))
1908
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to be done or superseded, and do the deletion when needed.
    The CREATE tasks of the same target_record are set to "FINISHED". The deletion is skipped while there
    is a CREATE task of another target_record that is neither "FINISHED" nor "SUPERSEDED"
    :param ro_task: ro_task dictionary
    :param task_index: position at ro_task["tasks"] of the task being processed
    :param task_depends: not used here
    :param db_update: dictionary where the status changes of other tasks are annotated
    :return: tuple (task status, vim_info update): (None, None) if this task is "FAILED";
        ("SUPERSEDED", None) if nothing has to be deleted; the result of the delete method of the item
        handler otherwise; ("FAILED", {...}) in case of exception
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]
    vim_info = ro_task["vim_info"]
    needed_delete = vim_info["created"] or vim_info.get("created_items", False)

    self.logger.warning("Needed delete: {}".format(needed_delete))

    if my_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for index, other in enumerate(ro_task["tasks"]):
            if index == task_index or not other:
                continue  # own task

            same_record = my_task["target_record"] == other["target_record"]

            if other["action"] != "CREATE":
                continue

            if same_record:
                # set to finished
                db_update["tasks.{}.status".format(index)] = "FINISHED"
                other["status"] = "FINISHED"
            elif other["status"] not in ("FINISHED", "SUPERSEDED"):
                # the element is still in use by another record
                needed_delete = False

        if not needed_delete:
            return "SUPERSEDED", None

        self.logger.warning(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        item_handler = self.item2class[my_task["item"]]

        return item_handler.delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(
                    task_id, e
                ),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
1960
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to create something at VIM, and create it by means of the item handler
    :param ro_task: ro_task dictionary
    :param task_index: position at ro_task["tasks"] of the task being processed
    :param task_depends: passed as it is to the new method of the item handler
    :param db_update: not used here
    :return: tuple (task status, vim_info update): (None, None) if the task is not "SCHEDULED";
        (status of the other task, "COPY_VIM_INFO") if another CREATE task has already started the creation;
        the result of the new method of the item handler otherwise, or ("FAILED", {...}) if it raises
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]

    if my_task["status"] != "SCHEDULED":
        # "FAILED" (TODO need to be retry??) or any other status: nothing to create
        return None, None

    # check if already created by another task
    for index, other in enumerate(ro_task["tasks"]):
        if index == task_index or not other:
            continue  # own task

        started = other["action"] == "CREATE" and other["status"] not in (
            "SCHEDULED",
            "FINISHED",
            "SUPERSEDED",
        )

        if started:
            return other["status"], "COPY_VIM_INFO"

    task_status = None

    try:
        item_handler = self.item2class[my_task["item"]]
        task_status, ro_vim_item_update = item_handler.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
2003
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: ro_task where to look for first when task_id is a task.task_id (format 3)
    :param target_id: target where to look for a target_record_id (format 2); replaced by the one
        contained at task_id when format 1 is used
    :return: database ro_task plus index of task
    :raises NsWorkerException: if the task is not found
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of deleted tasks are None and must be skipped
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of deleted tasks are None and must be skipped
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index

    raise NsWorkerException("Cannot get depending task {}".format(task_id))
2055
def _process_pending_tasks(self, ro_task):
    """
    Process the pending tasks of one ro_task and store the outcome at database table "ro_tasks".

    Tasks are visited grouped by action, in the order DELETE, CREATE, EXEC. For every selected task the
    dependencies are checked (only when it is SCHEDULED), the proper handler is called, and the resulting
    status and vim information are accumulated in a single update dictionary that is written at the end
    together with the unlock of the ro_task.
    :param ro_task: ro_task content. Keys "_id", "tasks", "vim_info", "target_id", "locked_at" and
        "to_check_at" are read; "vim_info" is modified in place
    :return: None. Exceptions are caught and logged, they are not propagated to the caller
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    # changes over the ro_task accumulated along the method; written to database once, at the end
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh: the interval depends on the item type and on new_status.
        # It lowers next_check_at if needed and stores "refresh_at" both at the pending database
        # update and at the in-memory ro_task
        # 'task' is bound by the "for task_index, task in ..." loop below, before this is called
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.REFRESH_IMAGE
        elif new_status == "BUILD":
            next_refresh += self.REFRESH_BUILD
        elif new_status == "DONE":
            next_refresh += self.REFRESH_ACTIVE
        else:
            next_refresh += self.REFRESH_ERROR

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # 0: get task_status_create: status of the first CREATE task that is already BUILD or DONE, if any.
        # A SCHEDULED CREATE task reuses this status instead of calling the item handler again
        lock_object = None
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks to process. For actions DELETE and EXEC only tasks in status SCHEDULED or BUILD;
        # for action CREATE every task except those in status FINISHED or SUPERSEDED
        for task_action in ("DELETE", "CREATE", "EXEC"):
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of a different action, or whose status needs no processing for this action
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            # NOTE(review): this dumps the whole dependency ro_task at WARNING level;
                            # it looks like a debugging leftover - confirm the intended level
                            self.logger.warning(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                # caught by the "except Exception" below, that sets this task as FAILED
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is stored under both the plain task id and
                            # the "TASK-<id>" key
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            # dependency_ro_task is the one that caused the 'break' at the loop above
                            # NOTE(review): WARNING level here looks like a debugging leftover - confirm
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locked_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (new_status, db_vim_info_update,) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # next_check_at is overwritten (not lowered with min); 60 seconds if the
                            # handler does not provide "next_retry"
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task of this ro_task is already BUILD/DONE:
                                # take its status and copy the current vim_info to the target
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            # not SCHEDULED: refresh from the VIM only when refresh_at has expired
                            if (
                                ro_task["vim_info"]["refresh_at"]
                                and now > ro_task["vim_info"]["refresh_at"]
                            ):
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    # any failure (dependency, handler, VIM connector) sets the task as FAILED
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    # NsWorkerException and VimConnException are not logged here; the rest are
                    # logged with traceback
                    # NOTE(review): the message names _delete_task, but this try block covers the
                    # DELETE, EXEC and CREATE actions
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    # accumulate the vim information changes, both for database and in memory
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            # following SCHEDULED CREATE tasks of this ro_task reuse this status
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    # db_vim_update and new_status are reset per action, not per task, so they can
                    # hold the values produced by a previous task of the same action
                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        # 2: write the accumulated changes and unlock the ro_task
        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_check_at. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        # first attempt: matches only if to_check_at has not been modified meanwhile
        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # second attempt: to_check_at is neither used as filter nor overwritten
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2392
def _update_target(self, task, ro_vim_item_update):
    """
    Copy the VIM information of a task to the database record it targets, or mark that item as deleted.

    task["target_record"] has the form "<table>:<_id>:<path_vim_status>", for example
    "vnfrs:<id>:vdur.10.vim_info.vim:id"
    :param task: task content; only "target_record" is read
    :param ro_vim_item_update: dictionary with the new VIM information. If empty or None the target item
        is set to status "DELETED" and its vim information is unset
    :return: None
    """
    collection, _, remainder = task["target_record"].partition(":")
    record_id, _, vim_path = remainder.partition(":")
    # vim_path: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
    # item_path: vim_path without its last two components, i.e. the record item, e.g. "vdur.10"
    item_path = vim_path[: vim_path.rfind(".")]
    item_path = item_path[: item_path.rfind(".")]

    if not ro_vim_item_update:
        # nothing to copy: flag the item as deleted and remove its vim information
        self.db.set_one(
            collection,
            q_filter={"_id": record_id},
            update_dict={item_path + ".status": "DELETED"},
            unset={vim_path: None},
        )
        return

    copied_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
    )
    changes = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            changes[vim_path + "." + key] = value

    if vim_path.startswith("vdur."):
        # for backward compatibility, add vdur.name, vdur.vim-id and the general vdur.status apart from
        # vim_name, vim_id and vim_status
        legacy_aliases = (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        )
        for source_key, legacy_suffix in legacy_aliases:
            if ro_vim_item_update.get(source_key):
                changes[item_path + legacy_suffix] = ro_vim_item_update[source_key]

    for position, iface in enumerate(ro_vim_item_update.get("interfaces") or ()):
        if not iface:
            continue

        iface_path = item_path + ".interfaces" + ".{}.".format(position)
        for key, value in iface.items():
            if key in ("vlan", "compute_node", "pci"):
                changes[iface_path + key] = value

        # put ip_address and mac_address with ip-address and mac-address
        if iface.get("ip_address"):
            changes[iface_path + "ip-address"] = iface["ip_address"]

        if iface.get("mac_address"):
            changes[iface_path + "mac-address"] = iface["mac_address"]

        # management interfaces also publish the first address of the ";" separated list
        if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
            changes["ip-address"] = iface.get("ip_address").split(";")[0]

        if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
            changes[item_path + ".ip-address"] = iface.get("ip_address").split(";")[0]

    self.db.set_one(collection, q_filter={"_id": record_id}, update_dict=changes)
2474
def _process_delete_db_tasks(self):
    """
    Remove from database the tasks queued at self.tasks_to_delete, because their vnfrs or nsrs or both
    have been deleted. A failure deleting one entry is logged and the entry is discarded anyway
    :return: None. Uses and modify self.tasks_to_delete
    """
    while self.tasks_to_delete:
        # the entry is kept at the list until it has been processed
        current = self.tasks_to_delete[0]
        nsr_id = current["nsr_id"]
        target = current["target_record"]

        # limit the deletion to one vnfr only when the task targets a vnfrs and its nsrs is still present
        if target.startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": nsr_id}, fail_on_empty=False
        ):
            vnfrs_deleted = target.partition(":")[2].partition(":")[0]
        else:
            vnfrs_deleted = None

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            self.logger.error(
                "Error deleting task={}: {}".format(current["task_id"], e)
            )

        self.tasks_to_delete.pop(0)
2497
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Delete from table "ro_tasks" the tasks that belong to a removed nsr or vnfr.
    Static method because it is called from osm_ng_ro.ns

    A ro_task where every remaining task is affected is deleted; otherwise only the affected tasks are set
    to None. Both operations are filtered by "modified_at"; when one of them matches nothing, the whole
    procedure is repeated after reading the ro_tasks again
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception if fails
    """
    max_attempts = 5

    def _is_affected(task):
        # with a vnfr id only the tasks targeting that vnfr match; otherwise every task of the nsr
        if vnfrs_deleted:
            return task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
        return task["nsr_id"] == nsr_id

    for _ in range(max_attempts):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            pending_changes = {}
            used_by_others = False

            for position, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task already deleted

                if _is_affected(task):
                    pending_changes["tasks.{}".format(position)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    used_by_others = True

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            if not used_by_others:
                deleted = db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                )
                if not deleted:
                    conflict = True
            elif pending_changes:
                pending_changes["modified_at"] = now
                updated = db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=pending_changes,
                    fail_on_empty=False,
                )
                if not updated:
                    conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(max_attempts))
2556
def run(self):
    """
    Main loop of the thread. Every iteration first attends one command of self.task_queue and, when there
    is no command to attend, processes one pending ro_task from database.

    The queue is read without blocking when self.vim_targets is not empty; otherwise the thread blocks on
    the queue (idle state). Commands are sequences whose first element is one of "terminate", "load_vim",
    "unload_vim", "reload_vim" or "check_vim"; any other command is ignored. The loop ends with "terminate"
    :return: None
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]

            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif command == "reload_vim":
                self._reload_vim(order[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])

            # a command has been attended: look for the next one before processing ro_tasks
            continue
        except queue.Empty:
            # no command waiting, go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            ro_task = self._get_db_task()

            if not ro_task:
                # nothing to do, wait before polling again
                time.sleep(5)
            else:
                self.logger.warning("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")