4dac9d97112e93d8d1b57ab0c9d516639ab5d63d
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target, where to store the results.
"""
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
def deep_get(target_dict, *args, **kwargs):
    """
    Walk target_dict following the nested keys given in args and return the value found.
    Example target_dict={a: {b: 5}}; args=(a, b) returns 5; both args=(a, b, c) and args=(f, h) return
    the default.
    :param target_dict: dictionary to be read
    :param args: sequence of keys, one per nesting level
    :param kwargs: only 'default' is read: value returned when the path cannot be followed (None if absent)
    :return: the value at the end of the path; 'default' (or None) if any level is missing or is not a dict
    """
    current = target_dict
    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
            continue
        # path is broken: either a non-dict level or a missing key
        return kwargs.get("default")
    return current
64
65
class NsWorkerException(Exception):
    """Error raised by the worker code of this module while processing a task."""
68
69
class FailingConnector:
    """Connector replacement whose public VIM and SDN methods always raise the given error."""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exception raised by every mocked method
        """
        self.error_msg = error_msg

        # each public attribute of VimConnector becomes a mock raising VimConnException
        for name in dir(vimconn.VimConnector):
            if name[0] == "_":
                continue
            failing = Mock(side_effect=vimconn.VimConnException(error_msg))
            setattr(self, name, failing)

        # same for SdnConnectorBase; it is set last, so it wins for a name present in both classes
        for name in dir(sdnconn.SdnConnectorBase):
            if name[0] == "_":
                continue
            failing = Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
            setattr(self, name, failing)
