Fix bug 1902 to resolve issues with IETF L2VPN WIM connector
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 }
215 self.logger.debug(
216 "task={} {} new-net={} created={}".format(
217 task_id, ro_task["target_id"], vim_net_id, created
218 )
219 )
220
221 return "BUILD", ro_vim_item_update
222 except (vimconn.VimConnException, NsWorkerException) as e:
223 self.logger.error(
224 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
225 )
226 ro_vim_item_update = {
227 "vim_status": "VIM_ERROR",
228 "created": created,
229 "vim_details": str(e),
230 }
231
232 return "FAILED", ro_vim_item_update
233
234 def refresh(self, ro_task):
235 """Call VIM to get network status"""
236 ro_task_id = ro_task["_id"]
237 target_vim = self.my_vims[ro_task["target_id"]]
238 vim_id = ro_task["vim_info"]["vim_id"]
239 net_to_refresh_list = [vim_id]
240
241 try:
242 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
243 vim_info = vim_dict[vim_id]
244
245 if vim_info["status"] == "ACTIVE":
246 task_status = "DONE"
247 elif vim_info["status"] == "BUILD":
248 task_status = "BUILD"
249 else:
250 task_status = "FAILED"
251 except vimconn.VimConnException as e:
252 # Mark all tasks at VIM_ERROR status
253 self.logger.error(
254 "ro_task={} vim={} get-net={}: {}".format(
255 ro_task_id, ro_task["target_id"], vim_id, e
256 )
257 )
258 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
259 task_status = "FAILED"
260
261 ro_vim_item_update = {}
262 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
263 ro_vim_item_update["vim_status"] = vim_info["status"]
264
265 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
266 ro_vim_item_update["vim_name"] = vim_info.get("name")
267
268 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
269 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
270 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
271 elif vim_info["status"] == "DELETED":
272 ro_vim_item_update["vim_id"] = None
273 ro_vim_item_update["vim_details"] = "Deleted externally"
274 else:
275 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
276 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
277
278 if ro_vim_item_update:
279 self.logger.debug(
280 "ro_task={} {} get-net={}: status={} {}".format(
281 ro_task_id,
282 ro_task["target_id"],
283 vim_id,
284 ro_vim_item_update.get("vim_status"),
285 ro_vim_item_update.get("vim_details")
286 if ro_vim_item_update.get("vim_status") != "ACTIVE"
287 else "",
288 )
289 )
290
291 return task_status, ro_vim_item_update
292
293 def delete(self, ro_task, task_index):
294 task = ro_task["tasks"][task_index]
295 task_id = task["task_id"]
296 net_vim_id = ro_task["vim_info"]["vim_id"]
297 ro_vim_item_update_ok = {
298 "vim_status": "DELETED",
299 "created": False,
300 "vim_details": "DELETED",
301 "vim_id": None,
302 }
303
304 try:
305 if net_vim_id or ro_task["vim_info"]["created_items"]:
306 target_vim = self.my_vims[ro_task["target_id"]]
307 target_vim.delete_network(
308 net_vim_id, ro_task["vim_info"]["created_items"]
309 )
310 except vimconn.VimConnNotFoundException:
311 ro_vim_item_update_ok["vim_details"] = "already deleted"
312 except vimconn.VimConnException as e:
313 self.logger.error(
314 "ro_task={} vim={} del-net={}: {}".format(
315 ro_task["_id"], ro_task["target_id"], net_vim_id, e
316 )
317 )
318 ro_vim_item_update = {
319 "vim_status": "VIM_ERROR",
320 "vim_details": "Error while deleting: {}".format(e),
321 }
322
323 return "FAILED", ro_vim_item_update
324
325 self.logger.debug(
326 "task={} {} del-net={} {}".format(
327 task_id,
328 ro_task["target_id"],
329 net_vim_id,
330 ro_vim_item_update_ok.get("vim_details", ""),
331 )
332 )
333
334 return "DONE", ro_vim_item_update_ok
335
336
337 class VimInteractionVdu(VimInteractionBase):
338 max_retries_inject_ssh_key = 20 # 20 times
339 time_retries_inject_ssh_key = 30 # wevery 30 seconds
340
341 def new(self, ro_task, task_index, task_depends):
342 task = ro_task["tasks"][task_index]
343 task_id = task["task_id"]
344 created = False
345 created_items = {}
346 target_vim = self.my_vims[ro_task["target_id"]]
347
348 try:
349 created = True
350 params = task["params"]
351 params_copy = deepcopy(params)
352 net_list = params_copy["net_list"]
353
354 for net in net_list:
355 # change task_id into network_id
356 if "net_id" in net and net["net_id"].startswith("TASK-"):
357 network_id = task_depends[net["net_id"]]
358
359 if not network_id:
360 raise NsWorkerException(
361 "Cannot create VM because depends on a network not created or found "
362 "for {}".format(net["net_id"])
363 )
364
365 net["net_id"] = network_id
366
367 if params_copy["image_id"].startswith("TASK-"):
368 params_copy["image_id"] = task_depends[params_copy["image_id"]]
369
370 if params_copy["flavor_id"].startswith("TASK-"):
371 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
372
373 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
374 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
375
376 ro_vim_item_update = {
377 "vim_id": vim_vm_id,
378 "vim_status": "BUILD",
379 "created": created,
380 "created_items": created_items,
381 "vim_details": None,
382 "interfaces_vim_ids": interfaces,
383 "interfaces": [],
384 }
385 self.logger.debug(
386 "task={} {} new-vm={} created={}".format(
387 task_id, ro_task["target_id"], vim_vm_id, created
388 )
389 )
390
391 return "BUILD", ro_vim_item_update
392 except (vimconn.VimConnException, NsWorkerException) as e:
393 self.logger.error(
394 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
395 )
396 ro_vim_item_update = {
397 "vim_status": "VIM_ERROR",
398 "created": created,
399 "vim_details": str(e),
400 }
401
402 return "FAILED", ro_vim_item_update
403
404 def delete(self, ro_task, task_index):
405 task = ro_task["tasks"][task_index]
406 task_id = task["task_id"]
407 vm_vim_id = ro_task["vim_info"]["vim_id"]
408 ro_vim_item_update_ok = {
409 "vim_status": "DELETED",
410 "created": False,
411 "vim_details": "DELETED",
412 "vim_id": None,
413 }
414
415 try:
416 if vm_vim_id or ro_task["vim_info"]["created_items"]:
417 target_vim = self.my_vims[ro_task["target_id"]]
418 target_vim.delete_vminstance(
419 vm_vim_id, ro_task["vim_info"]["created_items"]
420 )
421 except vimconn.VimConnNotFoundException:
422 ro_vim_item_update_ok["vim_details"] = "already deleted"
423 except vimconn.VimConnException as e:
424 self.logger.error(
425 "ro_task={} vim={} del-vm={}: {}".format(
426 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
427 )
428 )
429 ro_vim_item_update = {
430 "vim_status": "VIM_ERROR",
431 "vim_details": "Error while deleting: {}".format(e),
432 }
433
434 return "FAILED", ro_vim_item_update
435
436 self.logger.debug(
437 "task={} {} del-vm={} {}".format(
438 task_id,
439 ro_task["target_id"],
440 vm_vim_id,
441 ro_vim_item_update_ok.get("vim_details", ""),
442 )
443 )
444
445 return "DONE", ro_vim_item_update_ok
446
447 def refresh(self, ro_task):
448 """Call VIM to get vm status"""
449 ro_task_id = ro_task["_id"]
450 target_vim = self.my_vims[ro_task["target_id"]]
451 vim_id = ro_task["vim_info"]["vim_id"]
452
453 if not vim_id:
454 return None, None
455
456 vm_to_refresh_list = [vim_id]
457 try:
458 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
459 vim_info = vim_dict[vim_id]
460
461 if vim_info["status"] == "ACTIVE":
462 task_status = "DONE"
463 elif vim_info["status"] == "BUILD":
464 task_status = "BUILD"
465 else:
466 task_status = "FAILED"
467
468 # try to load and parse vim_information
469 try:
470 vim_info_info = yaml.safe_load(vim_info["vim_info"])
471 if vim_info_info.get("name"):
472 vim_info["name"] = vim_info_info["name"]
473 except Exception:
474 pass
475 except vimconn.VimConnException as e:
476 # Mark all tasks at VIM_ERROR status
477 self.logger.error(
478 "ro_task={} vim={} get-vm={}: {}".format(
479 ro_task_id, ro_task["target_id"], vim_id, e
480 )
481 )
482 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
483 task_status = "FAILED"
484
485 ro_vim_item_update = {}
486
487 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
488 vim_interfaces = []
489 if vim_info.get("interfaces"):
490 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
491 iface = next(
492 (
493 iface
494 for iface in vim_info["interfaces"]
495 if vim_iface_id == iface["vim_interface_id"]
496 ),
497 None,
498 )
499 # if iface:
500 # iface.pop("vim_info", None)
501 vim_interfaces.append(iface)
502
503 task_create = next(
504 t
505 for t in ro_task["tasks"]
506 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
507 )
508 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
509 vim_interfaces[task_create["mgmt_vnf_interface"]][
510 "mgmt_vnf_interface"
511 ] = True
512
513 mgmt_vdu_iface = task_create.get(
514 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
515 )
516 if vim_interfaces:
517 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
518
519 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
520 ro_vim_item_update["interfaces"] = vim_interfaces
521
522 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
523 ro_vim_item_update["vim_status"] = vim_info["status"]
524
525 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
526 ro_vim_item_update["vim_name"] = vim_info.get("name")
527
528 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
529 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
530 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
531 elif vim_info["status"] == "DELETED":
532 ro_vim_item_update["vim_id"] = None
533 ro_vim_item_update["vim_details"] = "Deleted externally"
534 else:
535 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
536 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
537
538 if ro_vim_item_update:
539 self.logger.debug(
540 "ro_task={} {} get-vm={}: status={} {}".format(
541 ro_task_id,
542 ro_task["target_id"],
543 vim_id,
544 ro_vim_item_update.get("vim_status"),
545 ro_vim_item_update.get("vim_details")
546 if ro_vim_item_update.get("vim_status") != "ACTIVE"
547 else "",
548 )
549 )
550
551 return task_status, ro_vim_item_update
552
553 def exec(self, ro_task, task_index, task_depends):
554 task = ro_task["tasks"][task_index]
555 task_id = task["task_id"]
556 target_vim = self.my_vims[ro_task["target_id"]]
557 db_task_update = {"retries": 0}
558 retries = task.get("retries", 0)
559
560 try:
561 params = task["params"]
562 params_copy = deepcopy(params)
563 params_copy["ro_key"] = self.db.decrypt(
564 params_copy.pop("private_key"),
565 params_copy.pop("schema_version"),
566 params_copy.pop("salt"),
567 )
568 params_copy["ip_addr"] = params_copy.pop("ip_address")
569 target_vim.inject_user_key(**params_copy)
570 self.logger.debug(
571 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
572 )
573
574 return (
575 "DONE",
576 None,
577 db_task_update,
578 ) # params_copy["key"]
579 except (vimconn.VimConnException, NsWorkerException) as e:
580 retries += 1
581
582 if retries < self.max_retries_inject_ssh_key:
583 return (
584 "BUILD",
585 None,
586 {
587 "retries": retries,
588 "next_retry": self.time_retries_inject_ssh_key,
589 },
590 )
591
592 self.logger.error(
593 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
594 )
595 ro_vim_item_update = {"vim_details": str(e)}
596
597 return "FAILED", ro_vim_item_update, db_task_update
598
599
600 class VimInteractionImage(VimInteractionBase):
601 def new(self, ro_task, task_index, task_depends):
602 task = ro_task["tasks"][task_index]
603 task_id = task["task_id"]
604 created = False
605 created_items = {}
606 target_vim = self.my_vims[ro_task["target_id"]]
607
608 try:
609 # FIND
610 if task.get("find_params"):
611 vim_images = target_vim.get_image_list(**task["find_params"])
612
613 if not vim_images:
614 raise NsWorkerExceptionNotFound(
615 "Image not found with this criteria: '{}'".format(
616 task["find_params"]
617 )
618 )
619 elif len(vim_images) > 1:
620 raise NsWorkerException(
621 "More than one image found with this criteria: '{}'".format(
622 task["find_params"]
623 )
624 )
625 else:
626 vim_image_id = vim_images[0]["id"]
627
628 ro_vim_item_update = {
629 "vim_id": vim_image_id,
630 "vim_status": "DONE",
631 "created": created,
632 "created_items": created_items,
633 "vim_details": None,
634 }
635 self.logger.debug(
636 "task={} {} new-image={} created={}".format(
637 task_id, ro_task["target_id"], vim_image_id, created
638 )
639 )
640
641 return "DONE", ro_vim_item_update
642 except (NsWorkerException, vimconn.VimConnException) as e:
643 self.logger.error(
644 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
645 )
646 ro_vim_item_update = {
647 "vim_status": "VIM_ERROR",
648 "created": created,
649 "vim_details": str(e),
650 }
651
652 return "FAILED", ro_vim_item_update
653
654
655 class VimInteractionFlavor(VimInteractionBase):
656 def delete(self, ro_task, task_index):
657 task = ro_task["tasks"][task_index]
658 task_id = task["task_id"]
659 flavor_vim_id = ro_task["vim_info"]["vim_id"]
660 ro_vim_item_update_ok = {
661 "vim_status": "DELETED",
662 "created": False,
663 "vim_details": "DELETED",
664 "vim_id": None,
665 }
666
667 try:
668 if flavor_vim_id:
669 target_vim = self.my_vims[ro_task["target_id"]]
670 target_vim.delete_flavor(flavor_vim_id)
671 except vimconn.VimConnNotFoundException:
672 ro_vim_item_update_ok["vim_details"] = "already deleted"
673 except vimconn.VimConnException as e:
674 self.logger.error(
675 "ro_task={} vim={} del-flavor={}: {}".format(
676 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
677 )
678 )
679 ro_vim_item_update = {
680 "vim_status": "VIM_ERROR",
681 "vim_details": "Error while deleting: {}".format(e),
682 }
683
684 return "FAILED", ro_vim_item_update
685
686 self.logger.debug(
687 "task={} {} del-flavor={} {}".format(
688 task_id,
689 ro_task["target_id"],
690 flavor_vim_id,
691 ro_vim_item_update_ok.get("vim_details", ""),
692 )
693 )
694
695 return "DONE", ro_vim_item_update_ok
696
697 def new(self, ro_task, task_index, task_depends):
698 task = ro_task["tasks"][task_index]
699 task_id = task["task_id"]
700 created = False
701 created_items = {}
702 target_vim = self.my_vims[ro_task["target_id"]]
703
704 try:
705 # FIND
706 vim_flavor_id = None
707
708 if task.get("find_params"):
709 try:
710 flavor_data = task["find_params"]["flavor_data"]
711 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
712 except vimconn.VimConnNotFoundException:
713 pass
714
715 if not vim_flavor_id and task.get("params"):
716 # CREATE
717 flavor_data = task["params"]["flavor_data"]
718 vim_flavor_id = target_vim.new_flavor(flavor_data)
719 created = True
720
721 ro_vim_item_update = {
722 "vim_id": vim_flavor_id,
723 "vim_status": "DONE",
724 "created": created,
725 "created_items": created_items,
726 "vim_details": None,
727 }
728 self.logger.debug(
729 "task={} {} new-flavor={} created={}".format(
730 task_id, ro_task["target_id"], vim_flavor_id, created
731 )
732 )
733
734 return "DONE", ro_vim_item_update
735 except (vimconn.VimConnException, NsWorkerException) as e:
736 self.logger.error(
737 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
738 )
739 ro_vim_item_update = {
740 "vim_status": "VIM_ERROR",
741 "created": created,
742 "vim_details": str(e),
743 }
744
745 return "FAILED", ro_vim_item_update
746
747
748 class VimInteractionSdnNet(VimInteractionBase):
749 @staticmethod
750 def _match_pci(port_pci, mapping):
751 """
752 Check if port_pci matches with mapping
753 mapping can have brackets to indicate that several chars are accepted. e.g
754 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
755 :param port_pci: text
756 :param mapping: text, can contain brackets to indicate several chars are available
757 :return: True if matches, False otherwise
758 """
759 if not port_pci or not mapping:
760 return False
761 if port_pci == mapping:
762 return True
763
764 mapping_index = 0
765 pci_index = 0
766 while True:
767 bracket_start = mapping.find("[", mapping_index)
768
769 if bracket_start == -1:
770 break
771
772 bracket_end = mapping.find("]", bracket_start)
773 if bracket_end == -1:
774 break
775
776 length = bracket_start - mapping_index
777 if (
778 length
779 and port_pci[pci_index : pci_index + length]
780 != mapping[mapping_index:bracket_start]
781 ):
782 return False
783
784 if (
785 port_pci[pci_index + length]
786 not in mapping[bracket_start + 1 : bracket_end]
787 ):
788 return False
789
790 pci_index += length + 1
791 mapping_index = bracket_end + 1
792
793 if port_pci[pci_index:] != mapping[mapping_index:]:
794 return False
795
796 return True
797
798 def _get_interfaces(self, vlds_to_connect, vim_account_id):
799 """
800 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
801 :param vim_account_id:
802 :return:
803 """
804 interfaces = []
805
806 for vld in vlds_to_connect:
807 table, _, db_id = vld.partition(":")
808 db_id, _, vld = db_id.partition(":")
809 _, _, vld_id = vld.partition(".")
810
811 if table == "vnfrs":
812 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
813 iface_key = "vnf-vld-id"
814 else: # table == "nsrs"
815 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
816 iface_key = "ns-vld-id"
817
818 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
819
820 for db_vnfr in db_vnfrs:
821 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
822 for iface_index, interface in enumerate(vdur["interfaces"]):
823 if interface.get(iface_key) == vld_id and interface.get(
824 "type"
825 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
826 # only SR-IOV o PT
827 interface_ = interface.copy()
828 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
829 db_vnfr["_id"], vdu_index, iface_index
830 )
831
832 if vdur.get("status") == "ERROR":
833 interface_["status"] = "ERROR"
834
835 interfaces.append(interface_)
836
837 return interfaces
838
839 def refresh(self, ro_task):
840 # look for task create
841 task_create_index, _ = next(
842 i_t
843 for i_t in enumerate(ro_task["tasks"])
844 if i_t[1]
845 and i_t[1]["action"] == "CREATE"
846 and i_t[1]["status"] != "FINISHED"
847 )
848
849 return self.new(ro_task, task_create_index, None)
850
851 def new(self, ro_task, task_index, task_depends):
852
853 task = ro_task["tasks"][task_index]
854 task_id = task["task_id"]
855 target_vim = self.my_vims[ro_task["target_id"]]
856
857 sdn_net_id = ro_task["vim_info"]["vim_id"]
858
859 created_items = ro_task["vim_info"].get("created_items")
860 connected_ports = ro_task["vim_info"].get("connected_ports", [])
861 new_connected_ports = []
862 last_update = ro_task["vim_info"].get("last_update", 0)
863 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
864 error_list = []
865 created = ro_task["vim_info"].get("created", False)
866
867 try:
868 # CREATE
869 params = task["params"]
870 vlds_to_connect = params.get("vlds", [])
871 associated_vim = params.get("target_vim")
872 # external additional ports
873 additional_ports = params.get("sdn-ports") or ()
874 _, _, vim_account_id = (
875 (None, None, None)
876 if associated_vim is None
877 else associated_vim.partition(":")
878 )
879
880 if associated_vim:
881 # get associated VIM
882 if associated_vim not in self.db_vims:
883 self.db_vims[associated_vim] = self.db.get_one(
884 "vim_accounts", {"_id": vim_account_id}
885 )
886
887 db_vim = self.db_vims[associated_vim]
888
889 # look for ports to connect
890 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
891 # print(ports)
892
893 sdn_ports = []
894 pending_ports = error_ports = 0
895 vlan_used = None
896 sdn_need_update = False
897
898 for port in ports:
899 vlan_used = port.get("vlan") or vlan_used
900
901 # TODO. Do not connect if already done
902 if not port.get("compute_node") or not port.get("pci"):
903 if port.get("status") == "ERROR":
904 error_ports += 1
905 else:
906 pending_ports += 1
907 continue
908
909 pmap = None
910 compute_node_mappings = next(
911 (
912 c
913 for c in db_vim["config"].get("sdn-port-mapping", ())
914 if c and c["compute_node"] == port["compute_node"]
915 ),
916 None,
917 )
918
919 if compute_node_mappings:
920 # process port_mapping pci of type 0000:af:1[01].[1357]
921 pmap = next(
922 (
923 p
924 for p in compute_node_mappings["ports"]
925 if self._match_pci(port["pci"], p.get("pci"))
926 ),
927 None,
928 )
929
930 if not pmap:
931 if not db_vim["config"].get("mapping_not_needed"):
932 error_list.append(
933 "Port mapping not found for compute_node={} pci={}".format(
934 port["compute_node"], port["pci"]
935 )
936 )
937 continue
938
939 pmap = {}
940
941 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
942 new_port = {
943 "service_endpoint_id": pmap.get("service_endpoint_id")
944 or service_endpoint_id,
945 "service_endpoint_encapsulation_type": "dot1q"
946 if port["type"] == "SR-IOV"
947 else None,
948 "service_endpoint_encapsulation_info": {
949 "vlan": port.get("vlan"),
950 "mac": port.get("mac-address"),
951 "device_id": pmap.get("device_id") or port["compute_node"],
952 "device_interface_id": pmap.get("device_interface_id")
953 or port["pci"],
954 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
955 "switch_port": pmap.get("switch_port"),
956 "service_mapping_info": pmap.get("service_mapping_info"),
957 },
958 }
959
960 # TODO
961 # if port["modified_at"] > last_update:
962 # sdn_need_update = True
963 new_connected_ports.append(port["id"]) # TODO
964 sdn_ports.append(new_port)
965
966 if error_ports:
967 error_list.append(
968 "{} interfaces have not been created as VDU is on ERROR status".format(
969 error_ports
970 )
971 )
972
973 # connect external ports
974 for index, additional_port in enumerate(additional_ports):
975 additional_port_id = additional_port.get(
976 "service_endpoint_id"
977 ) or "external-{}".format(index)
978 sdn_ports.append(
979 {
980 "service_endpoint_id": additional_port_id,
981 "service_endpoint_encapsulation_type": additional_port.get(
982 "service_endpoint_encapsulation_type", "dot1q"
983 ),
984 "service_endpoint_encapsulation_info": {
985 "vlan": additional_port.get("vlan") or vlan_used,
986 "mac": additional_port.get("mac_address"),
987 "device_id": additional_port.get("device_id"),
988 "device_interface_id": additional_port.get(
989 "device_interface_id"
990 ),
991 "switch_dpid": additional_port.get("switch_dpid")
992 or additional_port.get("switch_id"),
993 "switch_port": additional_port.get("switch_port"),
994 "service_mapping_info": additional_port.get(
995 "service_mapping_info"
996 ),
997 },
998 }
999 )
1000 new_connected_ports.append(additional_port_id)
1001 sdn_info = ""
1002
1003 # if there are more ports to connect or they have been modified, call create/update
1004 if error_list:
1005 sdn_status = "ERROR"
1006 sdn_info = "; ".join(error_list)
1007 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1008 last_update = time.time()
1009
1010 if not sdn_net_id:
1011 if len(sdn_ports) < 2:
1012 sdn_status = "ACTIVE"
1013
1014 if not pending_ports:
1015 self.logger.debug(
1016 "task={} {} new-sdn-net done, less than 2 ports".format(
1017 task_id, ro_task["target_id"]
1018 )
1019 )
1020 else:
1021 net_type = params.get("type") or "ELAN"
1022 (
1023 sdn_net_id,
1024 created_items,
1025 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1026 created = True
1027 self.logger.debug(
1028 "task={} {} new-sdn-net={} created={}".format(
1029 task_id, ro_task["target_id"], sdn_net_id, created
1030 )
1031 )
1032 else:
1033 created_items = target_vim.edit_connectivity_service(
1034 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1035 )
1036 created = True
1037 self.logger.debug(
1038 "task={} {} update-sdn-net={} created={}".format(
1039 task_id, ro_task["target_id"], sdn_net_id, created
1040 )
1041 )
1042
1043 connected_ports = new_connected_ports
1044 elif sdn_net_id:
1045 wim_status_dict = target_vim.get_connectivity_service_status(
1046 sdn_net_id, conn_info=created_items
1047 )
1048 sdn_status = wim_status_dict["sdn_status"]
1049
1050 if wim_status_dict.get("sdn_info"):
1051 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1052
1053 if wim_status_dict.get("error_msg"):
1054 sdn_info = wim_status_dict.get("error_msg") or ""
1055
1056 if pending_ports:
1057 if sdn_status != "ERROR":
1058 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1059 len(ports) - pending_ports, len(ports)
1060 )
1061
1062 if sdn_status == "ACTIVE":
1063 sdn_status = "BUILD"
1064
1065 ro_vim_item_update = {
1066 "vim_id": sdn_net_id,
1067 "vim_status": sdn_status,
1068 "created": created,
1069 "created_items": created_items,
1070 "connected_ports": connected_ports,
1071 "vim_details": sdn_info,
1072 "last_update": last_update,
1073 }
1074
1075 return sdn_status, ro_vim_item_update
1076 except Exception as e:
1077 self.logger.error(
1078 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1079 exc_info=not isinstance(
1080 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1081 ),
1082 )
1083 ro_vim_item_update = {
1084 "vim_status": "VIM_ERROR",
1085 "created": created,
1086 "vim_details": str(e),
1087 }
1088
1089 return "FAILED", ro_vim_item_update
1090
1091 def delete(self, ro_task, task_index):
1092 task = ro_task["tasks"][task_index]
1093 task_id = task["task_id"]
1094 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1095 ro_vim_item_update_ok = {
1096 "vim_status": "DELETED",
1097 "created": False,
1098 "vim_details": "DELETED",
1099 "vim_id": None,
1100 }
1101
1102 try:
1103 if sdn_vim_id:
1104 target_vim = self.my_vims[ro_task["target_id"]]
1105 target_vim.delete_connectivity_service(
1106 sdn_vim_id, ro_task["vim_info"].get("created_items")
1107 )
1108
1109 except Exception as e:
1110 if (
1111 isinstance(e, sdnconn.SdnConnectorError)
1112 and e.http_code == HTTPStatus.NOT_FOUND.value
1113 ):
1114 ro_vim_item_update_ok["vim_details"] = "already deleted"
1115 else:
1116 self.logger.error(
1117 "ro_task={} vim={} del-sdn-net={}: {}".format(
1118 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1119 ),
1120 exc_info=not isinstance(
1121 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1122 ),
1123 )
1124 ro_vim_item_update = {
1125 "vim_status": "VIM_ERROR",
1126 "vim_details": "Error while deleting: {}".format(e),
1127 }
1128
1129 return "FAILED", ro_vim_item_update
1130
1131 self.logger.debug(
1132 "task={} {} del-sdn-net={} {}".format(
1133 task_id,
1134 ro_task["target_id"],
1135 sdn_vim_id,
1136 ro_vim_item_update_ok.get("vim_details", ""),
1137 )
1138 )
1139
1140 return "DONE", ro_vim_item_update_ok
1141
1142
1143 class NsWorker(threading.Thread):
1144 REFRESH_BUILD = 5 # 5 seconds
1145 REFRESH_ACTIVE = 60 # 1 minute
1146 REFRESH_ERROR = 600
1147 REFRESH_IMAGE = 3600 * 10
1148 REFRESH_DELETE = 3600 * 10
1149 QUEUE_SIZE = 100
1150 terminate = False
1151
1152 def __init__(self, worker_index, config, plugins, db):
1153 """
1154
1155 :param worker_index: thread index
1156 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1157 :param plugins: global shared dict with the loaded plugins
1158 :param db: database class instance to use
1159 """
1160 threading.Thread.__init__(self)
1161 self.config = config
1162 self.plugins = plugins
1163 self.plugin_name = "unknown"
1164 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1165 self.worker_index = worker_index
1166 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1167 # targetvim: vimplugin class
1168 self.my_vims = {}
1169 # targetvim: vim information from database
1170 self.db_vims = {}
1171 # targetvim list
1172 self.vim_targets = []
1173 self.my_id = config["process_id"] + ":" + str(worker_index)
1174 self.db = db
1175 self.item2class = {
1176 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1177 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1178 "image": VimInteractionImage(
1179 self.db, self.my_vims, self.db_vims, self.logger
1180 ),
1181 "flavor": VimInteractionFlavor(
1182 self.db, self.my_vims, self.db_vims, self.logger
1183 ),
1184 "sdn_net": VimInteractionSdnNet(
1185 self.db, self.my_vims, self.db_vims, self.logger
1186 ),
1187 }
1188 self.time_last_task_processed = None
1189 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1190 self.tasks_to_delete = []
1191 # it is idle when there are not vim_targets associated
1192 self.idle = True
1193 self.task_locked_time = config["global"]["task_locked_time"]
1194
1195 def insert_task(self, task):
1196 try:
1197 self.task_queue.put(task, False)
1198 return None
1199 except queue.Full:
1200 raise NsWorkerException("timeout inserting a task")
1201
1202 def terminate(self):
1203 self.insert_task("exit")
1204
1205 def del_task(self, task):
1206 with self.task_lock:
1207 if task["status"] == "SCHEDULED":
1208 task["status"] = "SUPERSEDED"
1209 return True
1210 else: # task["status"] == "processing"
1211 self.task_lock.release()
1212 return False
1213
1214 def _process_vim_config(self, target_id, db_vim):
1215 """
1216 Process vim config, creating vim configuration files as ca_cert
1217 :param target_id: vim/sdn/wim + id
1218 :param db_vim: Vim dictionary obtained from database
1219 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1220 """
1221 if not db_vim.get("config"):
1222 return
1223
1224 file_name = ""
1225
1226 try:
1227 if db_vim["config"].get("ca_cert_content"):
1228 file_name = "{}:{}".format(target_id, self.worker_index)
1229
1230 try:
1231 mkdir(file_name)
1232 except FileExistsError:
1233 pass
1234
1235 file_name = file_name + "/ca_cert"
1236
1237 with open(file_name, "w") as f:
1238 f.write(db_vim["config"]["ca_cert_content"])
1239 del db_vim["config"]["ca_cert_content"]
1240 db_vim["config"]["ca_cert"] = file_name
1241 except Exception as e:
1242 raise NsWorkerException(
1243 "Error writing to file '{}': {}".format(file_name, e)
1244 )
1245
1246 def _load_plugin(self, name, type="vim"):
1247 # type can be vim or sdn
1248 if "rovim_dummy" not in self.plugins:
1249 self.plugins["rovim_dummy"] = VimDummyConnector
1250
1251 if "rosdn_dummy" not in self.plugins:
1252 self.plugins["rosdn_dummy"] = SdnDummyConnector
1253
1254 if name in self.plugins:
1255 return self.plugins[name]
1256
1257 try:
1258 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1259 self.plugins[name] = ep.load()
1260 except Exception as e:
1261 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1262
1263 if name and name not in self.plugins:
1264 raise NsWorkerException(
1265 "Plugin 'osm_{n}' has not been installed".format(n=name)
1266 )
1267
1268 return self.plugins[name]
1269
1270 def _unload_vim(self, target_id):
1271 """
1272 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1273 :param target_id: Contains type:_id; where type can be 'vim', ...
1274 :return: None.
1275 """
1276 try:
1277 self.db_vims.pop(target_id, None)
1278 self.my_vims.pop(target_id, None)
1279
1280 if target_id in self.vim_targets:
1281 self.vim_targets.remove(target_id)
1282
1283 self.logger.info("Unloaded {}".format(target_id))
1284 rmtree("{}:{}".format(target_id, self.worker_index))
1285 except FileNotFoundError:
1286 pass # this is raised by rmtree if folder does not exist
1287 except Exception as e:
1288 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1289
1290 def _check_vim(self, target_id):
1291 """
1292 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1293 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1294 :return: None.
1295 """
1296 target, _, _id = target_id.partition(":")
1297 now = time.time()
1298 update_dict = {}
1299 unset_dict = {}
1300 op_text = ""
1301 step = ""
1302 loaded = target_id in self.vim_targets
1303 target_database = (
1304 "vim_accounts"
1305 if target == "vim"
1306 else "wim_accounts"
1307 if target == "wim"
1308 else "sdns"
1309 )
1310
1311 try:
1312 step = "Getting {} from db".format(target_id)
1313 db_vim = self.db.get_one(target_database, {"_id": _id})
1314
1315 for op_index, operation in enumerate(
1316 db_vim["_admin"].get("operations", ())
1317 ):
1318 if operation["operationState"] != "PROCESSING":
1319 continue
1320
1321 locked_at = operation.get("locked_at")
1322
1323 if locked_at is not None and locked_at >= now - self.task_locked_time:
1324 # some other thread is doing this operation
1325 return
1326
1327 # lock
1328 op_text = "_admin.operations.{}.".format(op_index)
1329
1330 if not self.db.set_one(
1331 target_database,
1332 q_filter={
1333 "_id": _id,
1334 op_text + "operationState": "PROCESSING",
1335 op_text + "locked_at": locked_at,
1336 },
1337 update_dict={
1338 op_text + "locked_at": now,
1339 "admin.current_operation": op_index,
1340 },
1341 fail_on_empty=False,
1342 ):
1343 return
1344
1345 unset_dict[op_text + "locked_at"] = None
1346 unset_dict["current_operation"] = None
1347 step = "Loading " + target_id
1348 error_text = self._load_vim(target_id)
1349
1350 if not error_text:
1351 step = "Checking connectivity"
1352
1353 if target == "vim":
1354 self.my_vims[target_id].check_vim_connectivity()
1355 else:
1356 self.my_vims[target_id].check_credentials()
1357
1358 update_dict["_admin.operationalState"] = "ENABLED"
1359 update_dict["_admin.detailed-status"] = ""
1360 unset_dict[op_text + "detailed-status"] = None
1361 update_dict[op_text + "operationState"] = "COMPLETED"
1362
1363 return
1364
1365 except Exception as e:
1366 error_text = "{}: {}".format(step, e)
1367 self.logger.error("{} for {}: {}".format(step, target_id, e))
1368
1369 finally:
1370 if update_dict or unset_dict:
1371 if error_text:
1372 update_dict[op_text + "operationState"] = "FAILED"
1373 update_dict[op_text + "detailed-status"] = error_text
1374 unset_dict.pop(op_text + "detailed-status", None)
1375 update_dict["_admin.operationalState"] = "ERROR"
1376 update_dict["_admin.detailed-status"] = error_text
1377
1378 if op_text:
1379 update_dict[op_text + "statusEnteredTime"] = now
1380
1381 self.db.set_one(
1382 target_database,
1383 q_filter={"_id": _id},
1384 update_dict=update_dict,
1385 unset=unset_dict,
1386 fail_on_empty=False,
1387 )
1388
1389 if not loaded:
1390 self._unload_vim(target_id)
1391
1392 def _reload_vim(self, target_id):
1393 if target_id in self.vim_targets:
1394 self._load_vim(target_id)
1395 else:
1396 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1397 # just remove it to force load again next time it is needed
1398 self.db_vims.pop(target_id, None)
1399
1400 def _load_vim(self, target_id):
1401 """
1402 Load or reload a vim_account, sdn_controller or wim_account.
1403 Read content from database, load the plugin if not loaded.
1404 In case of error loading the plugin, it load a failing VIM_connector
1405 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1406 :param target_id: Contains type:_id; where type can be 'vim', ...
1407 :return: None if ok, descriptive text if error
1408 """
1409 target, _, _id = target_id.partition(":")
1410 target_database = (
1411 "vim_accounts"
1412 if target == "vim"
1413 else "wim_accounts"
1414 if target == "wim"
1415 else "sdns"
1416 )
1417 plugin_name = ""
1418 vim = None
1419
1420 try:
1421 step = "Getting {}={} from db".format(target, _id)
1422 # TODO process for wim, sdnc, ...
1423 vim = self.db.get_one(target_database, {"_id": _id})
1424
1425 # if deep_get(vim, "config", "sdn-controller"):
1426 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1427 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1428
1429 step = "Decrypting password"
1430 schema_version = vim.get("schema_version")
1431 self.db.encrypt_decrypt_fields(
1432 vim,
1433 "decrypt",
1434 fields=("password", "secret"),
1435 schema_version=schema_version,
1436 salt=_id,
1437 )
1438 self._process_vim_config(target_id, vim)
1439
1440 if target == "vim":
1441 plugin_name = "rovim_" + vim["vim_type"]
1442 step = "Loading plugin '{}'".format(plugin_name)
1443 vim_module_conn = self._load_plugin(plugin_name)
1444 step = "Loading {}'".format(target_id)
1445 self.my_vims[target_id] = vim_module_conn(
1446 uuid=vim["_id"],
1447 name=vim["name"],
1448 tenant_id=vim.get("vim_tenant_id"),
1449 tenant_name=vim.get("vim_tenant_name"),
1450 url=vim["vim_url"],
1451 url_admin=None,
1452 user=vim["vim_user"],
1453 passwd=vim["vim_password"],
1454 config=vim.get("config") or {},
1455 persistent_info={},
1456 )
1457 else: # sdn
1458 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1459 step = "Loading plugin '{}'".format(plugin_name)
1460 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1461 step = "Loading {}'".format(target_id)
1462 wim = deepcopy(vim)
1463 wim_config = wim.pop("config", {}) or {}
1464 wim["uuid"] = wim["_id"]
1465 if "url" in wim and "wim_url" not in wim:
1466 wim["wim_url"] = wim["url"]
1467 elif "url" not in wim and "wim_url" in wim:
1468 wim["url"] = wim["wim_url"]
1469
1470 if wim.get("dpid"):
1471 wim_config["dpid"] = wim.pop("dpid")
1472
1473 if wim.get("switch_id"):
1474 wim_config["switch_id"] = wim.pop("switch_id")
1475
1476 # wim, wim_account, config
1477 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1478 self.db_vims[target_id] = vim
1479 self.error_status = None
1480
1481 self.logger.info(
1482 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1483 )
1484 except Exception as e:
1485 self.logger.error(
1486 "Cannot load {} plugin={}: {} {}".format(
1487 target_id, plugin_name, step, e
1488 )
1489 )
1490
1491 self.db_vims[target_id] = vim or {}
1492 self.db_vims[target_id] = FailingConnector(str(e))
1493 error_status = "{} Error: {}".format(step, e)
1494
1495 return error_status
1496 finally:
1497 if target_id not in self.vim_targets:
1498 self.vim_targets.append(target_id)
1499
1500 def _get_db_task(self):
1501 """
1502 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1503 :return: None
1504 """
1505 now = time.time()
1506
1507 if not self.time_last_task_processed:
1508 self.time_last_task_processed = now
1509
1510 try:
1511 while True:
1512 """
1513 # Log RO tasks only when loglevel is DEBUG
1514 if self.logger.getEffectiveLevel() == logging.DEBUG:
1515 self._log_ro_task(
1516 None,
1517 None,
1518 None,
1519 "TASK_WF",
1520 "task_locked_time="
1521 + str(self.task_locked_time)
1522 + " "
1523 + "time_last_task_processed="
1524 + str(self.time_last_task_processed)
1525 + " "
1526 + "now="
1527 + str(now),
1528 )
1529 """
1530 locked = self.db.set_one(
1531 "ro_tasks",
1532 q_filter={
1533 "target_id": self.vim_targets,
1534 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1535 "locked_at.lt": now - self.task_locked_time,
1536 "to_check_at.lt": self.time_last_task_processed,
1537 },
1538 update_dict={"locked_by": self.my_id, "locked_at": now},
1539 fail_on_empty=False,
1540 )
1541
1542 if locked:
1543 # read and return
1544 ro_task = self.db.get_one(
1545 "ro_tasks",
1546 q_filter={
1547 "target_id": self.vim_targets,
1548 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1549 "locked_at": now,
1550 },
1551 )
1552 return ro_task
1553
1554 if self.time_last_task_processed == now:
1555 self.time_last_task_processed = None
1556 return None
1557 else:
1558 self.time_last_task_processed = now
1559 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1560
1561 except DbException as e:
1562 self.logger.error("Database exception at _get_db_task: {}".format(e))
1563 except Exception as e:
1564 self.logger.critical(
1565 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1566 )
1567
1568 return None
1569
1570 def _get_db_all_tasks(self):
1571 """
1572 Read all content of table ro_tasks to log it
1573 :return: None
1574 """
1575 try:
1576 # Checking the content of the BD:
1577
1578 # read and return
1579 ro_task = self.db.get_list("ro_tasks")
1580 for rt in ro_task:
1581 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1582 return ro_task
1583
1584 except DbException as e:
1585 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1586 except Exception as e:
1587 self.logger.critical(
1588 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1589 )
1590
1591 return None
1592
1593 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1594 """
1595 Generate a log with the following format:
1596
1597 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1598 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1599 task_array_index;task_id;task_action;task_item;task_args
1600
1601 Example:
1602
1603 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1604 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1605 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1606 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1607 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1608 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1609 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1610 """
1611 try:
1612 line = []
1613 i = 0
1614 if ro_task is not None and isinstance(ro_task, dict):
1615 for t in ro_task["tasks"]:
1616 line.clear()
1617 line.append(mark)
1618 line.append(event)
1619 line.append(ro_task.get("_id", ""))
1620 line.append(str(ro_task.get("locked_at", "")))
1621 line.append(str(ro_task.get("modified_at", "")))
1622 line.append(str(ro_task.get("created_at", "")))
1623 line.append(str(ro_task.get("to_check_at", "")))
1624 line.append(str(ro_task.get("locked_by", "")))
1625 line.append(str(ro_task.get("target_id", "")))
1626 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1627 line.append(str(ro_task.get("vim_info", "")))
1628 line.append(str(ro_task.get("tasks", "")))
1629 if isinstance(t, dict):
1630 line.append(str(t.get("status", "")))
1631 line.append(str(t.get("action_id", "")))
1632 line.append(str(i))
1633 line.append(str(t.get("task_id", "")))
1634 line.append(str(t.get("action", "")))
1635 line.append(str(t.get("item", "")))
1636 line.append(str(t.get("find_params", "")))
1637 line.append(str(t.get("params", "")))
1638 else:
1639 line.extend([""] * 2)
1640 line.append(str(i))
1641 line.extend([""] * 5)
1642
1643 i += 1
1644 self.logger.debug(";".join(line))
1645 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1646 i = 0
1647 while True:
1648 st = "tasks.{}.status".format(i)
1649 if st not in db_ro_task_update:
1650 break
1651 line.clear()
1652 line.append(mark)
1653 line.append(event)
1654 line.append(db_ro_task_update.get("_id", ""))
1655 line.append(str(db_ro_task_update.get("locked_at", "")))
1656 line.append(str(db_ro_task_update.get("modified_at", "")))
1657 line.append("")
1658 line.append(str(db_ro_task_update.get("to_check_at", "")))
1659 line.append(str(db_ro_task_update.get("locked_by", "")))
1660 line.append("")
1661 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1662 line.append("")
1663 line.append(str(db_ro_task_update.get("vim_info", "")))
1664 line.append(str(str(db_ro_task_update).count(".status")))
1665 line.append(db_ro_task_update.get(st, ""))
1666 line.append("")
1667 line.append(str(i))
1668 line.extend([""] * 3)
1669 i += 1
1670 self.logger.debug(";".join(line))
1671
1672 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1673 line.clear()
1674 line.append(mark)
1675 line.append(event)
1676 line.append(db_ro_task_delete.get("_id", ""))
1677 line.append("")
1678 line.append(db_ro_task_delete.get("modified_at", ""))
1679 line.extend([""] * 13)
1680 self.logger.debug(";".join(line))
1681
1682 else:
1683 line.clear()
1684 line.append(mark)
1685 line.append(event)
1686 line.extend([""] * 16)
1687 self.logger.debug(";".join(line))
1688
1689 except Exception as e:
1690 self.logger.error("Error logging ro_task: {}".format(e))
1691
1692 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1693 """
1694 Determine if this task need to be done or superseded
1695 :return: None
1696 """
1697 my_task = ro_task["tasks"][task_index]
1698 task_id = my_task["task_id"]
1699 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1700 "created_items", False
1701 )
1702
1703 if my_task["status"] == "FAILED":
1704 return None, None # TODO need to be retry??
1705
1706 try:
1707 for index, task in enumerate(ro_task["tasks"]):
1708 if index == task_index or not task:
1709 continue # own task
1710
1711 if (
1712 my_task["target_record"] == task["target_record"]
1713 and task["action"] == "CREATE"
1714 ):
1715 # set to finished
1716 db_update["tasks.{}.status".format(index)] = task[
1717 "status"
1718 ] = "FINISHED"
1719 elif task["action"] == "CREATE" and task["status"] not in (
1720 "FINISHED",
1721 "SUPERSEDED",
1722 ):
1723 needed_delete = False
1724
1725 if needed_delete:
1726 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1727 else:
1728 return "SUPERSEDED", None
1729 except Exception as e:
1730 if not isinstance(e, NsWorkerException):
1731 self.logger.critical(
1732 "Unexpected exception at _delete_task task={}: {}".format(
1733 task_id, e
1734 ),
1735 exc_info=True,
1736 )
1737
1738 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1739
1740 def _create_task(self, ro_task, task_index, task_depends, db_update):
1741 """
1742 Determine if this task need to create something at VIM
1743 :return: None
1744 """
1745 my_task = ro_task["tasks"][task_index]
1746 task_id = my_task["task_id"]
1747 task_status = None
1748
1749 if my_task["status"] == "FAILED":
1750 return None, None # TODO need to be retry??
1751 elif my_task["status"] == "SCHEDULED":
1752 # check if already created by another task
1753 for index, task in enumerate(ro_task["tasks"]):
1754 if index == task_index or not task:
1755 continue # own task
1756
1757 if task["action"] == "CREATE" and task["status"] not in (
1758 "SCHEDULED",
1759 "FINISHED",
1760 "SUPERSEDED",
1761 ):
1762 return task["status"], "COPY_VIM_INFO"
1763
1764 try:
1765 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1766 ro_task, task_index, task_depends
1767 )
1768 # TODO update other CREATE tasks
1769 except Exception as e:
1770 if not isinstance(e, NsWorkerException):
1771 self.logger.error(
1772 "Error executing task={}: {}".format(task_id, e), exc_info=True
1773 )
1774
1775 task_status = "FAILED"
1776 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1777 # TODO update ro_vim_item_update
1778
1779 return task_status, ro_vim_item_update
1780 else:
1781 return None, None
1782
1783 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1784 """
1785 Look for dependency task
1786 :param task_id: Can be one of
1787 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1788 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1789 3. task.task_id: "<action_id>:number"
1790 :param ro_task:
1791 :param target_id:
1792 :return: database ro_task plus index of task
1793 """
1794 if (
1795 task_id.startswith("vim:")
1796 or task_id.startswith("sdn:")
1797 or task_id.startswith("wim:")
1798 ):
1799 target_id, _, task_id = task_id.partition(" ")
1800
1801 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1802 ro_task_dependency = self.db.get_one(
1803 "ro_tasks",
1804 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1805 fail_on_empty=False,
1806 )
1807
1808 if ro_task_dependency:
1809 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1810 if task["target_record_id"] == task_id:
1811 return ro_task_dependency, task_index
1812
1813 else:
1814 if ro_task:
1815 for task_index, task in enumerate(ro_task["tasks"]):
1816 if task and task["task_id"] == task_id:
1817 return ro_task, task_index
1818
1819 ro_task_dependency = self.db.get_one(
1820 "ro_tasks",
1821 q_filter={
1822 "tasks.ANYINDEX.task_id": task_id,
1823 "tasks.ANYINDEX.target_record.ne": None,
1824 },
1825 fail_on_empty=False,
1826 )
1827
1828 if ro_task_dependency:
1829 for task_index, task in ro_task_dependency["tasks"]:
1830 if task["task_id"] == task_id:
1831 return ro_task_dependency, task_index
1832 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1833
1834 def _process_pending_tasks(self, ro_task):
1835 ro_task_id = ro_task["_id"]
1836 now = time.time()
1837 # one day
1838 next_check_at = now + (24 * 60 * 60)
1839 db_ro_task_update = {}
1840
1841 def _update_refresh(new_status):
1842 # compute next_refresh
1843 nonlocal task
1844 nonlocal next_check_at
1845 nonlocal db_ro_task_update
1846 nonlocal ro_task
1847
1848 next_refresh = time.time()
1849
1850 if task["item"] in ("image", "flavor"):
1851 next_refresh += self.REFRESH_IMAGE
1852 elif new_status == "BUILD":
1853 next_refresh += self.REFRESH_BUILD
1854 elif new_status == "DONE":
1855 next_refresh += self.REFRESH_ACTIVE
1856 else:
1857 next_refresh += self.REFRESH_ERROR
1858
1859 next_check_at = min(next_check_at, next_refresh)
1860 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1861 ro_task["vim_info"]["refresh_at"] = next_refresh
1862
1863 try:
1864 """
1865 # Log RO tasks only when loglevel is DEBUG
1866 if self.logger.getEffectiveLevel() == logging.DEBUG:
1867 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1868 """
1869 # 0: get task_status_create
1870 lock_object = None
1871 task_status_create = None
1872 task_create = next(
1873 (
1874 t
1875 for t in ro_task["tasks"]
1876 if t
1877 and t["action"] == "CREATE"
1878 and t["status"] in ("BUILD", "DONE")
1879 ),
1880 None,
1881 )
1882
1883 if task_create:
1884 task_status_create = task_create["status"]
1885
1886 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1887 for task_action in ("DELETE", "CREATE", "EXEC"):
1888 db_vim_update = None
1889 new_status = None
1890
1891 for task_index, task in enumerate(ro_task["tasks"]):
1892 if not task:
1893 continue # task deleted
1894
1895 task_depends = {}
1896 target_update = None
1897
1898 if (
1899 (
1900 task_action in ("DELETE", "EXEC")
1901 and task["status"] not in ("SCHEDULED", "BUILD")
1902 )
1903 or task["action"] != task_action
1904 or (
1905 task_action == "CREATE"
1906 and task["status"] in ("FINISHED", "SUPERSEDED")
1907 )
1908 ):
1909 continue
1910
1911 task_path = "tasks.{}.status".format(task_index)
1912 try:
1913 db_vim_info_update = None
1914
1915 if task["status"] == "SCHEDULED":
1916 # check if tasks that this depends on have been completed
1917 dependency_not_completed = False
1918
1919 for dependency_task_id in task.get("depends_on") or ():
1920 (
1921 dependency_ro_task,
1922 dependency_task_index,
1923 ) = self._get_dependency(
1924 dependency_task_id, target_id=ro_task["target_id"]
1925 )
1926 dependency_task = dependency_ro_task["tasks"][
1927 dependency_task_index
1928 ]
1929
1930 if dependency_task["status"] == "SCHEDULED":
1931 dependency_not_completed = True
1932 next_check_at = min(
1933 next_check_at, dependency_ro_task["to_check_at"]
1934 )
1935 # must allow dependent task to be processed first
1936 # to do this set time after last_task_processed
1937 next_check_at = max(
1938 self.time_last_task_processed, next_check_at
1939 )
1940 break
1941 elif dependency_task["status"] == "FAILED":
1942 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1943 task["action"],
1944 task["item"],
1945 dependency_task["action"],
1946 dependency_task["item"],
1947 dependency_task_id,
1948 dependency_ro_task["vim_info"].get(
1949 "vim_details"
1950 ),
1951 )
1952 self.logger.error(
1953 "task={} {}".format(task["task_id"], error_text)
1954 )
1955 raise NsWorkerException(error_text)
1956
1957 task_depends[dependency_task_id] = dependency_ro_task[
1958 "vim_info"
1959 ]["vim_id"]
1960 task_depends[
1961 "TASK-{}".format(dependency_task_id)
1962 ] = dependency_ro_task["vim_info"]["vim_id"]
1963
1964 if dependency_not_completed:
1965 # TODO set at vim_info.vim_details that it is waiting
1966 continue
1967
1968 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1969 # the task of renew this locking. It will update database locket_at periodically
1970 if not lock_object:
1971 lock_object = LockRenew.add_lock_object(
1972 "ro_tasks", ro_task, self
1973 )
1974
1975 if task["action"] == "DELETE":
1976 (new_status, db_vim_info_update,) = self._delete_task(
1977 ro_task, task_index, task_depends, db_ro_task_update
1978 )
1979 new_status = (
1980 "FINISHED" if new_status == "DONE" else new_status
1981 )
1982 # ^with FINISHED instead of DONE it will not be refreshing
1983
1984 if new_status in ("FINISHED", "SUPERSEDED"):
1985 target_update = "DELETE"
1986 elif task["action"] == "EXEC":
1987 (
1988 new_status,
1989 db_vim_info_update,
1990 db_task_update,
1991 ) = self.item2class[task["item"]].exec(
1992 ro_task, task_index, task_depends
1993 )
1994 new_status = (
1995 "FINISHED" if new_status == "DONE" else new_status
1996 )
1997 # ^with FINISHED instead of DONE it will not be refreshing
1998
1999 if db_task_update:
2000 # load into database the modified db_task_update "retries" and "next_retry"
2001 if db_task_update.get("retries"):
2002 db_ro_task_update[
2003 "tasks.{}.retries".format(task_index)
2004 ] = db_task_update["retries"]
2005
2006 next_check_at = time.time() + db_task_update.get(
2007 "next_retry", 60
2008 )
2009 target_update = None
2010 elif task["action"] == "CREATE":
2011 if task["status"] == "SCHEDULED":
2012 if task_status_create:
2013 new_status = task_status_create
2014 target_update = "COPY_VIM_INFO"
2015 else:
2016 new_status, db_vim_info_update = self.item2class[
2017 task["item"]
2018 ].new(ro_task, task_index, task_depends)
2019 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2020 _update_refresh(new_status)
2021 else:
2022 if (
2023 ro_task["vim_info"]["refresh_at"]
2024 and now > ro_task["vim_info"]["refresh_at"]
2025 ):
2026 new_status, db_vim_info_update = self.item2class[
2027 task["item"]
2028 ].refresh(ro_task)
2029 _update_refresh(new_status)
2030 else:
2031 # The refresh is updated to avoid set the value of "refresh_at" to
2032 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2033 # because it can happen that in this case the task is never processed
2034 _update_refresh(task["status"])
2035
2036 except Exception as e:
2037 new_status = "FAILED"
2038 db_vim_info_update = {
2039 "vim_status": "VIM_ERROR",
2040 "vim_details": str(e),
2041 }
2042
2043 if not isinstance(
2044 e, (NsWorkerException, vimconn.VimConnException)
2045 ):
2046 self.logger.error(
2047 "Unexpected exception at _delete_task task={}: {}".format(
2048 task["task_id"], e
2049 ),
2050 exc_info=True,
2051 )
2052
2053 try:
2054 if db_vim_info_update:
2055 db_vim_update = db_vim_info_update.copy()
2056 db_ro_task_update.update(
2057 {
2058 "vim_info." + k: v
2059 for k, v in db_vim_info_update.items()
2060 }
2061 )
2062 ro_task["vim_info"].update(db_vim_info_update)
2063
2064 if new_status:
2065 if task_action == "CREATE":
2066 task_status_create = new_status
2067 db_ro_task_update[task_path] = new_status
2068
2069 if target_update or db_vim_update:
2070 if target_update == "DELETE":
2071 self._update_target(task, None)
2072 elif target_update == "COPY_VIM_INFO":
2073 self._update_target(task, ro_task["vim_info"])
2074 else:
2075 self._update_target(task, db_vim_update)
2076
2077 except Exception as e:
2078 if (
2079 isinstance(e, DbException)
2080 and e.http_code == HTTPStatus.NOT_FOUND
2081 ):
2082 # if the vnfrs or nsrs has been removed from database, this task must be removed
2083 self.logger.debug(
2084 "marking to delete task={}".format(task["task_id"])
2085 )
2086 self.tasks_to_delete.append(task)
2087 else:
2088 self.logger.error(
2089 "Unexpected exception at _update_target task={}: {}".format(
2090 task["task_id"], e
2091 ),
2092 exc_info=True,
2093 )
2094
2095 locked_at = ro_task["locked_at"]
2096
2097 if lock_object:
2098 locked_at = [
2099 lock_object["locked_at"],
2100 lock_object["locked_at"] + self.task_locked_time,
2101 ]
2102 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2103 # contain exactly locked_at + self.task_locked_time
2104 LockRenew.remove_lock_object(lock_object)
2105
2106 q_filter = {
2107 "_id": ro_task["_id"],
2108 "to_check_at": ro_task["to_check_at"],
2109 "locked_at": locked_at,
2110 }
2111 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2112 # outside this task (by ro_nbi) do not update it
2113 db_ro_task_update["locked_by"] = None
2114 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2115 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2116 db_ro_task_update["modified_at"] = now
2117 db_ro_task_update["to_check_at"] = next_check_at
2118
2119 """
2120 # Log RO tasks only when loglevel is DEBUG
2121 if self.logger.getEffectiveLevel() == logging.DEBUG:
2122 db_ro_task_update_log = db_ro_task_update.copy()
2123 db_ro_task_update_log["_id"] = q_filter["_id"]
2124 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2125 """
2126
2127 if not self.db.set_one(
2128 "ro_tasks",
2129 update_dict=db_ro_task_update,
2130 q_filter=q_filter,
2131 fail_on_empty=False,
2132 ):
2133 del db_ro_task_update["to_check_at"]
2134 del q_filter["to_check_at"]
2135 """
2136 # Log RO tasks only when loglevel is DEBUG
2137 if self.logger.getEffectiveLevel() == logging.DEBUG:
2138 self._log_ro_task(
2139 None,
2140 db_ro_task_update_log,
2141 None,
2142 "TASK_WF",
2143 "SET_TASK " + str(q_filter),
2144 )
2145 """
2146 self.db.set_one(
2147 "ro_tasks",
2148 q_filter=q_filter,
2149 update_dict=db_ro_task_update,
2150 fail_on_empty=True,
2151 )
2152 except DbException as e:
2153 self.logger.error(
2154 "ro_task={} Error updating database {}".format(ro_task_id, e)
2155 )
2156 except Exception as e:
2157 self.logger.error(
2158 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2159 )
2160
2161 def _update_target(self, task, ro_vim_item_update):
2162 table, _, temp = task["target_record"].partition(":")
2163 _id, _, path_vim_status = temp.partition(":")
2164 path_item = path_vim_status[: path_vim_status.rfind(".")]
2165 path_item = path_item[: path_item.rfind(".")]
2166 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2167 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2168
2169 if ro_vim_item_update:
2170 update_dict = {
2171 path_vim_status + "." + k: v
2172 for k, v in ro_vim_item_update.items()
2173 if k
2174 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2175 }
2176
2177 if path_vim_status.startswith("vdur."):
2178 # for backward compatibility, add vdur.name apart from vdur.vim_name
2179 if ro_vim_item_update.get("vim_name"):
2180 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2181
2182 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2183 if ro_vim_item_update.get("vim_id"):
2184 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2185
2186 # update general status
2187 if ro_vim_item_update.get("vim_status"):
2188 update_dict[path_item + ".status"] = ro_vim_item_update[
2189 "vim_status"
2190 ]
2191
2192 if ro_vim_item_update.get("interfaces"):
2193 path_interfaces = path_item + ".interfaces"
2194
2195 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2196 if iface:
2197 update_dict.update(
2198 {
2199 path_interfaces + ".{}.".format(i) + k: v
2200 for k, v in iface.items()
2201 if k in ("vlan", "compute_node", "pci")
2202 }
2203 )
2204
2205 # put ip_address and mac_address with ip-address and mac-address
2206 if iface.get("ip_address"):
2207 update_dict[
2208 path_interfaces + ".{}.".format(i) + "ip-address"
2209 ] = iface["ip_address"]
2210
2211 if iface.get("mac_address"):
2212 update_dict[
2213 path_interfaces + ".{}.".format(i) + "mac-address"
2214 ] = iface["mac_address"]
2215
2216 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2217 update_dict["ip-address"] = iface.get("ip_address").split(
2218 ";"
2219 )[0]
2220
2221 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2222 update_dict[path_item + ".ip-address"] = iface.get(
2223 "ip_address"
2224 ).split(";")[0]
2225
2226 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2227 else:
2228 update_dict = {path_item + ".status": "DELETED"}
2229 self.db.set_one(
2230 table,
2231 q_filter={"_id": _id},
2232 update_dict=update_dict,
2233 unset={path_vim_status: None},
2234 )
2235
2236 def _process_delete_db_tasks(self):
2237 """
2238 Delete task from database because vnfrs or nsrs or both have been deleted
2239 :return: None. Uses and modify self.tasks_to_delete
2240 """
2241 while self.tasks_to_delete:
2242 task = self.tasks_to_delete[0]
2243 vnfrs_deleted = None
2244 nsr_id = task["nsr_id"]
2245
2246 if task["target_record"].startswith("vnfrs:"):
2247 # check if nsrs is present
2248 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2249 vnfrs_deleted = task["target_record"].split(":")[1]
2250
2251 try:
2252 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2253 except Exception as e:
2254 self.logger.error(
2255 "Error deleting task={}: {}".format(task["task_id"], e)
2256 )
2257 self.tasks_to_delete.pop(0)
2258
2259 @staticmethod
2260 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2261 """
2262 Static method because it is called from osm_ng_ro.ns
2263 :param db: instance of database to use
2264 :param nsr_id: affected nsrs id
2265 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2266 :return: None, exception is fails
2267 """
2268 retries = 5
2269 for retry in range(retries):
2270 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2271 now = time.time()
2272 conflict = False
2273
2274 for ro_task in ro_tasks:
2275 db_update = {}
2276 to_delete_ro_task = True
2277
2278 for index, task in enumerate(ro_task["tasks"]):
2279 if not task:
2280 pass
2281 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2282 vnfrs_deleted
2283 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2284 ):
2285 db_update["tasks.{}".format(index)] = None
2286 else:
2287 # used by other nsr, ro_task cannot be deleted
2288 to_delete_ro_task = False
2289
2290 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2291 if to_delete_ro_task:
2292 if not db.del_one(
2293 "ro_tasks",
2294 q_filter={
2295 "_id": ro_task["_id"],
2296 "modified_at": ro_task["modified_at"],
2297 },
2298 fail_on_empty=False,
2299 ):
2300 conflict = True
2301 elif db_update:
2302 db_update["modified_at"] = now
2303 if not db.set_one(
2304 "ro_tasks",
2305 q_filter={
2306 "_id": ro_task["_id"],
2307 "modified_at": ro_task["modified_at"],
2308 },
2309 update_dict=db_update,
2310 fail_on_empty=False,
2311 ):
2312 conflict = True
2313 if not conflict:
2314 return
2315 else:
2316 raise NsWorkerException("Exceeded {} retries".format(retries))
2317
2318 def run(self):
2319 # load database
2320 self.logger.info("Starting")
2321 while True:
2322 # step 1: get commands from queue
2323 try:
2324 if self.vim_targets:
2325 task = self.task_queue.get(block=False)
2326 else:
2327 if not self.idle:
2328 self.logger.debug("enters in idle state")
2329 self.idle = True
2330 task = self.task_queue.get(block=True)
2331 self.idle = False
2332
2333 if task[0] == "terminate":
2334 break
2335 elif task[0] == "load_vim":
2336 self.logger.info("order to load vim {}".format(task[1]))
2337 self._load_vim(task[1])
2338 elif task[0] == "unload_vim":
2339 self.logger.info("order to unload vim {}".format(task[1]))
2340 self._unload_vim(task[1])
2341 elif task[0] == "reload_vim":
2342 self._reload_vim(task[1])
2343 elif task[0] == "check_vim":
2344 self.logger.info("order to check vim {}".format(task[1]))
2345 self._check_vim(task[1])
2346 continue
2347 except Exception as e:
2348 if isinstance(e, queue.Empty):
2349 pass
2350 else:
2351 self.logger.critical(
2352 "Error processing task: {}".format(e), exc_info=True
2353 )
2354
2355 # step 2: process pending_tasks, delete not needed tasks
2356 try:
2357 if self.tasks_to_delete:
2358 self._process_delete_db_tasks()
2359 busy = False
2360 """
2361 # Log RO tasks only when loglevel is DEBUG
2362 if self.logger.getEffectiveLevel() == logging.DEBUG:
2363 _ = self._get_db_all_tasks()
2364 """
2365 ro_task = self._get_db_task()
2366 if ro_task:
2367 self._process_pending_tasks(ro_task)
2368 busy = True
2369 if not busy:
2370 time.sleep(5)
2371 except Exception as e:
2372 self.logger.critical(
2373 "Unexpected exception at run: " + str(e), exc_info=True
2374 )
2375
2376 self.logger.info("Finishing")