Fix bug 1899 to select correct WIM connector class and prevent exceptions with missin...
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 }
215 self.logger.debug(
216 "task={} {} new-net={} created={}".format(
217 task_id, ro_task["target_id"], vim_net_id, created
218 )
219 )
220
221 return "BUILD", ro_vim_item_update
222 except (vimconn.VimConnException, NsWorkerException) as e:
223 self.logger.error(
224 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
225 )
226 ro_vim_item_update = {
227 "vim_status": "VIM_ERROR",
228 "created": created,
229 "vim_details": str(e),
230 }
231
232 return "FAILED", ro_vim_item_update
233
234 def refresh(self, ro_task):
235 """Call VIM to get network status"""
236 ro_task_id = ro_task["_id"]
237 target_vim = self.my_vims[ro_task["target_id"]]
238 vim_id = ro_task["vim_info"]["vim_id"]
239 net_to_refresh_list = [vim_id]
240
241 try:
242 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
243 vim_info = vim_dict[vim_id]
244
245 if vim_info["status"] == "ACTIVE":
246 task_status = "DONE"
247 elif vim_info["status"] == "BUILD":
248 task_status = "BUILD"
249 else:
250 task_status = "FAILED"
251 except vimconn.VimConnException as e:
252 # Mark all tasks at VIM_ERROR status
253 self.logger.error(
254 "ro_task={} vim={} get-net={}: {}".format(
255 ro_task_id, ro_task["target_id"], vim_id, e
256 )
257 )
258 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
259 task_status = "FAILED"
260
261 ro_vim_item_update = {}
262 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
263 ro_vim_item_update["vim_status"] = vim_info["status"]
264
265 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
266 ro_vim_item_update["vim_name"] = vim_info.get("name")
267
268 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
269 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
270 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
271 elif vim_info["status"] == "DELETED":
272 ro_vim_item_update["vim_id"] = None
273 ro_vim_item_update["vim_details"] = "Deleted externally"
274 else:
275 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
276 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
277
278 if ro_vim_item_update:
279 self.logger.debug(
280 "ro_task={} {} get-net={}: status={} {}".format(
281 ro_task_id,
282 ro_task["target_id"],
283 vim_id,
284 ro_vim_item_update.get("vim_status"),
285 ro_vim_item_update.get("vim_details")
286 if ro_vim_item_update.get("vim_status") != "ACTIVE"
287 else "",
288 )
289 )
290
291 return task_status, ro_vim_item_update
292
293 def delete(self, ro_task, task_index):
294 task = ro_task["tasks"][task_index]
295 task_id = task["task_id"]
296 net_vim_id = ro_task["vim_info"]["vim_id"]
297 ro_vim_item_update_ok = {
298 "vim_status": "DELETED",
299 "created": False,
300 "vim_details": "DELETED",
301 "vim_id": None,
302 }
303
304 try:
305 if net_vim_id or ro_task["vim_info"]["created_items"]:
306 target_vim = self.my_vims[ro_task["target_id"]]
307 target_vim.delete_network(
308 net_vim_id, ro_task["vim_info"]["created_items"]
309 )
310 except vimconn.VimConnNotFoundException:
311 ro_vim_item_update_ok["vim_details"] = "already deleted"
312 except vimconn.VimConnException as e:
313 self.logger.error(
314 "ro_task={} vim={} del-net={}: {}".format(
315 ro_task["_id"], ro_task["target_id"], net_vim_id, e
316 )
317 )
318 ro_vim_item_update = {
319 "vim_status": "VIM_ERROR",
320 "vim_details": "Error while deleting: {}".format(e),
321 }
322
323 return "FAILED", ro_vim_item_update
324
325 self.logger.debug(
326 "task={} {} del-net={} {}".format(
327 task_id,
328 ro_task["target_id"],
329 net_vim_id,
330 ro_vim_item_update_ok.get("vim_details", ""),
331 )
332 )
333
334 return "DONE", ro_vim_item_update_ok
335
336
337 class VimInteractionVdu(VimInteractionBase):
338 max_retries_inject_ssh_key = 20 # 20 times
339 time_retries_inject_ssh_key = 30 # wevery 30 seconds
340
341 def new(self, ro_task, task_index, task_depends):
342 task = ro_task["tasks"][task_index]
343 task_id = task["task_id"]
344 created = False
345 created_items = {}
346 target_vim = self.my_vims[ro_task["target_id"]]
347
348 try:
349 created = True
350 params = task["params"]
351 params_copy = deepcopy(params)
352 net_list = params_copy["net_list"]
353
354 for net in net_list:
355 # change task_id into network_id
356 if "net_id" in net and net["net_id"].startswith("TASK-"):
357 network_id = task_depends[net["net_id"]]
358
359 if not network_id:
360 raise NsWorkerException(
361 "Cannot create VM because depends on a network not created or found "
362 "for {}".format(net["net_id"])
363 )
364
365 net["net_id"] = network_id
366
367 if params_copy["image_id"].startswith("TASK-"):
368 params_copy["image_id"] = task_depends[params_copy["image_id"]]
369
370 if params_copy["flavor_id"].startswith("TASK-"):
371 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
372
373 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
374 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
375
376 ro_vim_item_update = {
377 "vim_id": vim_vm_id,
378 "vim_status": "BUILD",
379 "created": created,
380 "created_items": created_items,
381 "vim_details": None,
382 "interfaces_vim_ids": interfaces,
383 "interfaces": [],
384 }
385 self.logger.debug(
386 "task={} {} new-vm={} created={}".format(
387 task_id, ro_task["target_id"], vim_vm_id, created
388 )
389 )
390
391 return "BUILD", ro_vim_item_update
392 except (vimconn.VimConnException, NsWorkerException) as e:
393 self.logger.error(
394 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
395 )
396 ro_vim_item_update = {
397 "vim_status": "VIM_ERROR",
398 "created": created,
399 "vim_details": str(e),
400 }
401
402 return "FAILED", ro_vim_item_update
403
404 def delete(self, ro_task, task_index):
405 task = ro_task["tasks"][task_index]
406 task_id = task["task_id"]
407 vm_vim_id = ro_task["vim_info"]["vim_id"]
408 ro_vim_item_update_ok = {
409 "vim_status": "DELETED",
410 "created": False,
411 "vim_details": "DELETED",
412 "vim_id": None,
413 }
414
415 try:
416 if vm_vim_id or ro_task["vim_info"]["created_items"]:
417 target_vim = self.my_vims[ro_task["target_id"]]
418 target_vim.delete_vminstance(
419 vm_vim_id, ro_task["vim_info"]["created_items"]
420 )
421 except vimconn.VimConnNotFoundException:
422 ro_vim_item_update_ok["vim_details"] = "already deleted"
423 except vimconn.VimConnException as e:
424 self.logger.error(
425 "ro_task={} vim={} del-vm={}: {}".format(
426 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
427 )
428 )
429 ro_vim_item_update = {
430 "vim_status": "VIM_ERROR",
431 "vim_details": "Error while deleting: {}".format(e),
432 }
433
434 return "FAILED", ro_vim_item_update
435
436 self.logger.debug(
437 "task={} {} del-vm={} {}".format(
438 task_id,
439 ro_task["target_id"],
440 vm_vim_id,
441 ro_vim_item_update_ok.get("vim_details", ""),
442 )
443 )
444
445 return "DONE", ro_vim_item_update_ok
446
447 def refresh(self, ro_task):
448 """Call VIM to get vm status"""
449 ro_task_id = ro_task["_id"]
450 target_vim = self.my_vims[ro_task["target_id"]]
451 vim_id = ro_task["vim_info"]["vim_id"]
452
453 if not vim_id:
454 return None, None
455
456 vm_to_refresh_list = [vim_id]
457 try:
458 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
459 vim_info = vim_dict[vim_id]
460
461 if vim_info["status"] == "ACTIVE":
462 task_status = "DONE"
463 elif vim_info["status"] == "BUILD":
464 task_status = "BUILD"
465 else:
466 task_status = "FAILED"
467
468 # try to load and parse vim_information
469 try:
470 vim_info_info = yaml.safe_load(vim_info["vim_info"])
471 if vim_info_info.get("name"):
472 vim_info["name"] = vim_info_info["name"]
473 except Exception:
474 pass
475 except vimconn.VimConnException as e:
476 # Mark all tasks at VIM_ERROR status
477 self.logger.error(
478 "ro_task={} vim={} get-vm={}: {}".format(
479 ro_task_id, ro_task["target_id"], vim_id, e
480 )
481 )
482 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
483 task_status = "FAILED"
484
485 ro_vim_item_update = {}
486
487 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
488 vim_interfaces = []
489 if vim_info.get("interfaces"):
490 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
491 iface = next(
492 (
493 iface
494 for iface in vim_info["interfaces"]
495 if vim_iface_id == iface["vim_interface_id"]
496 ),
497 None,
498 )
499 # if iface:
500 # iface.pop("vim_info", None)
501 vim_interfaces.append(iface)
502
503 task_create = next(
504 t
505 for t in ro_task["tasks"]
506 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
507 )
508 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
509 vim_interfaces[task_create["mgmt_vnf_interface"]][
510 "mgmt_vnf_interface"
511 ] = True
512
513 mgmt_vdu_iface = task_create.get(
514 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
515 )
516 if vim_interfaces:
517 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
518
519 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
520 ro_vim_item_update["interfaces"] = vim_interfaces
521
522 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
523 ro_vim_item_update["vim_status"] = vim_info["status"]
524
525 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
526 ro_vim_item_update["vim_name"] = vim_info.get("name")
527
528 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
529 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
530 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
531 elif vim_info["status"] == "DELETED":
532 ro_vim_item_update["vim_id"] = None
533 ro_vim_item_update["vim_details"] = "Deleted externally"
534 else:
535 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
536 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
537
538 if ro_vim_item_update:
539 self.logger.debug(
540 "ro_task={} {} get-vm={}: status={} {}".format(
541 ro_task_id,
542 ro_task["target_id"],
543 vim_id,
544 ro_vim_item_update.get("vim_status"),
545 ro_vim_item_update.get("vim_details")
546 if ro_vim_item_update.get("vim_status") != "ACTIVE"
547 else "",
548 )
549 )
550
551 return task_status, ro_vim_item_update
552
553 def exec(self, ro_task, task_index, task_depends):
554 task = ro_task["tasks"][task_index]
555 task_id = task["task_id"]
556 target_vim = self.my_vims[ro_task["target_id"]]
557 db_task_update = {"retries": 0}
558 retries = task.get("retries", 0)
559
560 try:
561 params = task["params"]
562 params_copy = deepcopy(params)
563 params_copy["ro_key"] = self.db.decrypt(
564 params_copy.pop("private_key"),
565 params_copy.pop("schema_version"),
566 params_copy.pop("salt"),
567 )
568 params_copy["ip_addr"] = params_copy.pop("ip_address")
569 target_vim.inject_user_key(**params_copy)
570 self.logger.debug(
571 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
572 )
573
574 return (
575 "DONE",
576 None,
577 db_task_update,
578 ) # params_copy["key"]
579 except (vimconn.VimConnException, NsWorkerException) as e:
580 retries += 1
581
582 if retries < self.max_retries_inject_ssh_key:
583 return (
584 "BUILD",
585 None,
586 {
587 "retries": retries,
588 "next_retry": self.time_retries_inject_ssh_key,
589 },
590 )
591
592 self.logger.error(
593 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
594 )
595 ro_vim_item_update = {"vim_details": str(e)}
596
597 return "FAILED", ro_vim_item_update, db_task_update
598
599
600 class VimInteractionImage(VimInteractionBase):
601 def new(self, ro_task, task_index, task_depends):
602 task = ro_task["tasks"][task_index]
603 task_id = task["task_id"]
604 created = False
605 created_items = {}
606 target_vim = self.my_vims[ro_task["target_id"]]
607
608 try:
609 # FIND
610 if task.get("find_params"):
611 vim_images = target_vim.get_image_list(**task["find_params"])
612
613 if not vim_images:
614 raise NsWorkerExceptionNotFound(
615 "Image not found with this criteria: '{}'".format(
616 task["find_params"]
617 )
618 )
619 elif len(vim_images) > 1:
620 raise NsWorkerException(
621 "More than one image found with this criteria: '{}'".format(
622 task["find_params"]
623 )
624 )
625 else:
626 vim_image_id = vim_images[0]["id"]
627
628 ro_vim_item_update = {
629 "vim_id": vim_image_id,
630 "vim_status": "DONE",
631 "created": created,
632 "created_items": created_items,
633 "vim_details": None,
634 }
635 self.logger.debug(
636 "task={} {} new-image={} created={}".format(
637 task_id, ro_task["target_id"], vim_image_id, created
638 )
639 )
640
641 return "DONE", ro_vim_item_update
642 except (NsWorkerException, vimconn.VimConnException) as e:
643 self.logger.error(
644 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
645 )
646 ro_vim_item_update = {
647 "vim_status": "VIM_ERROR",
648 "created": created,
649 "vim_details": str(e),
650 }
651
652 return "FAILED", ro_vim_item_update
653
654
655 class VimInteractionFlavor(VimInteractionBase):
656 def delete(self, ro_task, task_index):
657 task = ro_task["tasks"][task_index]
658 task_id = task["task_id"]
659 flavor_vim_id = ro_task["vim_info"]["vim_id"]
660 ro_vim_item_update_ok = {
661 "vim_status": "DELETED",
662 "created": False,
663 "vim_details": "DELETED",
664 "vim_id": None,
665 }
666
667 try:
668 if flavor_vim_id:
669 target_vim = self.my_vims[ro_task["target_id"]]
670 target_vim.delete_flavor(flavor_vim_id)
671 except vimconn.VimConnNotFoundException:
672 ro_vim_item_update_ok["vim_details"] = "already deleted"
673 except vimconn.VimConnException as e:
674 self.logger.error(
675 "ro_task={} vim={} del-flavor={}: {}".format(
676 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
677 )
678 )
679 ro_vim_item_update = {
680 "vim_status": "VIM_ERROR",
681 "vim_details": "Error while deleting: {}".format(e),
682 }
683
684 return "FAILED", ro_vim_item_update
685
686 self.logger.debug(
687 "task={} {} del-flavor={} {}".format(
688 task_id,
689 ro_task["target_id"],
690 flavor_vim_id,
691 ro_vim_item_update_ok.get("vim_details", ""),
692 )
693 )
694
695 return "DONE", ro_vim_item_update_ok
696
697 def new(self, ro_task, task_index, task_depends):
698 task = ro_task["tasks"][task_index]
699 task_id = task["task_id"]
700 created = False
701 created_items = {}
702 target_vim = self.my_vims[ro_task["target_id"]]
703
704 try:
705 # FIND
706 vim_flavor_id = None
707
708 if task.get("find_params"):
709 try:
710 flavor_data = task["find_params"]["flavor_data"]
711 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
712 except vimconn.VimConnNotFoundException:
713 pass
714
715 if not vim_flavor_id and task.get("params"):
716 # CREATE
717 flavor_data = task["params"]["flavor_data"]
718 vim_flavor_id = target_vim.new_flavor(flavor_data)
719 created = True
720
721 ro_vim_item_update = {
722 "vim_id": vim_flavor_id,
723 "vim_status": "DONE",
724 "created": created,
725 "created_items": created_items,
726 "vim_details": None,
727 }
728 self.logger.debug(
729 "task={} {} new-flavor={} created={}".format(
730 task_id, ro_task["target_id"], vim_flavor_id, created
731 )
732 )
733
734 return "DONE", ro_vim_item_update
735 except (vimconn.VimConnException, NsWorkerException) as e:
736 self.logger.error(
737 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
738 )
739 ro_vim_item_update = {
740 "vim_status": "VIM_ERROR",
741 "created": created,
742 "vim_details": str(e),
743 }
744
745 return "FAILED", ro_vim_item_update
746
747
748 class VimInteractionSdnNet(VimInteractionBase):
749 @staticmethod
750 def _match_pci(port_pci, mapping):
751 """
752 Check if port_pci matches with mapping
753 mapping can have brackets to indicate that several chars are accepted. e.g
754 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
755 :param port_pci: text
756 :param mapping: text, can contain brackets to indicate several chars are available
757 :return: True if matches, False otherwise
758 """
759 if not port_pci or not mapping:
760 return False
761 if port_pci == mapping:
762 return True
763
764 mapping_index = 0
765 pci_index = 0
766 while True:
767 bracket_start = mapping.find("[", mapping_index)
768
769 if bracket_start == -1:
770 break
771
772 bracket_end = mapping.find("]", bracket_start)
773 if bracket_end == -1:
774 break
775
776 length = bracket_start - mapping_index
777 if (
778 length
779 and port_pci[pci_index : pci_index + length]
780 != mapping[mapping_index:bracket_start]
781 ):
782 return False
783
784 if (
785 port_pci[pci_index + length]
786 not in mapping[bracket_start + 1 : bracket_end]
787 ):
788 return False
789
790 pci_index += length + 1
791 mapping_index = bracket_end + 1
792
793 if port_pci[pci_index:] != mapping[mapping_index:]:
794 return False
795
796 return True
797
798 def _get_interfaces(self, vlds_to_connect, vim_account_id):
799 """
800 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
801 :param vim_account_id:
802 :return:
803 """
804 interfaces = []
805
806 for vld in vlds_to_connect:
807 table, _, db_id = vld.partition(":")
808 db_id, _, vld = db_id.partition(":")
809 _, _, vld_id = vld.partition(".")
810
811 if table == "vnfrs":
812 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
813 iface_key = "vnf-vld-id"
814 else: # table == "nsrs"
815 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
816 iface_key = "ns-vld-id"
817
818 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
819
820 for db_vnfr in db_vnfrs:
821 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
822 for iface_index, interface in enumerate(vdur["interfaces"]):
823 if interface.get(iface_key) == vld_id and interface.get(
824 "type"
825 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
826 # only SR-IOV o PT
827 interface_ = interface.copy()
828 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
829 db_vnfr["_id"], vdu_index, iface_index
830 )
831
832 if vdur.get("status") == "ERROR":
833 interface_["status"] = "ERROR"
834
835 interfaces.append(interface_)
836
837 return interfaces
838
839 def refresh(self, ro_task):
840 # look for task create
841 task_create_index, _ = next(
842 i_t
843 for i_t in enumerate(ro_task["tasks"])
844 if i_t[1]
845 and i_t[1]["action"] == "CREATE"
846 and i_t[1]["status"] != "FINISHED"
847 )
848
849 return self.new(ro_task, task_create_index, None)
850
851 def new(self, ro_task, task_index, task_depends):
852
853 task = ro_task["tasks"][task_index]
854 task_id = task["task_id"]
855 target_vim = self.my_vims[ro_task["target_id"]]
856
857 sdn_net_id = ro_task["vim_info"]["vim_id"]
858
859 created_items = ro_task["vim_info"].get("created_items")
860 connected_ports = ro_task["vim_info"].get("connected_ports", [])
861 new_connected_ports = []
862 last_update = ro_task["vim_info"].get("last_update", 0)
863 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
864 error_list = []
865 created = ro_task["vim_info"].get("created", False)
866
867 try:
868 # CREATE
869 params = task["params"]
870 vlds_to_connect = params.get("vlds", [])
871 associated_vim = params.get("target_vim")
872 # external additional ports
873 additional_ports = params.get("sdn-ports") or ()
874 _, _, vim_account_id = (
875 (None, None, None)
876 if associated_vim is None
877 else associated_vim.partition(":")
878 )
879
880 if associated_vim:
881 # get associated VIM
882 if associated_vim not in self.db_vims:
883 self.db_vims[associated_vim] = self.db.get_one(
884 "vim_accounts", {"_id": vim_account_id}
885 )
886
887 db_vim = self.db_vims[associated_vim]
888
889 # look for ports to connect
890 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
891 # print(ports)
892
893 sdn_ports = []
894 pending_ports = error_ports = 0
895 vlan_used = None
896 sdn_need_update = False
897
898 for port in ports:
899 vlan_used = port.get("vlan") or vlan_used
900
901 # TODO. Do not connect if already done
902 if not port.get("compute_node") or not port.get("pci"):
903 if port.get("status") == "ERROR":
904 error_ports += 1
905 else:
906 pending_ports += 1
907 continue
908
909 pmap = None
910 compute_node_mappings = next(
911 (
912 c
913 for c in db_vim["config"].get("sdn-port-mapping", ())
914 if c and c["compute_node"] == port["compute_node"]
915 ),
916 None,
917 )
918
919 if compute_node_mappings:
920 # process port_mapping pci of type 0000:af:1[01].[1357]
921 pmap = next(
922 (
923 p
924 for p in compute_node_mappings["ports"]
925 if self._match_pci(port["pci"], p.get("pci"))
926 ),
927 None,
928 )
929
930 if not pmap:
931 if not db_vim["config"].get("mapping_not_needed"):
932 error_list.append(
933 "Port mapping not found for compute_node={} pci={}".format(
934 port["compute_node"], port["pci"]
935 )
936 )
937 continue
938
939 pmap = {}
940
941 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
942 new_port = {
943 "service_endpoint_id": pmap.get("service_endpoint_id")
944 or service_endpoint_id,
945 "service_endpoint_encapsulation_type": "dot1q"
946 if port["type"] == "SR-IOV"
947 else None,
948 "service_endpoint_encapsulation_info": {
949 "vlan": port.get("vlan"),
950 "mac": port.get("mac-address"),
951 "device_id": pmap.get("device_id") or port["compute_node"],
952 "device_interface_id": pmap.get("device_interface_id")
953 or port["pci"],
954 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
955 "switch_port": pmap.get("switch_port"),
956 "service_mapping_info": pmap.get("service_mapping_info"),
957 },
958 }
959
960 # TODO
961 # if port["modified_at"] > last_update:
962 # sdn_need_update = True
963 new_connected_ports.append(port["id"]) # TODO
964 sdn_ports.append(new_port)
965
966 if error_ports:
967 error_list.append(
968 "{} interfaces have not been created as VDU is on ERROR status".format(
969 error_ports
970 )
971 )
972
973 # connect external ports
974 for index, additional_port in enumerate(additional_ports):
975 additional_port_id = additional_port.get(
976 "service_endpoint_id"
977 ) or "external-{}".format(index)
978 sdn_ports.append(
979 {
980 "service_endpoint_id": additional_port_id,
981 "service_endpoint_encapsulation_type": additional_port.get(
982 "service_endpoint_encapsulation_type", "dot1q"
983 ),
984 "service_endpoint_encapsulation_info": {
985 "vlan": additional_port.get("vlan") or vlan_used,
986 "mac": additional_port.get("mac_address"),
987 "device_id": additional_port.get("device_id"),
988 "device_interface_id": additional_port.get(
989 "device_interface_id"
990 ),
991 "switch_dpid": additional_port.get("switch_dpid")
992 or additional_port.get("switch_id"),
993 "switch_port": additional_port.get("switch_port"),
994 "service_mapping_info": additional_port.get(
995 "service_mapping_info"
996 ),
997 },
998 }
999 )
1000 new_connected_ports.append(additional_port_id)
1001 sdn_info = ""
1002
1003 # if there are more ports to connect or they have been modified, call create/update
1004 if error_list:
1005 sdn_status = "ERROR"
1006 sdn_info = "; ".join(error_list)
1007 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1008 last_update = time.time()
1009
1010 if not sdn_net_id:
1011 if len(sdn_ports) < 2:
1012 sdn_status = "ACTIVE"
1013
1014 if not pending_ports:
1015 self.logger.debug(
1016 "task={} {} new-sdn-net done, less than 2 ports".format(
1017 task_id, ro_task["target_id"]
1018 )
1019 )
1020 else:
1021 net_type = params.get("type") or "ELAN"
1022 (
1023 sdn_net_id,
1024 created_items,
1025 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1026 created = True
1027 self.logger.debug(
1028 "task={} {} new-sdn-net={} created={}".format(
1029 task_id, ro_task["target_id"], sdn_net_id, created
1030 )
1031 )
1032 else:
1033 created_items = target_vim.edit_connectivity_service(
1034 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1035 )
1036 created = True
1037 self.logger.debug(
1038 "task={} {} update-sdn-net={} created={}".format(
1039 task_id, ro_task["target_id"], sdn_net_id, created
1040 )
1041 )
1042
1043 connected_ports = new_connected_ports
1044 elif sdn_net_id:
1045 wim_status_dict = target_vim.get_connectivity_service_status(
1046 sdn_net_id, conn_info=created_items
1047 )
1048 sdn_status = wim_status_dict["sdn_status"]
1049
1050 if wim_status_dict.get("sdn_info"):
1051 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1052
1053 if wim_status_dict.get("error_msg"):
1054 sdn_info = wim_status_dict.get("error_msg") or ""
1055
1056 if pending_ports:
1057 if sdn_status != "ERROR":
1058 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1059 len(ports) - pending_ports, len(ports)
1060 )
1061
1062 if sdn_status == "ACTIVE":
1063 sdn_status = "BUILD"
1064
1065 ro_vim_item_update = {
1066 "vim_id": sdn_net_id,
1067 "vim_status": sdn_status,
1068 "created": created,
1069 "created_items": created_items,
1070 "connected_ports": connected_ports,
1071 "vim_details": sdn_info,
1072 "last_update": last_update,
1073 }
1074
1075 return sdn_status, ro_vim_item_update
1076 except Exception as e:
1077 self.logger.error(
1078 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1079 exc_info=not isinstance(
1080 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1081 ),
1082 )
1083 ro_vim_item_update = {
1084 "vim_status": "VIM_ERROR",
1085 "created": created,
1086 "vim_details": str(e),
1087 }
1088
1089 return "FAILED", ro_vim_item_update
1090
1091 def delete(self, ro_task, task_index):
1092 task = ro_task["tasks"][task_index]
1093 task_id = task["task_id"]
1094 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1095 ro_vim_item_update_ok = {
1096 "vim_status": "DELETED",
1097 "created": False,
1098 "vim_details": "DELETED",
1099 "vim_id": None,
1100 }
1101
1102 try:
1103 if sdn_vim_id:
1104 target_vim = self.my_vims[ro_task["target_id"]]
1105 target_vim.delete_connectivity_service(
1106 sdn_vim_id, ro_task["vim_info"].get("created_items")
1107 )
1108
1109 except Exception as e:
1110 if (
1111 isinstance(e, sdnconn.SdnConnectorError)
1112 and e.http_code == HTTPStatus.NOT_FOUND.value
1113 ):
1114 ro_vim_item_update_ok["vim_details"] = "already deleted"
1115 else:
1116 self.logger.error(
1117 "ro_task={} vim={} del-sdn-net={}: {}".format(
1118 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1119 ),
1120 exc_info=not isinstance(
1121 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1122 ),
1123 )
1124 ro_vim_item_update = {
1125 "vim_status": "VIM_ERROR",
1126 "vim_details": "Error while deleting: {}".format(e),
1127 }
1128
1129 return "FAILED", ro_vim_item_update
1130
1131 self.logger.debug(
1132 "task={} {} del-sdn-net={} {}".format(
1133 task_id,
1134 ro_task["target_id"],
1135 sdn_vim_id,
1136 ro_vim_item_update_ok.get("vim_details", ""),
1137 )
1138 )
1139
1140 return "DONE", ro_vim_item_update_ok
1141
1142
1143 class NsWorker(threading.Thread):
1144 REFRESH_BUILD = 5 # 5 seconds
1145 REFRESH_ACTIVE = 60 # 1 minute
1146 REFRESH_ERROR = 600
1147 REFRESH_IMAGE = 3600 * 10
1148 REFRESH_DELETE = 3600 * 10
1149 QUEUE_SIZE = 100
1150 terminate = False
1151
1152 def __init__(self, worker_index, config, plugins, db):
1153 """
1154
1155 :param worker_index: thread index
1156 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1157 :param plugins: global shared dict with the loaded plugins
1158 :param db: database class instance to use
1159 """
1160 threading.Thread.__init__(self)
1161 self.config = config
1162 self.plugins = plugins
1163 self.plugin_name = "unknown"
1164 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1165 self.worker_index = worker_index
1166 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1167 # targetvim: vimplugin class
1168 self.my_vims = {}
1169 # targetvim: vim information from database
1170 self.db_vims = {}
1171 # targetvim list
1172 self.vim_targets = []
1173 self.my_id = config["process_id"] + ":" + str(worker_index)
1174 self.db = db
1175 self.item2class = {
1176 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1177 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1178 "image": VimInteractionImage(
1179 self.db, self.my_vims, self.db_vims, self.logger
1180 ),
1181 "flavor": VimInteractionFlavor(
1182 self.db, self.my_vims, self.db_vims, self.logger
1183 ),
1184 "sdn_net": VimInteractionSdnNet(
1185 self.db, self.my_vims, self.db_vims, self.logger
1186 ),
1187 }
1188 self.time_last_task_processed = None
1189 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1190 self.tasks_to_delete = []
1191 # it is idle when there are not vim_targets associated
1192 self.idle = True
1193 self.task_locked_time = config["global"]["task_locked_time"]
1194
1195 def insert_task(self, task):
1196 try:
1197 self.task_queue.put(task, False)
1198 return None
1199 except queue.Full:
1200 raise NsWorkerException("timeout inserting a task")
1201
1202 def terminate(self):
1203 self.insert_task("exit")
1204
1205 def del_task(self, task):
1206 with self.task_lock:
1207 if task["status"] == "SCHEDULED":
1208 task["status"] = "SUPERSEDED"
1209 return True
1210 else: # task["status"] == "processing"
1211 self.task_lock.release()
1212 return False
1213
1214 def _process_vim_config(self, target_id, db_vim):
1215 """
1216 Process vim config, creating vim configuration files as ca_cert
1217 :param target_id: vim/sdn/wim + id
1218 :param db_vim: Vim dictionary obtained from database
1219 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1220 """
1221 if not db_vim.get("config"):
1222 return
1223
1224 file_name = ""
1225
1226 try:
1227 if db_vim["config"].get("ca_cert_content"):
1228 file_name = "{}:{}".format(target_id, self.worker_index)
1229
1230 try:
1231 mkdir(file_name)
1232 except FileExistsError:
1233 pass
1234
1235 file_name = file_name + "/ca_cert"
1236
1237 with open(file_name, "w") as f:
1238 f.write(db_vim["config"]["ca_cert_content"])
1239 del db_vim["config"]["ca_cert_content"]
1240 db_vim["config"]["ca_cert"] = file_name
1241 except Exception as e:
1242 raise NsWorkerException(
1243 "Error writing to file '{}': {}".format(file_name, e)
1244 )
1245
1246 def _load_plugin(self, name, type="vim"):
1247 # type can be vim or sdn
1248 if "rovim_dummy" not in self.plugins:
1249 self.plugins["rovim_dummy"] = VimDummyConnector
1250
1251 if "rosdn_dummy" not in self.plugins:
1252 self.plugins["rosdn_dummy"] = SdnDummyConnector
1253
1254 if name in self.plugins:
1255 return self.plugins[name]
1256
1257 try:
1258 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1259 self.plugins[name] = ep.load()
1260 except Exception as e:
1261 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1262
1263 if name and name not in self.plugins:
1264 raise NsWorkerException(
1265 "Plugin 'osm_{n}' has not been installed".format(n=name)
1266 )
1267
1268 return self.plugins[name]
1269
1270 def _unload_vim(self, target_id):
1271 """
1272 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1273 :param target_id: Contains type:_id; where type can be 'vim', ...
1274 :return: None.
1275 """
1276 try:
1277 self.db_vims.pop(target_id, None)
1278 self.my_vims.pop(target_id, None)
1279
1280 if target_id in self.vim_targets:
1281 self.vim_targets.remove(target_id)
1282
1283 self.logger.info("Unloaded {}".format(target_id))
1284 rmtree("{}:{}".format(target_id, self.worker_index))
1285 except FileNotFoundError:
1286 pass # this is raised by rmtree if folder does not exist
1287 except Exception as e:
1288 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1289
1290 def _check_vim(self, target_id):
1291 """
1292 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1293 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1294 :return: None.
1295 """
1296 target, _, _id = target_id.partition(":")
1297 now = time.time()
1298 update_dict = {}
1299 unset_dict = {}
1300 op_text = ""
1301 step = ""
1302 loaded = target_id in self.vim_targets
1303 target_database = (
1304 "vim_accounts"
1305 if target == "vim"
1306 else "wim_accounts"
1307 if target == "wim"
1308 else "sdns"
1309 )
1310
1311 try:
1312 step = "Getting {} from db".format(target_id)
1313 db_vim = self.db.get_one(target_database, {"_id": _id})
1314
1315 for op_index, operation in enumerate(
1316 db_vim["_admin"].get("operations", ())
1317 ):
1318 if operation["operationState"] != "PROCESSING":
1319 continue
1320
1321 locked_at = operation.get("locked_at")
1322
1323 if locked_at is not None and locked_at >= now - self.task_locked_time:
1324 # some other thread is doing this operation
1325 return
1326
1327 # lock
1328 op_text = "_admin.operations.{}.".format(op_index)
1329
1330 if not self.db.set_one(
1331 target_database,
1332 q_filter={
1333 "_id": _id,
1334 op_text + "operationState": "PROCESSING",
1335 op_text + "locked_at": locked_at,
1336 },
1337 update_dict={
1338 op_text + "locked_at": now,
1339 "admin.current_operation": op_index,
1340 },
1341 fail_on_empty=False,
1342 ):
1343 return
1344
1345 unset_dict[op_text + "locked_at"] = None
1346 unset_dict["current_operation"] = None
1347 step = "Loading " + target_id
1348 error_text = self._load_vim(target_id)
1349
1350 if not error_text:
1351 step = "Checking connectivity"
1352
1353 if target == "vim":
1354 self.my_vims[target_id].check_vim_connectivity()
1355 else:
1356 self.my_vims[target_id].check_credentials()
1357
1358 update_dict["_admin.operationalState"] = "ENABLED"
1359 update_dict["_admin.detailed-status"] = ""
1360 unset_dict[op_text + "detailed-status"] = None
1361 update_dict[op_text + "operationState"] = "COMPLETED"
1362
1363 return
1364
1365 except Exception as e:
1366 error_text = "{}: {}".format(step, e)
1367 self.logger.error("{} for {}: {}".format(step, target_id, e))
1368
1369 finally:
1370 if update_dict or unset_dict:
1371 if error_text:
1372 update_dict[op_text + "operationState"] = "FAILED"
1373 update_dict[op_text + "detailed-status"] = error_text
1374 unset_dict.pop(op_text + "detailed-status", None)
1375 update_dict["_admin.operationalState"] = "ERROR"
1376 update_dict["_admin.detailed-status"] = error_text
1377
1378 if op_text:
1379 update_dict[op_text + "statusEnteredTime"] = now
1380
1381 self.db.set_one(
1382 target_database,
1383 q_filter={"_id": _id},
1384 update_dict=update_dict,
1385 unset=unset_dict,
1386 fail_on_empty=False,
1387 )
1388
1389 if not loaded:
1390 self._unload_vim(target_id)
1391
1392 def _reload_vim(self, target_id):
1393 if target_id in self.vim_targets:
1394 self._load_vim(target_id)
1395 else:
1396 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1397 # just remove it to force load again next time it is needed
1398 self.db_vims.pop(target_id, None)
1399
1400 def _load_vim(self, target_id):
1401 """
1402 Load or reload a vim_account, sdn_controller or wim_account.
1403 Read content from database, load the plugin if not loaded.
1404 In case of error loading the plugin, it load a failing VIM_connector
1405 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1406 :param target_id: Contains type:_id; where type can be 'vim', ...
1407 :return: None if ok, descriptive text if error
1408 """
1409 target, _, _id = target_id.partition(":")
1410 target_database = (
1411 "vim_accounts"
1412 if target == "vim"
1413 else "wim_accounts"
1414 if target == "wim"
1415 else "sdns"
1416 )
1417 plugin_name = ""
1418 vim = None
1419
1420 try:
1421 step = "Getting {}={} from db".format(target, _id)
1422 # TODO process for wim, sdnc, ...
1423 vim = self.db.get_one(target_database, {"_id": _id})
1424
1425 # if deep_get(vim, "config", "sdn-controller"):
1426 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1427 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1428
1429 step = "Decrypting password"
1430 schema_version = vim.get("schema_version")
1431 self.db.encrypt_decrypt_fields(
1432 vim,
1433 "decrypt",
1434 fields=("password", "secret"),
1435 schema_version=schema_version,
1436 salt=_id,
1437 )
1438 self._process_vim_config(target_id, vim)
1439
1440 if target == "vim":
1441 plugin_name = "rovim_" + vim["vim_type"]
1442 step = "Loading plugin '{}'".format(plugin_name)
1443 vim_module_conn = self._load_plugin(plugin_name)
1444 step = "Loading {}'".format(target_id)
1445 self.my_vims[target_id] = vim_module_conn(
1446 uuid=vim["_id"],
1447 name=vim["name"],
1448 tenant_id=vim.get("vim_tenant_id"),
1449 tenant_name=vim.get("vim_tenant_name"),
1450 url=vim["vim_url"],
1451 url_admin=None,
1452 user=vim["vim_user"],
1453 passwd=vim["vim_password"],
1454 config=vim.get("config") or {},
1455 persistent_info={},
1456 )
1457 else: # sdn
1458 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1459 step = "Loading plugin '{}'".format(plugin_name)
1460 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1461 step = "Loading {}'".format(target_id)
1462 wim = deepcopy(vim)
1463 wim_config = wim.pop("config", {}) or {}
1464 wim["uuid"] = wim["_id"]
1465 wim["wim_url"] = wim["url"]
1466
1467 if wim.get("dpid"):
1468 wim_config["dpid"] = wim.pop("dpid")
1469
1470 if wim.get("switch_id"):
1471 wim_config["switch_id"] = wim.pop("switch_id")
1472
1473 # wim, wim_account, config
1474 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1475 self.db_vims[target_id] = vim
1476 self.error_status = None
1477
1478 self.logger.info(
1479 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1480 )
1481 except Exception as e:
1482 self.logger.error(
1483 "Cannot load {} plugin={}: {} {}".format(
1484 target_id, plugin_name, step, e
1485 )
1486 )
1487
1488 self.db_vims[target_id] = vim or {}
1489 self.db_vims[target_id] = FailingConnector(str(e))
1490 error_status = "{} Error: {}".format(step, e)
1491
1492 return error_status
1493 finally:
1494 if target_id not in self.vim_targets:
1495 self.vim_targets.append(target_id)
1496
1497 def _get_db_task(self):
1498 """
1499 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1500 :return: None
1501 """
1502 now = time.time()
1503
1504 if not self.time_last_task_processed:
1505 self.time_last_task_processed = now
1506
1507 try:
1508 while True:
1509 """
1510 # Log RO tasks only when loglevel is DEBUG
1511 if self.logger.getEffectiveLevel() == logging.DEBUG:
1512 self._log_ro_task(
1513 None,
1514 None,
1515 None,
1516 "TASK_WF",
1517 "task_locked_time="
1518 + str(self.task_locked_time)
1519 + " "
1520 + "time_last_task_processed="
1521 + str(self.time_last_task_processed)
1522 + " "
1523 + "now="
1524 + str(now),
1525 )
1526 """
1527 locked = self.db.set_one(
1528 "ro_tasks",
1529 q_filter={
1530 "target_id": self.vim_targets,
1531 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1532 "locked_at.lt": now - self.task_locked_time,
1533 "to_check_at.lt": self.time_last_task_processed,
1534 },
1535 update_dict={"locked_by": self.my_id, "locked_at": now},
1536 fail_on_empty=False,
1537 )
1538
1539 if locked:
1540 # read and return
1541 ro_task = self.db.get_one(
1542 "ro_tasks",
1543 q_filter={
1544 "target_id": self.vim_targets,
1545 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1546 "locked_at": now,
1547 },
1548 )
1549 return ro_task
1550
1551 if self.time_last_task_processed == now:
1552 self.time_last_task_processed = None
1553 return None
1554 else:
1555 self.time_last_task_processed = now
1556 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1557
1558 except DbException as e:
1559 self.logger.error("Database exception at _get_db_task: {}".format(e))
1560 except Exception as e:
1561 self.logger.critical(
1562 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1563 )
1564
1565 return None
1566
1567 def _get_db_all_tasks(self):
1568 """
1569 Read all content of table ro_tasks to log it
1570 :return: None
1571 """
1572 try:
1573 # Checking the content of the BD:
1574
1575 # read and return
1576 ro_task = self.db.get_list("ro_tasks")
1577 for rt in ro_task:
1578 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1579 return ro_task
1580
1581 except DbException as e:
1582 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1583 except Exception as e:
1584 self.logger.critical(
1585 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1586 )
1587
1588 return None
1589
1590 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1591 """
1592 Generate a log with the following format:
1593
1594 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1595 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1596 task_array_index;task_id;task_action;task_item;task_args
1597
1598 Example:
1599
1600 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1601 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1602 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1603 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1604 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1605 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1606 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1607 """
1608 try:
1609 line = []
1610 i = 0
1611 if ro_task is not None and isinstance(ro_task, dict):
1612 for t in ro_task["tasks"]:
1613 line.clear()
1614 line.append(mark)
1615 line.append(event)
1616 line.append(ro_task.get("_id", ""))
1617 line.append(str(ro_task.get("locked_at", "")))
1618 line.append(str(ro_task.get("modified_at", "")))
1619 line.append(str(ro_task.get("created_at", "")))
1620 line.append(str(ro_task.get("to_check_at", "")))
1621 line.append(str(ro_task.get("locked_by", "")))
1622 line.append(str(ro_task.get("target_id", "")))
1623 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1624 line.append(str(ro_task.get("vim_info", "")))
1625 line.append(str(ro_task.get("tasks", "")))
1626 if isinstance(t, dict):
1627 line.append(str(t.get("status", "")))
1628 line.append(str(t.get("action_id", "")))
1629 line.append(str(i))
1630 line.append(str(t.get("task_id", "")))
1631 line.append(str(t.get("action", "")))
1632 line.append(str(t.get("item", "")))
1633 line.append(str(t.get("find_params", "")))
1634 line.append(str(t.get("params", "")))
1635 else:
1636 line.extend([""] * 2)
1637 line.append(str(i))
1638 line.extend([""] * 5)
1639
1640 i += 1
1641 self.logger.debug(";".join(line))
1642 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1643 i = 0
1644 while True:
1645 st = "tasks.{}.status".format(i)
1646 if st not in db_ro_task_update:
1647 break
1648 line.clear()
1649 line.append(mark)
1650 line.append(event)
1651 line.append(db_ro_task_update.get("_id", ""))
1652 line.append(str(db_ro_task_update.get("locked_at", "")))
1653 line.append(str(db_ro_task_update.get("modified_at", "")))
1654 line.append("")
1655 line.append(str(db_ro_task_update.get("to_check_at", "")))
1656 line.append(str(db_ro_task_update.get("locked_by", "")))
1657 line.append("")
1658 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1659 line.append("")
1660 line.append(str(db_ro_task_update.get("vim_info", "")))
1661 line.append(str(str(db_ro_task_update).count(".status")))
1662 line.append(db_ro_task_update.get(st, ""))
1663 line.append("")
1664 line.append(str(i))
1665 line.extend([""] * 3)
1666 i += 1
1667 self.logger.debug(";".join(line))
1668
1669 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1670 line.clear()
1671 line.append(mark)
1672 line.append(event)
1673 line.append(db_ro_task_delete.get("_id", ""))
1674 line.append("")
1675 line.append(db_ro_task_delete.get("modified_at", ""))
1676 line.extend([""] * 13)
1677 self.logger.debug(";".join(line))
1678
1679 else:
1680 line.clear()
1681 line.append(mark)
1682 line.append(event)
1683 line.extend([""] * 16)
1684 self.logger.debug(";".join(line))
1685
1686 except Exception as e:
1687 self.logger.error("Error logging ro_task: {}".format(e))
1688
1689 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1690 """
1691 Determine if this task need to be done or superseded
1692 :return: None
1693 """
1694 my_task = ro_task["tasks"][task_index]
1695 task_id = my_task["task_id"]
1696 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1697 "created_items", False
1698 )
1699
1700 if my_task["status"] == "FAILED":
1701 return None, None # TODO need to be retry??
1702
1703 try:
1704 for index, task in enumerate(ro_task["tasks"]):
1705 if index == task_index or not task:
1706 continue # own task
1707
1708 if (
1709 my_task["target_record"] == task["target_record"]
1710 and task["action"] == "CREATE"
1711 ):
1712 # set to finished
1713 db_update["tasks.{}.status".format(index)] = task[
1714 "status"
1715 ] = "FINISHED"
1716 elif task["action"] == "CREATE" and task["status"] not in (
1717 "FINISHED",
1718 "SUPERSEDED",
1719 ):
1720 needed_delete = False
1721
1722 if needed_delete:
1723 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1724 else:
1725 return "SUPERSEDED", None
1726 except Exception as e:
1727 if not isinstance(e, NsWorkerException):
1728 self.logger.critical(
1729 "Unexpected exception at _delete_task task={}: {}".format(
1730 task_id, e
1731 ),
1732 exc_info=True,
1733 )
1734
1735 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1736
1737 def _create_task(self, ro_task, task_index, task_depends, db_update):
1738 """
1739 Determine if this task need to create something at VIM
1740 :return: None
1741 """
1742 my_task = ro_task["tasks"][task_index]
1743 task_id = my_task["task_id"]
1744 task_status = None
1745
1746 if my_task["status"] == "FAILED":
1747 return None, None # TODO need to be retry??
1748 elif my_task["status"] == "SCHEDULED":
1749 # check if already created by another task
1750 for index, task in enumerate(ro_task["tasks"]):
1751 if index == task_index or not task:
1752 continue # own task
1753
1754 if task["action"] == "CREATE" and task["status"] not in (
1755 "SCHEDULED",
1756 "FINISHED",
1757 "SUPERSEDED",
1758 ):
1759 return task["status"], "COPY_VIM_INFO"
1760
1761 try:
1762 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1763 ro_task, task_index, task_depends
1764 )
1765 # TODO update other CREATE tasks
1766 except Exception as e:
1767 if not isinstance(e, NsWorkerException):
1768 self.logger.error(
1769 "Error executing task={}: {}".format(task_id, e), exc_info=True
1770 )
1771
1772 task_status = "FAILED"
1773 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1774 # TODO update ro_vim_item_update
1775
1776 return task_status, ro_vim_item_update
1777 else:
1778 return None, None
1779
1780 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1781 """
1782 Look for dependency task
1783 :param task_id: Can be one of
1784 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1785 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1786 3. task.task_id: "<action_id>:number"
1787 :param ro_task:
1788 :param target_id:
1789 :return: database ro_task plus index of task
1790 """
1791 if (
1792 task_id.startswith("vim:")
1793 or task_id.startswith("sdn:")
1794 or task_id.startswith("wim:")
1795 ):
1796 target_id, _, task_id = task_id.partition(" ")
1797
1798 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1799 ro_task_dependency = self.db.get_one(
1800 "ro_tasks",
1801 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1802 fail_on_empty=False,
1803 )
1804
1805 if ro_task_dependency:
1806 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1807 if task["target_record_id"] == task_id:
1808 return ro_task_dependency, task_index
1809
1810 else:
1811 if ro_task:
1812 for task_index, task in enumerate(ro_task["tasks"]):
1813 if task and task["task_id"] == task_id:
1814 return ro_task, task_index
1815
1816 ro_task_dependency = self.db.get_one(
1817 "ro_tasks",
1818 q_filter={
1819 "tasks.ANYINDEX.task_id": task_id,
1820 "tasks.ANYINDEX.target_record.ne": None,
1821 },
1822 fail_on_empty=False,
1823 )
1824
1825 if ro_task_dependency:
1826 for task_index, task in ro_task_dependency["tasks"]:
1827 if task["task_id"] == task_id:
1828 return ro_task_dependency, task_index
1829 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1830
1831 def _process_pending_tasks(self, ro_task):
1832 ro_task_id = ro_task["_id"]
1833 now = time.time()
1834 # one day
1835 next_check_at = now + (24 * 60 * 60)
1836 db_ro_task_update = {}
1837
1838 def _update_refresh(new_status):
1839 # compute next_refresh
1840 nonlocal task
1841 nonlocal next_check_at
1842 nonlocal db_ro_task_update
1843 nonlocal ro_task
1844
1845 next_refresh = time.time()
1846
1847 if task["item"] in ("image", "flavor"):
1848 next_refresh += self.REFRESH_IMAGE
1849 elif new_status == "BUILD":
1850 next_refresh += self.REFRESH_BUILD
1851 elif new_status == "DONE":
1852 next_refresh += self.REFRESH_ACTIVE
1853 else:
1854 next_refresh += self.REFRESH_ERROR
1855
1856 next_check_at = min(next_check_at, next_refresh)
1857 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1858 ro_task["vim_info"]["refresh_at"] = next_refresh
1859
1860 try:
1861 """
1862 # Log RO tasks only when loglevel is DEBUG
1863 if self.logger.getEffectiveLevel() == logging.DEBUG:
1864 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1865 """
1866 # 0: get task_status_create
1867 lock_object = None
1868 task_status_create = None
1869 task_create = next(
1870 (
1871 t
1872 for t in ro_task["tasks"]
1873 if t
1874 and t["action"] == "CREATE"
1875 and t["status"] in ("BUILD", "DONE")
1876 ),
1877 None,
1878 )
1879
1880 if task_create:
1881 task_status_create = task_create["status"]
1882
1883 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1884 for task_action in ("DELETE", "CREATE", "EXEC"):
1885 db_vim_update = None
1886 new_status = None
1887
1888 for task_index, task in enumerate(ro_task["tasks"]):
1889 if not task:
1890 continue # task deleted
1891
1892 task_depends = {}
1893 target_update = None
1894
1895 if (
1896 (
1897 task_action in ("DELETE", "EXEC")
1898 and task["status"] not in ("SCHEDULED", "BUILD")
1899 )
1900 or task["action"] != task_action
1901 or (
1902 task_action == "CREATE"
1903 and task["status"] in ("FINISHED", "SUPERSEDED")
1904 )
1905 ):
1906 continue
1907
1908 task_path = "tasks.{}.status".format(task_index)
1909 try:
1910 db_vim_info_update = None
1911
1912 if task["status"] == "SCHEDULED":
1913 # check if tasks that this depends on have been completed
1914 dependency_not_completed = False
1915
1916 for dependency_task_id in task.get("depends_on") or ():
1917 (
1918 dependency_ro_task,
1919 dependency_task_index,
1920 ) = self._get_dependency(
1921 dependency_task_id, target_id=ro_task["target_id"]
1922 )
1923 dependency_task = dependency_ro_task["tasks"][
1924 dependency_task_index
1925 ]
1926
1927 if dependency_task["status"] == "SCHEDULED":
1928 dependency_not_completed = True
1929 next_check_at = min(
1930 next_check_at, dependency_ro_task["to_check_at"]
1931 )
1932 # must allow dependent task to be processed first
1933 # to do this set time after last_task_processed
1934 next_check_at = max(
1935 self.time_last_task_processed, next_check_at
1936 )
1937 break
1938 elif dependency_task["status"] == "FAILED":
1939 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1940 task["action"],
1941 task["item"],
1942 dependency_task["action"],
1943 dependency_task["item"],
1944 dependency_task_id,
1945 dependency_ro_task["vim_info"].get(
1946 "vim_details"
1947 ),
1948 )
1949 self.logger.error(
1950 "task={} {}".format(task["task_id"], error_text)
1951 )
1952 raise NsWorkerException(error_text)
1953
1954 task_depends[dependency_task_id] = dependency_ro_task[
1955 "vim_info"
1956 ]["vim_id"]
1957 task_depends[
1958 "TASK-{}".format(dependency_task_id)
1959 ] = dependency_ro_task["vim_info"]["vim_id"]
1960
1961 if dependency_not_completed:
1962 # TODO set at vim_info.vim_details that it is waiting
1963 continue
1964
1965 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1966 # the task of renew this locking. It will update database locket_at periodically
1967 if not lock_object:
1968 lock_object = LockRenew.add_lock_object(
1969 "ro_tasks", ro_task, self
1970 )
1971
1972 if task["action"] == "DELETE":
1973 (new_status, db_vim_info_update,) = self._delete_task(
1974 ro_task, task_index, task_depends, db_ro_task_update
1975 )
1976 new_status = (
1977 "FINISHED" if new_status == "DONE" else new_status
1978 )
1979 # ^with FINISHED instead of DONE it will not be refreshing
1980
1981 if new_status in ("FINISHED", "SUPERSEDED"):
1982 target_update = "DELETE"
1983 elif task["action"] == "EXEC":
1984 (
1985 new_status,
1986 db_vim_info_update,
1987 db_task_update,
1988 ) = self.item2class[task["item"]].exec(
1989 ro_task, task_index, task_depends
1990 )
1991 new_status = (
1992 "FINISHED" if new_status == "DONE" else new_status
1993 )
1994 # ^with FINISHED instead of DONE it will not be refreshing
1995
1996 if db_task_update:
1997 # load into database the modified db_task_update "retries" and "next_retry"
1998 if db_task_update.get("retries"):
1999 db_ro_task_update[
2000 "tasks.{}.retries".format(task_index)
2001 ] = db_task_update["retries"]
2002
2003 next_check_at = time.time() + db_task_update.get(
2004 "next_retry", 60
2005 )
2006 target_update = None
2007 elif task["action"] == "CREATE":
2008 if task["status"] == "SCHEDULED":
2009 if task_status_create:
2010 new_status = task_status_create
2011 target_update = "COPY_VIM_INFO"
2012 else:
2013 new_status, db_vim_info_update = self.item2class[
2014 task["item"]
2015 ].new(ro_task, task_index, task_depends)
2016 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2017 _update_refresh(new_status)
2018 else:
2019 if (
2020 ro_task["vim_info"]["refresh_at"]
2021 and now > ro_task["vim_info"]["refresh_at"]
2022 ):
2023 new_status, db_vim_info_update = self.item2class[
2024 task["item"]
2025 ].refresh(ro_task)
2026 _update_refresh(new_status)
2027 else:
2028 # The refresh is updated to avoid set the value of "refresh_at" to
2029 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2030 # because it can happen that in this case the task is never processed
2031 _update_refresh(task["status"])
2032
2033 except Exception as e:
2034 new_status = "FAILED"
2035 db_vim_info_update = {
2036 "vim_status": "VIM_ERROR",
2037 "vim_details": str(e),
2038 }
2039
2040 if not isinstance(
2041 e, (NsWorkerException, vimconn.VimConnException)
2042 ):
2043 self.logger.error(
2044 "Unexpected exception at _delete_task task={}: {}".format(
2045 task["task_id"], e
2046 ),
2047 exc_info=True,
2048 )
2049
2050 try:
2051 if db_vim_info_update:
2052 db_vim_update = db_vim_info_update.copy()
2053 db_ro_task_update.update(
2054 {
2055 "vim_info." + k: v
2056 for k, v in db_vim_info_update.items()
2057 }
2058 )
2059 ro_task["vim_info"].update(db_vim_info_update)
2060
2061 if new_status:
2062 if task_action == "CREATE":
2063 task_status_create = new_status
2064 db_ro_task_update[task_path] = new_status
2065
2066 if target_update or db_vim_update:
2067 if target_update == "DELETE":
2068 self._update_target(task, None)
2069 elif target_update == "COPY_VIM_INFO":
2070 self._update_target(task, ro_task["vim_info"])
2071 else:
2072 self._update_target(task, db_vim_update)
2073
2074 except Exception as e:
2075 if (
2076 isinstance(e, DbException)
2077 and e.http_code == HTTPStatus.NOT_FOUND
2078 ):
2079 # if the vnfrs or nsrs has been removed from database, this task must be removed
2080 self.logger.debug(
2081 "marking to delete task={}".format(task["task_id"])
2082 )
2083 self.tasks_to_delete.append(task)
2084 else:
2085 self.logger.error(
2086 "Unexpected exception at _update_target task={}: {}".format(
2087 task["task_id"], e
2088 ),
2089 exc_info=True,
2090 )
2091
2092 locked_at = ro_task["locked_at"]
2093
2094 if lock_object:
2095 locked_at = [
2096 lock_object["locked_at"],
2097 lock_object["locked_at"] + self.task_locked_time,
2098 ]
2099 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2100 # contain exactly locked_at + self.task_locked_time
2101 LockRenew.remove_lock_object(lock_object)
2102
2103 q_filter = {
2104 "_id": ro_task["_id"],
2105 "to_check_at": ro_task["to_check_at"],
2106 "locked_at": locked_at,
2107 }
2108 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2109 # outside this task (by ro_nbi) do not update it
2110 db_ro_task_update["locked_by"] = None
2111 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2112 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2113 db_ro_task_update["modified_at"] = now
2114 db_ro_task_update["to_check_at"] = next_check_at
2115
2116 """
2117 # Log RO tasks only when loglevel is DEBUG
2118 if self.logger.getEffectiveLevel() == logging.DEBUG:
2119 db_ro_task_update_log = db_ro_task_update.copy()
2120 db_ro_task_update_log["_id"] = q_filter["_id"]
2121 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2122 """
2123
2124 if not self.db.set_one(
2125 "ro_tasks",
2126 update_dict=db_ro_task_update,
2127 q_filter=q_filter,
2128 fail_on_empty=False,
2129 ):
2130 del db_ro_task_update["to_check_at"]
2131 del q_filter["to_check_at"]
2132 """
2133 # Log RO tasks only when loglevel is DEBUG
2134 if self.logger.getEffectiveLevel() == logging.DEBUG:
2135 self._log_ro_task(
2136 None,
2137 db_ro_task_update_log,
2138 None,
2139 "TASK_WF",
2140 "SET_TASK " + str(q_filter),
2141 )
2142 """
2143 self.db.set_one(
2144 "ro_tasks",
2145 q_filter=q_filter,
2146 update_dict=db_ro_task_update,
2147 fail_on_empty=True,
2148 )
2149 except DbException as e:
2150 self.logger.error(
2151 "ro_task={} Error updating database {}".format(ro_task_id, e)
2152 )
2153 except Exception as e:
2154 self.logger.error(
2155 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2156 )
2157
2158 def _update_target(self, task, ro_vim_item_update):
2159 table, _, temp = task["target_record"].partition(":")
2160 _id, _, path_vim_status = temp.partition(":")
2161 path_item = path_vim_status[: path_vim_status.rfind(".")]
2162 path_item = path_item[: path_item.rfind(".")]
2163 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2164 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2165
2166 if ro_vim_item_update:
2167 update_dict = {
2168 path_vim_status + "." + k: v
2169 for k, v in ro_vim_item_update.items()
2170 if k
2171 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2172 }
2173
2174 if path_vim_status.startswith("vdur."):
2175 # for backward compatibility, add vdur.name apart from vdur.vim_name
2176 if ro_vim_item_update.get("vim_name"):
2177 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2178
2179 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2180 if ro_vim_item_update.get("vim_id"):
2181 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2182
2183 # update general status
2184 if ro_vim_item_update.get("vim_status"):
2185 update_dict[path_item + ".status"] = ro_vim_item_update[
2186 "vim_status"
2187 ]
2188
2189 if ro_vim_item_update.get("interfaces"):
2190 path_interfaces = path_item + ".interfaces"
2191
2192 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2193 if iface:
2194 update_dict.update(
2195 {
2196 path_interfaces + ".{}.".format(i) + k: v
2197 for k, v in iface.items()
2198 if k in ("vlan", "compute_node", "pci")
2199 }
2200 )
2201
2202 # put ip_address and mac_address with ip-address and mac-address
2203 if iface.get("ip_address"):
2204 update_dict[
2205 path_interfaces + ".{}.".format(i) + "ip-address"
2206 ] = iface["ip_address"]
2207
2208 if iface.get("mac_address"):
2209 update_dict[
2210 path_interfaces + ".{}.".format(i) + "mac-address"
2211 ] = iface["mac_address"]
2212
2213 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2214 update_dict["ip-address"] = iface.get("ip_address").split(
2215 ";"
2216 )[0]
2217
2218 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2219 update_dict[path_item + ".ip-address"] = iface.get(
2220 "ip_address"
2221 ).split(";")[0]
2222
2223 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2224 else:
2225 update_dict = {path_item + ".status": "DELETED"}
2226 self.db.set_one(
2227 table,
2228 q_filter={"_id": _id},
2229 update_dict=update_dict,
2230 unset={path_vim_status: None},
2231 )
2232
2233 def _process_delete_db_tasks(self):
2234 """
2235 Delete task from database because vnfrs or nsrs or both have been deleted
2236 :return: None. Uses and modify self.tasks_to_delete
2237 """
2238 while self.tasks_to_delete:
2239 task = self.tasks_to_delete[0]
2240 vnfrs_deleted = None
2241 nsr_id = task["nsr_id"]
2242
2243 if task["target_record"].startswith("vnfrs:"):
2244 # check if nsrs is present
2245 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2246 vnfrs_deleted = task["target_record"].split(":")[1]
2247
2248 try:
2249 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2250 except Exception as e:
2251 self.logger.error(
2252 "Error deleting task={}: {}".format(task["task_id"], e)
2253 )
2254 self.tasks_to_delete.pop(0)
2255
2256 @staticmethod
2257 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2258 """
2259 Static method because it is called from osm_ng_ro.ns
2260 :param db: instance of database to use
2261 :param nsr_id: affected nsrs id
2262 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2263 :return: None, exception is fails
2264 """
2265 retries = 5
2266 for retry in range(retries):
2267 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2268 now = time.time()
2269 conflict = False
2270
2271 for ro_task in ro_tasks:
2272 db_update = {}
2273 to_delete_ro_task = True
2274
2275 for index, task in enumerate(ro_task["tasks"]):
2276 if not task:
2277 pass
2278 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2279 vnfrs_deleted
2280 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2281 ):
2282 db_update["tasks.{}".format(index)] = None
2283 else:
2284 # used by other nsr, ro_task cannot be deleted
2285 to_delete_ro_task = False
2286
2287 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2288 if to_delete_ro_task:
2289 if not db.del_one(
2290 "ro_tasks",
2291 q_filter={
2292 "_id": ro_task["_id"],
2293 "modified_at": ro_task["modified_at"],
2294 },
2295 fail_on_empty=False,
2296 ):
2297 conflict = True
2298 elif db_update:
2299 db_update["modified_at"] = now
2300 if not db.set_one(
2301 "ro_tasks",
2302 q_filter={
2303 "_id": ro_task["_id"],
2304 "modified_at": ro_task["modified_at"],
2305 },
2306 update_dict=db_update,
2307 fail_on_empty=False,
2308 ):
2309 conflict = True
2310 if not conflict:
2311 return
2312 else:
2313 raise NsWorkerException("Exceeded {} retries".format(retries))
2314
2315 def run(self):
2316 # load database
2317 self.logger.info("Starting")
2318 while True:
2319 # step 1: get commands from queue
2320 try:
2321 if self.vim_targets:
2322 task = self.task_queue.get(block=False)
2323 else:
2324 if not self.idle:
2325 self.logger.debug("enters in idle state")
2326 self.idle = True
2327 task = self.task_queue.get(block=True)
2328 self.idle = False
2329
2330 if task[0] == "terminate":
2331 break
2332 elif task[0] == "load_vim":
2333 self.logger.info("order to load vim {}".format(task[1]))
2334 self._load_vim(task[1])
2335 elif task[0] == "unload_vim":
2336 self.logger.info("order to unload vim {}".format(task[1]))
2337 self._unload_vim(task[1])
2338 elif task[0] == "reload_vim":
2339 self._reload_vim(task[1])
2340 elif task[0] == "check_vim":
2341 self.logger.info("order to check vim {}".format(task[1]))
2342 self._check_vim(task[1])
2343 continue
2344 except Exception as e:
2345 if isinstance(e, queue.Empty):
2346 pass
2347 else:
2348 self.logger.critical(
2349 "Error processing task: {}".format(e), exc_info=True
2350 )
2351
2352 # step 2: process pending_tasks, delete not needed tasks
2353 try:
2354 if self.tasks_to_delete:
2355 self._process_delete_db_tasks()
2356 busy = False
2357 """
2358 # Log RO tasks only when loglevel is DEBUG
2359 if self.logger.getEffectiveLevel() == logging.DEBUG:
2360 _ = self._get_db_all_tasks()
2361 """
2362 ro_task = self._get_db_task()
2363 if ro_task:
2364 self._process_pending_tasks(ro_task)
2365 busy = True
2366 if not busy:
2367 time.sleep(5)
2368 except Exception as e:
2369 self.logger.critical(
2370 "Unexpected exception at run: " + str(e), exc_info=True
2371 )
2372
2373 self.logger.info("Finishing")