85
86
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a requested element cannot be found."""
89
90
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    The methods implemented here do not call the VIM; they just report a fixed result."""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database instance
        :param my_vims: connectors, indexed by target_id
        :param db_vims: VIM database content, indexed by target_id
        :param logger: logger to use
        """
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created: returns "BUILD" and an empty update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling VIM to get the status: "FAILED" if the stored vim_status is VIM_ERROR, else "DONE" """
        in_vim_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if in_vim_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed: returns "DONE" with neither vim update nor task update"""
        return "DONE", None, None
117
118
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create, refresh the status and delete."""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for the network using the task 'find_params'; without 'find_params' create it with the
        task 'params'. A management network that is not found and is not configured at the VIM is created.
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", vim item update) when ok, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt = False
        mgmt_set_at_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: build the filter used to look for the network
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration (id first, then name) takes precedence
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]
                    configured_id = deep_get(db_vim, "config", "management_network_id")

                    if configured_id:
                        mgmt_set_at_vim = True
                        vim_filter = {"id": configured_id}
                    else:
                        configured_name = deep_get(
                            db_vim, "config", "management_network_name"
                        )

                        if configured_name:
                            mgmt_set_at_vim = True
                            vim_filter = {"name": configured_name}
                        else:
                            vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if vim_nets or task.get("params"):
                    if len(vim_nets) > 1:
                        raise NsWorkerException(
                            "More than one network found with this criteria: '{}'".format(
                                find_params
                            )
                        )
                elif is_mgmt and not mgmt_set_at_vim:
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                else:
                    raise NsWorkerExceptionNotFound(
                        "Network not found with this criteria: '{}'".format(find_params)
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

    def refresh(self, ro_task):
        """
        Call VIM to get network status
        :param ro_task: ro_task content; '_id', 'target_id' and 'vim_info' are read
        :return: (task status, dict with the vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored = ro_task["vim_info"]
        vim_id = stored["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            # ACTIVE -> DONE, BUILD -> BUILD, anything else -> FAILED
            task_status = {"ACTIVE": "DONE", "BUILD": "BUILD"}.get(
                vim_info["status"], "FAILED"
            )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # collect only what has changed with respect to the stored information
        ro_vim_item_update = {}
        status = vim_info["status"]

        if stored["vim_status"] != status:
            ro_vim_item_update["vim_status"] = status

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if status in ("ERROR", "VIM_ERROR"):
            if stored["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif status == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            # details are not logged when the new status is ACTIVE
            logged_details = (
                ro_vim_item_update.get("vim_details") if new_status != "ACTIVE" else ""
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    new_status,
                    logged_details,
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network at the VIM if there is a vim_id or created items.
        A network not found at the VIM is considered already deleted.
        :param ro_task: ro_task content; 'tasks', 'target_id', '_id' and 'vim_info' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim item update) when ok, ("FAILED", vim item update) on VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                deleted_update.get("vim_details", ""),
            )
        )

        return "DONE", deleted_update
335
336
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDUs (VMs): create, delete, refresh the status and inject the ssh key."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create the VM at the VIM, replacing "TASK-" references by the ids obtained by other tasks.
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: maps each "TASK-..." reference to the id of the element it refers to
        :return: ("BUILD", vim item update) when ok, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # set before calling the VIM, so a failure is also reported with created=True
            created = True
            vm_params = deepcopy(task["params"])

            for net in vm_params["net_list"]:
                # change task_id into network_id
                if "net_id" not in net or not net["net_id"].startswith("TASK-"):
                    continue

                network_id = task_depends[net["net_id"]]

                if not network_id:
                    raise NsWorkerException(
                        "Cannot create VM because depends on a network not created or found "
                        "for {}".format(net["net_id"])
                    )

                net["net_id"] = network_id

            # image first, then flavor
            for dependent_key in ("image_id", "flavor_id"):
                if vm_params[dependent_key].startswith("TASK-"):
                    vm_params[dependent_key] = task_depends[vm_params[dependent_key]]

            vim_vm_id, created_items = target_vim.new_vminstance(**vm_params)
            # 'vim_id' of each net is read after the call; presumably filled by new_vminstance
            interfaces = [net["vim_id"] for net in vm_params["net_list"]]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

    def delete(self, ro_task, task_index):
        """
        Delete the VM at the VIM if there is a vim_id or created items.
        A VM not found at the VIM is considered already deleted.
        :param ro_task: ro_task content; 'tasks', 'target_id', '_id' and 'vim_info' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim item update) when ok, ("FAILED", vim item update) on VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_vminstance(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                deleted_update.get("vim_details", ""),
            )
        )

        return "DONE", deleted_update

    def refresh(self, ro_task):
        """
        Call VIM to get vm status
        :param ro_task: ro_task content; '_id', 'target_id', 'tasks' and 'vim_info' are read
        :return: (None, None) if there is no vim_id; otherwise (task status, dict with the vim_info
            fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored = ro_task["vim_info"]
        vim_id = stored["vim_id"]

        if not vim_id:
            return None, None

        try:
            vim_info = target_vim.refresh_vms_status([vim_id])[vim_id]
            # ACTIVE -> DONE, BUILD -> BUILD, anything else -> FAILED
            task_status = {"ACTIVE": "DONE", "BUILD": "BUILD"}.get(
                vim_info["status"], "FAILED"
            )

            # try to load and parse vim_information; best effort, any failure is ignored
            try:
                parsed_info = yaml.safe_load(vim_info["vim_info"])
                if parsed_info.get("name"):
                    vim_info["name"] = parsed_info["name"]
            except Exception:
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # same order as interfaces_vim_ids; None for an interface not reported by the VIM
            vim_interfaces = [
                next(
                    (
                        reported
                        for reported in vim_info["interfaces"]
                        if wanted_id == reported["vim_interface_id"]
                    ),
                    None,
                )
                for wanted_id in stored["interfaces_vim_ids"]
            ]

        # first CREATE task not yet finished
        task_create = next(
            candidate
            for candidate in ro_task["tasks"]
            if candidate
            and candidate["action"] == "CREATE"
            and candidate["status"] != "FINISHED"
        )
        mgmt_vnf_index = task_create.get("mgmt_vnf_interface")
        if vim_interfaces and mgmt_vnf_index is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True

        # vdu management interface defaults to the vnf management one, and then to the first one
        mgmt_vdu_index = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_index]["mgmt_vdu_interface"] = True

        # collect only what has changed with respect to the stored information
        status = vim_info["status"]

        if stored["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if stored["vim_status"] != status:
            ro_vim_item_update["vim_status"] = status

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if status in ("ERROR", "VIM_ERROR"):
            if stored["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif status == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            # details are not logged when the new status is ACTIVE
            logged_details = (
                ro_vim_item_update.get("vim_details") if new_status != "ACTIVE" else ""
            )
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    new_status,
                    logged_details,
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject the ssh key into the VM. A failure is retried: "BUILD" is returned until
        max_retries_inject_ssh_key attempts are reached, then "FAILED".
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task status, vim item update or None, task update with 'retries' and, when a
            retry is requested, 'next_retry')
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        attempts = task.get("retries", 0)

        try:
            key_params = deepcopy(task["params"])
            # the three encryption related entries are replaced by the decrypted key
            key_params["ro_key"] = self.db.decrypt(
                key_params.pop("private_key"),
                key_params.pop("schema_version"),
                key_params.pop("salt"),
            )
            key_params["ip_addr"] = key_params.pop("ip_address")
            target_vim.inject_user_key(**key_params)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return "DONE", None, db_task_update  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            attempts += 1

            if attempts < self.max_retries_inject_ssh_key:
                retry_update = {
                    "retries": attempts,
                    "next_retry": self.time_retries_inject_ssh_key,
                }

                return "BUILD", None, retry_update

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {"vim_details": str(e)}, db_task_update
598
599
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images. Images are only looked up at the VIM, never created."""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for the image at the VIM using the task 'find_params'. Exactly one image must match.
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", vim item update) when the image is found; ("FAILED", vim item update) if
            none or several images match, if the task has no 'find_params' or on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            if not task.get("find_params"):
                # without criteria there is nothing to look for; reported as a failure instead of
                # reaching the code below with 'vim_image_id' unbound (UnboundLocalError)
                raise NsWorkerException(
                    "Cannot look for the image: task has no find_params"
                )

            vim_images = target_vim.get_image_list(**task["find_params"])

            if not vim_images:
                raise NsWorkerExceptionNotFound(
                    "Image not found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )

            if len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )

            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
653
654
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at the VIM if there is a vim_id.
        A flavor not found at the VIM is considered already deleted.
        :param ro_task: ro_task content; 'tasks', 'target_id', '_id' and 'vim_info' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim item update) when ok, ("FAILED", vim item update) on VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                deleted_update.get("vim_details", ""),
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor matching the task 'find_params'; if none is obtained and the task has
        'params', create it.
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", vim item update) when ok, ("FAILED", vim item update) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_flavor_id = None

            if task.get("find_params"):
                try:
                    wanted = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(wanted)
                except vimconn.VimConnNotFoundException:
                    # not found is not an error: the flavor is created below, if possible
                    pass

            if not vim_flavor_id and task.get("params"):
                # CREATE
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
746
747
class VimInteractionSdnNet(VimInteractionBase):
    """SDN interaction for networks: create/update, refresh and delete connectivity services."""

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping
        mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise (also when port_pci is shorter than mapping)
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                break

            # literal text before the bracket must be identical
            literal = mapping[mapping_index:bracket_start]
            choice_index = pci_index + len(literal)
            if port_pci[pci_index:choice_index] != literal:
                return False

            # a slice is used instead of an index, so a port_pci that ends before the bracket
            # position gives an empty text (no match) instead of raising IndexError.
            # The empty text is checked explicitly because '"" in text' is always True
            candidate = port_pci[choice_index : choice_index + 1]
            if not candidate or candidate not in mapping[bracket_start + 1 : bracket_end]:
                return False

            pci_index = choice_index + 1
            mapping_index = bracket_end + 1

        # what remains after the last bracket must be identical
        if port_pci[pci_index:] != mapping[mapping_index:]:
            return False

        return True

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Get from database the SR-IOV and PCI-PASSTHROUGH interfaces connected to the given vlds
        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: only vnfrs of this vim account are considered
        :return: list of interfaces (copies of the database content) with an added 'id' with format
            vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index>, and 'status' set to "ERROR" when
            the vdur is at ERROR status
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV or PT
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Refresh is done by processing again, with new(), the first CREATE task not yet finished
        :param ro_task: ro_task content
        :return: same as new()
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create the connectivity service, or update it when the set of ports to connect has changed;
        otherwise, if it already exists, get its status.
        :param ro_task: ro_task content; 'tasks', 'target_id' and 'vim_info' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (sdn status, vim item update) when ok, ("FAILED", vim item update) on any exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            params = task["params"]
            vlds_to_connect = params["vlds"]
            associated_vim = params["target_vim"]
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = associated_vim.partition(":")

            if associated_vim:
                # get associated VIM, reading it from database the first time
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
            # print(ports)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False  # never set to True until the TODO below is done

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location is not known yet; the port is counted and skipped
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                # NOTE(review): db_vim is unbound here if 'target_vim' is empty; the resulting
                # exception is caught below and reported as FAILED
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    # mapping is not needed: defaults are derived from the port itself
                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": "dot1q"
                    if port["type"] == "SR-IOV"
                    else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing is created at the SDN with less than two ports
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                # no changes in the ports: just get the status
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # error_msg, if present, overrides sdn_info
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # it cannot be considered ACTIVE while there are ports pending to be connected
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # traceback is logged only for errors that do not come from the connectors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the connectivity service if there is a vim_id.
        An SdnConnectorError with http code NOT_FOUND is considered already deleted.
        :param ro_task: ro_task content; 'tasks', 'target_id', '_id' and 'vim_info' are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim item update) when ok, ("FAILED", vim item update) on any other exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                ro_vim_item_update_ok["vim_details"] = "already deleted"
            else:
                # traceback is logged only for errors that do not come from the connectors
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_details": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
1137
1138
class NsWorker(threading.Thread):
    # Delays, in seconds, that _process_pending_tasks adds to the current time to compute the
    # next refresh of an item, depending on the status obtained
    REFRESH_BUILD = 5  # 5 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    REFRESH_ERROR = 600
    # applied to items of type "image" and "flavor" whatever their status is
    REFRESH_IMAGE = 3600 * 10
    REFRESH_DELETE = 3600 * 10
    # maximum number of entries of the task_queue created at __init__
    QUEUE_SIZE = 100
    # NOTE(review): this attribute is replaced by the terminate() method defined later in the class body,
    # so the value False is never observable — confirm whether it can be removed
    terminate = False
1147
def __init__(self, worker_index, config, plugins, db):
    """
    Create the worker thread; the thread is not started here

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
        config["process_id"] and config["global"]["task_locked_time"] are read
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # del_task() enters "with self.task_lock"; create the lock so that the attribute always exists
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    # identifier written at 'locked_by' when this worker locks a ro_task
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # task["item"] -> handler object. All handlers share the my_vims and db_vims dictionaries of this
    # worker, so connectors loaded later are visible to them
    self.item2class = {
        "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
        "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
        "image": VimInteractionImage(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "flavor": VimInteractionFlavor(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "sdn_net": VimInteractionSdnNet(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    # seconds that a lock over a database entry is considered valid
    self.task_locked_time = config["global"]["task_locked_time"]
1190
def insert_task(self, task):
    """
    Put a task into the worker queue without blocking
    :param task: element to enqueue
    :return: None
    :raises NsWorkerException: if the queue is already full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1197
def terminate(self):
    """
    Enqueue the literal task "exit" through insert_task
    Presumably the thread main loop interprets it as a request to finish (the loop is not in this section)
    :return: None
    """
    exit_marker = "exit"
    self.insert_task(exit_marker)
1200
def del_task(self, task):
    """
    Cancel a task that has not been started yet
    :param task: task dictionary; its "status" is changed to "SUPERSEDED" when it was "SCHEDULED"
    :return: True if the task has been superseded, False if it is in any other status (e.g. being processed)
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        # The lock must not be released by hand here: the 'with' statement releases it on exit, and a
        # second release of an unlocked lock raises RuntimeError
        return False
1209
1210 def _process_vim_config(self, target_id, db_vim):
1211 """
1212 Process vim config, creating vim configuration files as ca_cert
1213 :param target_id: vim/sdn/wim + id
1214 :param db_vim: Vim dictionary obtained from database
1215 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1216 """
1217 if not db_vim.get("config"):
1218 return
1219
1220 file_name = ""
1221
1222 try:
1223 if db_vim["config"].get("ca_cert_content"):
1224 file_name = "{}:{}".format(target_id, self.worker_index)
1225
1226 try:
1227 mkdir(file_name)
1228 except FileExistsError:
1229 pass
1230
1231 file_name = file_name + "/ca_cert"
1232
1233 with open(file_name, "w") as f:
1234 f.write(db_vim["config"]["ca_cert_content"])
1235 del db_vim["config"]["ca_cert_content"]
1236 db_vim["config"]["ca_cert"] = file_name
1237 except Exception as e:
1238 raise NsWorkerException(
1239 "Error writing to file '{}': {}".format(file_name, e)
1240 )
1241
1242 def _load_plugin(self, name, type="vim"):
1243 # type can be vim or sdn
1244 if "rovim_dummy" not in self.plugins:
1245 self.plugins["rovim_dummy"] = VimDummyConnector
1246
1247 if "rosdn_dummy" not in self.plugins:
1248 self.plugins["rosdn_dummy"] = SdnDummyConnector
1249
1250 if name in self.plugins:
1251 return self.plugins[name]
1252
1253 try:
1254 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1255 self.plugins[name] = ep.load()
1256 except Exception as e:
1257 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1258
1259 if name and name not in self.plugins:
1260 raise NsWorkerException(
1261 "Plugin 'osm_{n}' has not been installed".format(n=name)
1262 )
1263
1264 return self.plugins[name]
1265
1266 def _unload_vim(self, target_id):
1267 """
1268 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1269 :param target_id: Contains type:_id; where type can be 'vim', ...
1270 :return: None.
1271 """
1272 try:
1273 self.db_vims.pop(target_id, None)
1274 self.my_vims.pop(target_id, None)
1275
1276 if target_id in self.vim_targets:
1277 self.vim_targets.remove(target_id)
1278
1279 self.logger.info("Unloaded {}".format(target_id))
1280 rmtree("{}:{}".format(target_id, self.worker_index))
1281 except FileNotFoundError:
1282 pass # this is raised by rmtree if folder does not exist
1283 except Exception as e:
1284 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1285
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLED or ERROR
    Only the first operation found at _admin.operations in state PROCESSING is attended. Nothing is
    written if that operation holds a recent lock or if the lock cannot be taken at database.
    If the target was not loaded before this call, it is unloaded again at the end.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    # values to set / keys to remove at the database entry; applied at the 'finally' clause
    update_dict = {}
    unset_dict = {}
    # database path prefix of the operation being attended, e.g. "_admin.operations.0."
    op_text = ""
    # description of the current stage, only used to compose error messages
    step = ""
    loaded = target_id in self.vim_targets
    # any target type different from 'vim' and 'wim' is looked for at 'sdns'
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter contains the locked_at value just read, so the update only succeeds if nobody
            # has modified the lock in the meantime
            # NOTE(review): the key written here is "admin.current_operation" (no leading underscore) and
            # the one unset below is "current_operation"; every other key uses the "_admin." prefix —
            # confirm which path is the intended one
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            # _load_vim returns a descriptive text on error, a false value when ok
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; when error_text is not empty they are overwritten at the 'finally' clause
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only one operation is attended per call
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # error_text is only read when something has to be written: unset_dict is filled right before
        # error_text is assigned from _load_vim, and the 'except' clause assigns it as well
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                # detailed-status is set above, so it must not be unset in the same update
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        # _load_vim registers the target at vim_targets; undo it if it was loaded only for this check
        if not loaded:
            self._unload_vim(target_id)
1387
1388 def _reload_vim(self, target_id):
1389 if target_id in self.vim_targets:
1390 self._load_vim(target_id)
1391 else:
1392 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1393 # just remove it to force load again next time it is needed
1394 self.db_vims.pop(target_id, None)
1395
1396 def _load_vim(self, target_id):
1397 """
1398 Load or reload a vim_account, sdn_controller or wim_account.
1399 Read content from database, load the plugin if not loaded.
1400 In case of error loading the plugin, it load a failing VIM_connector
1401 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1402 :param target_id: Contains type:_id; where type can be 'vim', ...
1403 :return: None if ok, descriptive text if error
1404 """
1405 target, _, _id = target_id.partition(":")
1406 target_database = (
1407 "vim_accounts"
1408 if target == "vim"
1409 else "wim_accounts"
1410 if target == "wim"
1411 else "sdns"
1412 )
1413 plugin_name = ""
1414 vim = None
1415
1416 try:
1417 step = "Getting {}={} from db".format(target, _id)
1418 # TODO process for wim, sdnc, ...
1419 vim = self.db.get_one(target_database, {"_id": _id})
1420
1421 # if deep_get(vim, "config", "sdn-controller"):
1422 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1423 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1424
1425 step = "Decrypting password"
1426 schema_version = vim.get("schema_version")
1427 self.db.encrypt_decrypt_fields(
1428 vim,
1429 "decrypt",
1430 fields=("password", "secret"),
1431 schema_version=schema_version,
1432 salt=_id,
1433 )
1434 self._process_vim_config(target_id, vim)
1435
1436 if target == "vim":
1437 plugin_name = "rovim_" + vim["vim_type"]
1438 step = "Loading plugin '{}'".format(plugin_name)
1439 vim_module_conn = self._load_plugin(plugin_name)
1440 step = "Loading {}'".format(target_id)
1441 self.my_vims[target_id] = vim_module_conn(
1442 uuid=vim["_id"],
1443 name=vim["name"],
1444 tenant_id=vim.get("vim_tenant_id"),
1445 tenant_name=vim.get("vim_tenant_name"),
1446 url=vim["vim_url"],
1447 url_admin=None,
1448 user=vim["vim_user"],
1449 passwd=vim["vim_password"],
1450 config=vim.get("config") or {},
1451 persistent_info={},
1452 )
1453 else: # sdn
1454 plugin_name = "rosdn_" + vim["type"]
1455 step = "Loading plugin '{}'".format(plugin_name)
1456 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1457 step = "Loading {}'".format(target_id)
1458 wim = deepcopy(vim)
1459 wim_config = wim.pop("config", {}) or {}
1460 wim["uuid"] = wim["_id"]
1461 wim["wim_url"] = wim["url"]
1462
1463 if wim.get("dpid"):
1464 wim_config["dpid"] = wim.pop("dpid")
1465
1466 if wim.get("switch_id"):
1467 wim_config["switch_id"] = wim.pop("switch_id")
1468
1469 # wim, wim_account, config
1470 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1471 self.db_vims[target_id] = vim
1472 self.error_status = None
1473
1474 self.logger.info(
1475 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1476 )
1477 except Exception as e:
1478 self.logger.error(
1479 "Cannot load {} plugin={}: {} {}".format(
1480 target_id, plugin_name, step, e
1481 )
1482 )
1483
1484 self.db_vims[target_id] = vim or {}
1485 self.db_vims[target_id] = FailingConnector(str(e))
1486 error_status = "{} Error: {}".format(step, e)
1487
1488 return error_status
1489 finally:
1490 if target_id not in self.vim_targets:
1491 self.vim_targets.append(target_id)
1492
1493 def _get_db_task(self):
1494 """
1495 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1496 :return: None
1497 """
1498 now = time.time()
1499
1500 if not self.time_last_task_processed:
1501 self.time_last_task_processed = now
1502
1503 try:
1504 while True:
1505 """
1506 # Log RO tasks only when loglevel is DEBUG
1507 if self.logger.getEffectiveLevel() == logging.DEBUG:
1508 self._log_ro_task(
1509 None,
1510 None,
1511 None,
1512 "TASK_WF",
1513 "task_locked_time="
1514 + str(self.task_locked_time)
1515 + " "
1516 + "time_last_task_processed="
1517 + str(self.time_last_task_processed)
1518 + " "
1519 + "now="
1520 + str(now),
1521 )
1522 """
1523 locked = self.db.set_one(
1524 "ro_tasks",
1525 q_filter={
1526 "target_id": self.vim_targets,
1527 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1528 "locked_at.lt": now - self.task_locked_time,
1529 "to_check_at.lt": self.time_last_task_processed,
1530 },
1531 update_dict={"locked_by": self.my_id, "locked_at": now},
1532 fail_on_empty=False,
1533 )
1534
1535 if locked:
1536 # read and return
1537 ro_task = self.db.get_one(
1538 "ro_tasks",
1539 q_filter={
1540 "target_id": self.vim_targets,
1541 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1542 "locked_at": now,
1543 },
1544 )
1545 return ro_task
1546
1547 if self.time_last_task_processed == now:
1548 self.time_last_task_processed = None
1549 return None
1550 else:
1551 self.time_last_task_processed = now
1552 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1553
1554 except DbException as e:
1555 self.logger.error("Database exception at _get_db_task: {}".format(e))
1556 except Exception as e:
1557 self.logger.critical(
1558 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1559 )
1560
1561 return None
1562
1563 def _get_db_all_tasks(self):
1564 """
1565 Read all content of table ro_tasks to log it
1566 :return: None
1567 """
1568 try:
1569 # Checking the content of the BD:
1570
1571 # read and return
1572 ro_task = self.db.get_list("ro_tasks")
1573 for rt in ro_task:
1574 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1575 return ro_task
1576
1577 except DbException as e:
1578 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1579 except Exception as e:
1580 self.logger.critical(
1581 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1582 )
1583
1584 return None
1585
1586 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1587 """
1588 Generate a log with the following format:
1589
1590 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1591 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1592 task_array_index;task_id;task_action;task_item;task_args
1593
1594 Example:
1595
1596 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1597 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1598 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1599 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1600 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1601 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1602 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1603 """
1604 try:
1605 line = []
1606 i = 0
1607 if ro_task is not None and isinstance(ro_task, dict):
1608 for t in ro_task["tasks"]:
1609 line.clear()
1610 line.append(mark)
1611 line.append(event)
1612 line.append(ro_task.get("_id", ""))
1613 line.append(str(ro_task.get("locked_at", "")))
1614 line.append(str(ro_task.get("modified_at", "")))
1615 line.append(str(ro_task.get("created_at", "")))
1616 line.append(str(ro_task.get("to_check_at", "")))
1617 line.append(str(ro_task.get("locked_by", "")))
1618 line.append(str(ro_task.get("target_id", "")))
1619 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1620 line.append(str(ro_task.get("vim_info", "")))
1621 line.append(str(ro_task.get("tasks", "")))
1622 if isinstance(t, dict):
1623 line.append(str(t.get("status", "")))
1624 line.append(str(t.get("action_id", "")))
1625 line.append(str(i))
1626 line.append(str(t.get("task_id", "")))
1627 line.append(str(t.get("action", "")))
1628 line.append(str(t.get("item", "")))
1629 line.append(str(t.get("find_params", "")))
1630 line.append(str(t.get("params", "")))
1631 else:
1632 line.extend([""] * 2)
1633 line.append(str(i))
1634 line.extend([""] * 5)
1635
1636 i += 1
1637 self.logger.debug(";".join(line))
1638 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1639 i = 0
1640 while True:
1641 st = "tasks.{}.status".format(i)
1642 if st not in db_ro_task_update:
1643 break
1644 line.clear()
1645 line.append(mark)
1646 line.append(event)
1647 line.append(db_ro_task_update.get("_id", ""))
1648 line.append(str(db_ro_task_update.get("locked_at", "")))
1649 line.append(str(db_ro_task_update.get("modified_at", "")))
1650 line.append("")
1651 line.append(str(db_ro_task_update.get("to_check_at", "")))
1652 line.append(str(db_ro_task_update.get("locked_by", "")))
1653 line.append("")
1654 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1655 line.append("")
1656 line.append(str(db_ro_task_update.get("vim_info", "")))
1657 line.append(str(str(db_ro_task_update).count(".status")))
1658 line.append(db_ro_task_update.get(st, ""))
1659 line.append("")
1660 line.append(str(i))
1661 line.extend([""] * 3)
1662 i += 1
1663 self.logger.debug(";".join(line))
1664
1665 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1666 line.clear()
1667 line.append(mark)
1668 line.append(event)
1669 line.append(db_ro_task_delete.get("_id", ""))
1670 line.append("")
1671 line.append(db_ro_task_delete.get("modified_at", ""))
1672 line.extend([""] * 13)
1673 self.logger.debug(";".join(line))
1674
1675 else:
1676 line.clear()
1677 line.append(mark)
1678 line.append(event)
1679 line.extend([""] * 16)
1680 self.logger.debug(";".join(line))
1681
1682 except Exception as e:
1683 self.logger.error("Error logging ro_task: {}".format(e))
1684
1685 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1686 """
1687 Determine if this task need to be done or superseded
1688 :return: None
1689 """
1690 my_task = ro_task["tasks"][task_index]
1691 task_id = my_task["task_id"]
1692 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1693 "created_items", False
1694 )
1695
1696 if my_task["status"] == "FAILED":
1697 return None, None # TODO need to be retry??
1698
1699 try:
1700 for index, task in enumerate(ro_task["tasks"]):
1701 if index == task_index or not task:
1702 continue # own task
1703
1704 if (
1705 my_task["target_record"] == task["target_record"]
1706 and task["action"] == "CREATE"
1707 ):
1708 # set to finished
1709 db_update["tasks.{}.status".format(index)] = task[
1710 "status"
1711 ] = "FINISHED"
1712 elif task["action"] == "CREATE" and task["status"] not in (
1713 "FINISHED",
1714 "SUPERSEDED",
1715 ):
1716 needed_delete = False
1717
1718 if needed_delete:
1719 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1720 else:
1721 return "SUPERSEDED", None
1722 except Exception as e:
1723 if not isinstance(e, NsWorkerException):
1724 self.logger.critical(
1725 "Unexpected exception at _delete_task task={}: {}".format(
1726 task_id, e
1727 ),
1728 exc_info=True,
1729 )
1730
1731 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1732
1733 def _create_task(self, ro_task, task_index, task_depends, db_update):
1734 """
1735 Determine if this task need to create something at VIM
1736 :return: None
1737 """
1738 my_task = ro_task["tasks"][task_index]
1739 task_id = my_task["task_id"]
1740 task_status = None
1741
1742 if my_task["status"] == "FAILED":
1743 return None, None # TODO need to be retry??
1744 elif my_task["status"] == "SCHEDULED":
1745 # check if already created by another task
1746 for index, task in enumerate(ro_task["tasks"]):
1747 if index == task_index or not task:
1748 continue # own task
1749
1750 if task["action"] == "CREATE" and task["status"] not in (
1751 "SCHEDULED",
1752 "FINISHED",
1753 "SUPERSEDED",
1754 ):
1755 return task["status"], "COPY_VIM_INFO"
1756
1757 try:
1758 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1759 ro_task, task_index, task_depends
1760 )
1761 # TODO update other CREATE tasks
1762 except Exception as e:
1763 if not isinstance(e, NsWorkerException):
1764 self.logger.error(
1765 "Error executing task={}: {}".format(task_id, e), exc_info=True
1766 )
1767
1768 task_status = "FAILED"
1769 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1770 # TODO update ro_vim_item_update
1771
1772 return task_status, ro_vim_item_update
1773 else:
1774 return None, None
1775
1776 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1777 """
1778 Look for dependency task
1779 :param task_id: Can be one of
1780 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1781 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1782 3. task.task_id: "<action_id>:number"
1783 :param ro_task:
1784 :param target_id:
1785 :return: database ro_task plus index of task
1786 """
1787 if (
1788 task_id.startswith("vim:")
1789 or task_id.startswith("sdn:")
1790 or task_id.startswith("wim:")
1791 ):
1792 target_id, _, task_id = task_id.partition(" ")
1793
1794 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1795 ro_task_dependency = self.db.get_one(
1796 "ro_tasks",
1797 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1798 fail_on_empty=False,
1799 )
1800
1801 if ro_task_dependency:
1802 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1803 if task["target_record_id"] == task_id:
1804 return ro_task_dependency, task_index
1805
1806 else:
1807 if ro_task:
1808 for task_index, task in enumerate(ro_task["tasks"]):
1809 if task and task["task_id"] == task_id:
1810 return ro_task, task_index
1811
1812 ro_task_dependency = self.db.get_one(
1813 "ro_tasks",
1814 q_filter={
1815 "tasks.ANYINDEX.task_id": task_id,
1816 "tasks.ANYINDEX.target_record.ne": None,
1817 },
1818 fail_on_empty=False,
1819 )
1820
1821 if ro_task_dependency:
1822 for task_index, task in ro_task_dependency["tasks"]:
1823 if task["task_id"] == task_id:
1824 return ro_task_dependency, task_index
1825 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1826
def _process_pending_tasks(self, ro_task):
    """
    Process the tasks of a ro_task already locked by this worker and store the result at database
    The tasks are visited once per action, in the order DELETE, CREATE, EXEC. For each task to be attended:
    its dependencies are checked (only when it is SCHEDULED), the operation is executed by _delete_task or
    by the exec/new/refresh methods of the class registered at item2class, and the new status and vim_info
    are collected at db_ro_task_update and propagated to the target record by _update_target.
    Finally the ro_task is updated and unlocked at database, setting when it must be checked again.
    :param ro_task: ro_task dictionary read from database; its vim_info is modified in place
    :return: None. Exceptions are logged, not raised
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    # changes to apply to the ro_task at database, using dot separated paths as keys
    db_ro_task_update = {}

    def _update_refresh(new_status):
        """
        Compute when the item must be refreshed again depending on its type and new_status. The value is
        stored at vim_info.refresh_at (memory and database update) and next_check_at is lowered if needed
        """
        # compute next_refresh
        # 'task' is the loop variable of the enclosing method, so it is the task being processed when
        # this function is called
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.REFRESH_IMAGE
        elif new_status == "BUILD":
            next_refresh += self.REFRESH_BUILD
        elif new_status == "DONE":
            next_refresh += self.REFRESH_ACTIVE
        else:
            next_refresh += self.REFRESH_ERROR

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # 0: get task_status_create
        # lock_object is created the first time a task is going to be executed, see below
        lock_object = None
        # status of a CREATE task already in BUILD or DONE, if any. A SCHEDULED CREATE task copies it
        # instead of creating the item again
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE(review): these two variables are initialized per action, not per task, so a task that
            # does not obtain a new value sees the one left by a previous task of the same action —
            # confirm that this is intended
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip: DELETE/EXEC tasks that are neither SCHEDULED nor BUILD, tasks of a different
                # action than the one of this pass, and CREATE tasks already FINISHED or SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_details"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is provided under two keys: the dependency
                            # identifier as it is, and prefixed by "TASK-"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (new_status, db_vim_info_update,) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # next_retry is a delay in seconds; 60 when it is not provided
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # already created by another task of this ro_task: copy its information
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            # item already created: refresh it if the time stored at refresh_at has passed
                            if (
                                ro_task["vim_info"]["refresh_at"]
                                and now > ro_task["vim_info"]["refresh_at"]
                            ):
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    # any failure, at dependencies or at the operation itself, marks the task as FAILED
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_details": str(e),
                    }

                    # NOTE(review): the message names _delete_task, but this handler covers the three actions
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    # collect the vim_info changes, for the database ("vim_info." prefixed keys) and
                    # for the ro_task kept in memory
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    # propagate to the target record: nothing (None) after a deletion, the whole
                    # vim_info when copying from another task, or only the last changes
                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        # first attempt requires to_check_at to be unchanged. If nothing is updated, the rest of the
        # changes are applied without touching to_check_at; this time an empty result is an error
        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2153
2154 def _update_target(self, task, ro_vim_item_update):
2155 table, _, temp = task["target_record"].partition(":")
2156 _id, _, path_vim_status = temp.partition(":")
2157 path_item = path_vim_status[: path_vim_status.rfind(".")]
2158 path_item = path_item[: path_item.rfind(".")]
2159 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2160 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2161
2162 if ro_vim_item_update:
2163 update_dict = {
2164 path_vim_status + "." + k: v
2165 for k, v in ro_vim_item_update.items()
2166 if k
2167 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2168 }
2169
2170 if path_vim_status.startswith("vdur."):
2171 # for backward compatibility, add vdur.name apart from vdur.vim_name
2172 if ro_vim_item_update.get("vim_name"):
2173 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2174
2175 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2176 if ro_vim_item_update.get("vim_id"):
2177 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2178
2179 # update general status
2180 if ro_vim_item_update.get("vim_status"):
2181 update_dict[path_item + ".status"] = ro_vim_item_update[
2182 "vim_status"
2183 ]
2184
2185 if ro_vim_item_update.get("interfaces"):
2186 path_interfaces = path_item + ".interfaces"
2187
2188 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2189 if iface:
2190 update_dict.update(
2191 {
2192 path_interfaces + ".{}.".format(i) + k: v
2193 for k, v in iface.items()
2194 if k in ("vlan", "compute_node", "pci")
2195 }
2196 )
2197
2198 # put ip_address and mac_address with ip-address and mac-address
2199 if iface.get("ip_address"):
2200 update_dict[
2201 path_interfaces + ".{}.".format(i) + "ip-address"
2202 ] = iface["ip_address"]
2203
2204 if iface.get("mac_address"):
2205 update_dict[
2206 path_interfaces + ".{}.".format(i) + "mac-address"
2207 ] = iface["mac_address"]
2208
2209 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2210 update_dict["ip-address"] = iface.get("ip_address").split(
2211 ";"
2212 )[0]
2213
2214 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2215 update_dict[path_item + ".ip-address"] = iface.get(
2216 "ip_address"
2217 ).split(";")[0]
2218
2219 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2220 else:
2221 update_dict = {path_item + ".status": "DELETED"}
2222 self.db.set_one(
2223 table,
2224 q_filter={"_id": _id},
2225 update_dict=update_dict,
2226 unset={path_vim_status: None},
2227 )
2228
2229 def _process_delete_db_tasks(self):
2230 """
2231 Delete task from database because vnfrs or nsrs or both have been deleted
2232 :return: None. Uses and modify self.tasks_to_delete
2233 """
2234 while self.tasks_to_delete:
2235 task = self.tasks_to_delete[0]
2236 vnfrs_deleted = None
2237 nsr_id = task["nsr_id"]
2238
2239 if task["target_record"].startswith("vnfrs:"):
2240 # check if nsrs is present
2241 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2242 vnfrs_deleted = task["target_record"].split(":")[1]
2243
2244 try:
2245 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2246 except Exception as e:
2247 self.logger.error(
2248 "Error deleting task={}: {}".format(task["task_id"], e)
2249 )
2250 self.tasks_to_delete.pop(0)
2251
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from the "ro_tasks" table the tasks of a nsr, or only the tasks of one of its vnfrs.
    A ro_task is deleted when all its (not empty) tasks are affected; otherwise only the affected
    tasks are set to None. Each write is filtered by the "modified_at" that was read, so a ro_task
    changed meanwhile by somebody else is not touched and the whole operation is retried.
    Static method because it is called from osm_ng_ro.ns
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None. Raises NsWorkerException if there are still conflicts after all the retries
    """
    retries = 5
    # target_record is "<table>:<_id>:<path>". The trailing ":" makes the match exact on the vnfr id,
    # so tasks of other vnfrs whose id just begins with vnfrs_deleted are not removed
    vnfr_prefix = "vnfrs:{}:".format(vnfrs_deleted) if vnfrs_deleted else None

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    # slot already emptied
                    continue

                if vnfr_prefix:
                    affected = task["target_record"].startswith(vnfr_prefix)
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update only if nobody has changed ro_task meanwhile. modified_at is used
            # to know if it has changed
            if to_delete_ro_task:
                if not db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                ):
                    conflict = True
            elif db_update:
                db_update["modified_at"] = now
                if not db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=db_update,
                    fail_on_empty=False,
                ):
                    conflict = True

        if not conflict:
            return

    # the loop is only left by the return above, so reaching this point means every retry conflicted
    raise NsWorkerException("Exceeded {} retries".format(retries))
2310
def run(self):
    """
    Main loop of the thread. Each iteration has two steps:
    step 1: get one command from self.task_queue ("terminate", "load_vim", "unload_vim",
        "reload_vim" or "check_vim") and execute it. While there are not vim_targets loaded the
        thread blocks here waiting for a command (idle state)
    step 2: only reached when no command has been obtained. Delete the not needed tasks and
        process one pending ro_task from database; if there is none, sleep 5 seconds
    :return: None. It finishes when the "terminate" command is received
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                command = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                command = self.task_queue.get(block=True)
                self.idle = False

            order = command[0]
            if order == "terminate":
                break

            if order == "load_vim":
                self.logger.info("order to load vim {}".format(command[1]))
                self._load_vim(command[1])
            elif order == "unload_vim":
                self.logger.info("order to unload vim {}".format(command[1]))
                self._unload_vim(command[1])
            elif order == "reload_vim":
                self._reload_vim(command[1])
            elif order == "check_vim":
                self.logger.info("order to check vim {}".format(command[1]))
                self._check_vim(command[1])

            # a command has been attended (or ignored if unknown): look for the next one
            continue
        except queue.Empty:
            # no command available, go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled debug aid: log all RO tasks only when loglevel is DEBUG
            #     if self.logger.getEffectiveLevel() == logging.DEBUG:
            #         _ = self._get_db_all_tasks()
            ro_task = self._get_db_task()
            if ro_task:
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before asking database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")