Add log to trace task workflow
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45 __author__ = "Alfonso Tierno"
46 __date__ = "$28-Sep-2017 12:07:15$"
47
48
49 def deep_get(target_dict, *args, **kwargs):
50 """
51 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
52 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
53 :param target_dict: dictionary to be read
54 :param args: list of keys to read from target_dict
55 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
56 :return: The wanted value if exist, None or default otherwise
57 """
58 for key in args:
59 if not isinstance(target_dict, dict) or key not in target_dict:
60 return kwargs.get("default")
61 target_dict = target_dict[key]
62 return target_dict
63
64
65 class NsWorkerException(Exception):
66 pass
67
68
69 class FailingConnector:
70 def __init__(self, error_msg):
71 self.error_msg = error_msg
72
73 for method in dir(vimconn.VimConnector):
74 if method[0] != "_":
75 setattr(
76 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
77 )
78
79 for method in dir(sdnconn.SdnConnectorBase):
80 if method[0] != "_":
81 setattr(
82 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
83 )
84
85
86 class NsWorkerExceptionNotFound(NsWorkerException):
87 pass
88
89
90 class VimInteractionBase:
91 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
92 It implements methods that does nothing and return ok"""
93
94 def __init__(self, db, my_vims, db_vims, logger):
95 self.db = db
96 self.logger = logger
97 self.my_vims = my_vims
98 self.db_vims = db_vims
99
100 def new(self, ro_task, task_index, task_depends):
101 return "BUILD", {}
102
103 def refresh(self, ro_task):
104 """skip calling VIM to get image, flavor status. Assumes ok"""
105 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
106 return "FAILED", {}
107
108 return "DONE", {}
109
110 def delete(self, ro_task, task_index):
111 """skip calling VIM to delete image. Assumes ok"""
112 return "DONE", {}
113
114 def exec(self, ro_task, task_index, task_depends):
115 return "DONE", None, None
116
117
118 class VimInteractionNet(VimInteractionBase):
119 def new(self, ro_task, task_index, task_depends):
120 vim_net_id = None
121 task = ro_task["tasks"][task_index]
122 task_id = task["task_id"]
123 created = False
124 created_items = {}
125 target_vim = self.my_vims[ro_task["target_id"]]
126 mgmtnet = False
127 mgmtnet_defined_in_vim = False
128
129 try:
130 # FIND
131 if task.get("find_params"):
132 # if management, get configuration of VIM
133 if task["find_params"].get("filter_dict"):
134 vim_filter = task["find_params"]["filter_dict"]
135 # management network
136 elif task["find_params"].get("mgmt"):
137 mgmtnet = True
138 if deep_get(
139 self.db_vims[ro_task["target_id"]],
140 "config",
141 "management_network_id",
142 ):
143 mgmtnet_defined_in_vim = True
144 vim_filter = {
145 "id": self.db_vims[ro_task["target_id"]]["config"][
146 "management_network_id"
147 ]
148 }
149 elif deep_get(
150 self.db_vims[ro_task["target_id"]],
151 "config",
152 "management_network_name",
153 ):
154 mgmtnet_defined_in_vim = True
155 vim_filter = {
156 "name": self.db_vims[ro_task["target_id"]]["config"][
157 "management_network_name"
158 ]
159 }
160 else:
161 vim_filter = {"name": task["find_params"]["name"]}
162 else:
163 raise NsWorkerExceptionNotFound(
164 "Invalid find_params for new_net {}".format(task["find_params"])
165 )
166
167 vim_nets = target_vim.get_network_list(vim_filter)
168 if not vim_nets and not task.get("params"):
169 # If there is mgmt-network in the descriptor,
170 # there is no mapping of that network to a VIM network in the descriptor,
171 # also there is no mapping in the "--config" parameter or at VIM creation;
172 # that mgmt-network will be created.
173 if mgmtnet and not mgmtnet_defined_in_vim:
174 net_name = (
175 vim_filter.get("name")
176 if vim_filter.get("name")
177 else vim_filter.get("id")[:16]
178 )
179 vim_net_id, created_items = target_vim.new_network(
180 net_name, None
181 )
182 self.logger.debug(
183 "Created mgmt network vim_net_id: {}".format(vim_net_id)
184 )
185 created = True
186 else:
187 raise NsWorkerExceptionNotFound(
188 "Network not found with this criteria: '{}'".format(
189 task.get("find_params")
190 )
191 )
192 elif len(vim_nets) > 1:
193 raise NsWorkerException(
194 "More than one network found with this criteria: '{}'".format(
195 task["find_params"]
196 )
197 )
198
199 if vim_nets:
200 vim_net_id = vim_nets[0]["id"]
201 else:
202 # CREATE
203 params = task["params"]
204 vim_net_id, created_items = target_vim.new_network(**params)
205 created = True
206
207 ro_vim_item_update = {
208 "vim_id": vim_net_id,
209 "vim_status": "BUILD",
210 "created": created,
211 "created_items": created_items,
212 "vim_details": None,
213 }
214 self.logger.debug(
215 "task={} {} new-net={} created={}".format(
216 task_id, ro_task["target_id"], vim_net_id, created
217 )
218 )
219
220 return "BUILD", ro_vim_item_update
221 except (vimconn.VimConnException, NsWorkerException) as e:
222 self.logger.error(
223 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
224 )
225 ro_vim_item_update = {
226 "vim_status": "VIM_ERROR",
227 "created": created,
228 "vim_details": str(e),
229 }
230
231 return "FAILED", ro_vim_item_update
232
233 def refresh(self, ro_task):
234 """Call VIM to get network status"""
235 ro_task_id = ro_task["_id"]
236 target_vim = self.my_vims[ro_task["target_id"]]
237 vim_id = ro_task["vim_info"]["vim_id"]
238 net_to_refresh_list = [vim_id]
239
240 try:
241 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
242 vim_info = vim_dict[vim_id]
243
244 if vim_info["status"] == "ACTIVE":
245 task_status = "DONE"
246 elif vim_info["status"] == "BUILD":
247 task_status = "BUILD"
248 else:
249 task_status = "FAILED"
250 except vimconn.VimConnException as e:
251 # Mark all tasks at VIM_ERROR status
252 self.logger.error(
253 "ro_task={} vim={} get-net={}: {}".format(
254 ro_task_id, ro_task["target_id"], vim_id, e
255 )
256 )
257 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
258 task_status = "FAILED"
259
260 ro_vim_item_update = {}
261 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
262 ro_vim_item_update["vim_status"] = vim_info["status"]
263
264 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
265 ro_vim_item_update["vim_name"] = vim_info.get("name")
266
267 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
268 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
269 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
270 elif vim_info["status"] == "DELETED":
271 ro_vim_item_update["vim_id"] = None
272 ro_vim_item_update["vim_details"] = "Deleted externally"
273 else:
274 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
275 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
276
277 if ro_vim_item_update:
278 self.logger.debug(
279 "ro_task={} {} get-net={}: status={} {}".format(
280 ro_task_id,
281 ro_task["target_id"],
282 vim_id,
283 ro_vim_item_update.get("vim_status"),
284 ro_vim_item_update.get("vim_details")
285 if ro_vim_item_update.get("vim_status") != "ACTIVE"
286 else "",
287 )
288 )
289
290 return task_status, ro_vim_item_update
291
292 def delete(self, ro_task, task_index):
293 task = ro_task["tasks"][task_index]
294 task_id = task["task_id"]
295 net_vim_id = ro_task["vim_info"]["vim_id"]
296 ro_vim_item_update_ok = {
297 "vim_status": "DELETED",
298 "created": False,
299 "vim_details": "DELETED",
300 "vim_id": None,
301 }
302
303 try:
304 if net_vim_id or ro_task["vim_info"]["created_items"]:
305 target_vim = self.my_vims[ro_task["target_id"]]
306 target_vim.delete_network(
307 net_vim_id, ro_task["vim_info"]["created_items"]
308 )
309 except vimconn.VimConnNotFoundException:
310 ro_vim_item_update_ok["vim_details"] = "already deleted"
311 except vimconn.VimConnException as e:
312 self.logger.error(
313 "ro_task={} vim={} del-net={}: {}".format(
314 ro_task["_id"], ro_task["target_id"], net_vim_id, e
315 )
316 )
317 ro_vim_item_update = {
318 "vim_status": "VIM_ERROR",
319 "vim_details": "Error while deleting: {}".format(e),
320 }
321
322 return "FAILED", ro_vim_item_update
323
324 self.logger.debug(
325 "task={} {} del-net={} {}".format(
326 task_id,
327 ro_task["target_id"],
328 net_vim_id,
329 ro_vim_item_update_ok.get("vim_details", ""),
330 )
331 )
332
333 return "DONE", ro_vim_item_update_ok
334
335
336 class VimInteractionVdu(VimInteractionBase):
337 max_retries_inject_ssh_key = 20 # 20 times
338 time_retries_inject_ssh_key = 30 # wevery 30 seconds
339
340 def new(self, ro_task, task_index, task_depends):
341 task = ro_task["tasks"][task_index]
342 task_id = task["task_id"]
343 created = False
344 created_items = {}
345 target_vim = self.my_vims[ro_task["target_id"]]
346
347 try:
348 created = True
349 params = task["params"]
350 params_copy = deepcopy(params)
351 net_list = params_copy["net_list"]
352
353 for net in net_list:
354 # change task_id into network_id
355 if "net_id" in net and net["net_id"].startswith("TASK-"):
356 network_id = task_depends[net["net_id"]]
357
358 if not network_id:
359 raise NsWorkerException(
360 "Cannot create VM because depends on a network not created or found "
361 "for {}".format(net["net_id"])
362 )
363
364 net["net_id"] = network_id
365
366 if params_copy["image_id"].startswith("TASK-"):
367 params_copy["image_id"] = task_depends[params_copy["image_id"]]
368
369 if params_copy["flavor_id"].startswith("TASK-"):
370 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
371
372 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
373 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
374
375 ro_vim_item_update = {
376 "vim_id": vim_vm_id,
377 "vim_status": "BUILD",
378 "created": created,
379 "created_items": created_items,
380 "vim_details": None,
381 "interfaces_vim_ids": interfaces,
382 "interfaces": [],
383 }
384 self.logger.debug(
385 "task={} {} new-vm={} created={}".format(
386 task_id, ro_task["target_id"], vim_vm_id, created
387 )
388 )
389
390 return "BUILD", ro_vim_item_update
391 except (vimconn.VimConnException, NsWorkerException) as e:
392 self.logger.error(
393 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
394 )
395 ro_vim_item_update = {
396 "vim_status": "VIM_ERROR",
397 "created": created,
398 "vim_details": str(e),
399 }
400
401 return "FAILED", ro_vim_item_update
402
403 def delete(self, ro_task, task_index):
404 task = ro_task["tasks"][task_index]
405 task_id = task["task_id"]
406 vm_vim_id = ro_task["vim_info"]["vim_id"]
407 ro_vim_item_update_ok = {
408 "vim_status": "DELETED",
409 "created": False,
410 "vim_details": "DELETED",
411 "vim_id": None,
412 }
413
414 try:
415 if vm_vim_id or ro_task["vim_info"]["created_items"]:
416 target_vim = self.my_vims[ro_task["target_id"]]
417 target_vim.delete_vminstance(
418 vm_vim_id, ro_task["vim_info"]["created_items"]
419 )
420 except vimconn.VimConnNotFoundException:
421 ro_vim_item_update_ok["vim_details"] = "already deleted"
422 except vimconn.VimConnException as e:
423 self.logger.error(
424 "ro_task={} vim={} del-vm={}: {}".format(
425 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
426 )
427 )
428 ro_vim_item_update = {
429 "vim_status": "VIM_ERROR",
430 "vim_details": "Error while deleting: {}".format(e),
431 }
432
433 return "FAILED", ro_vim_item_update
434
435 self.logger.debug(
436 "task={} {} del-vm={} {}".format(
437 task_id,
438 ro_task["target_id"],
439 vm_vim_id,
440 ro_vim_item_update_ok.get("vim_details", ""),
441 )
442 )
443
444 return "DONE", ro_vim_item_update_ok
445
446 def refresh(self, ro_task):
447 """Call VIM to get vm status"""
448 ro_task_id = ro_task["_id"]
449 target_vim = self.my_vims[ro_task["target_id"]]
450 vim_id = ro_task["vim_info"]["vim_id"]
451
452 if not vim_id:
453 return None, None
454
455 vm_to_refresh_list = [vim_id]
456 try:
457 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
458 vim_info = vim_dict[vim_id]
459
460 if vim_info["status"] == "ACTIVE":
461 task_status = "DONE"
462 elif vim_info["status"] == "BUILD":
463 task_status = "BUILD"
464 else:
465 task_status = "FAILED"
466
467 # try to load and parse vim_information
468 try:
469 vim_info_info = yaml.safe_load(vim_info["vim_info"])
470 if vim_info_info.get("name"):
471 vim_info["name"] = vim_info_info["name"]
472 except Exception:
473 pass
474 except vimconn.VimConnException as e:
475 # Mark all tasks at VIM_ERROR status
476 self.logger.error(
477 "ro_task={} vim={} get-vm={}: {}".format(
478 ro_task_id, ro_task["target_id"], vim_id, e
479 )
480 )
481 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
482 task_status = "FAILED"
483
484 ro_vim_item_update = {}
485
486 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
487 vim_interfaces = []
488 if vim_info.get("interfaces"):
489 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
490 iface = next(
491 (
492 iface
493 for iface in vim_info["interfaces"]
494 if vim_iface_id == iface["vim_interface_id"]
495 ),
496 None,
497 )
498 # if iface:
499 # iface.pop("vim_info", None)
500 vim_interfaces.append(iface)
501
502 task_create = next(
503 t
504 for t in ro_task["tasks"]
505 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
506 )
507 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
508 vim_interfaces[task_create["mgmt_vnf_interface"]][
509 "mgmt_vnf_interface"
510 ] = True
511
512 mgmt_vdu_iface = task_create.get(
513 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
514 )
515 if vim_interfaces:
516 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
517
518 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
519 ro_vim_item_update["interfaces"] = vim_interfaces
520
521 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
522 ro_vim_item_update["vim_status"] = vim_info["status"]
523
524 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
525 ro_vim_item_update["vim_name"] = vim_info.get("name")
526
527 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
528 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
529 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
530 elif vim_info["status"] == "DELETED":
531 ro_vim_item_update["vim_id"] = None
532 ro_vim_item_update["vim_details"] = "Deleted externally"
533 else:
534 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
535 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
536
537 if ro_vim_item_update:
538 self.logger.debug(
539 "ro_task={} {} get-vm={}: status={} {}".format(
540 ro_task_id,
541 ro_task["target_id"],
542 vim_id,
543 ro_vim_item_update.get("vim_status"),
544 ro_vim_item_update.get("vim_details")
545 if ro_vim_item_update.get("vim_status") != "ACTIVE"
546 else "",
547 )
548 )
549
550 return task_status, ro_vim_item_update
551
552 def exec(self, ro_task, task_index, task_depends):
553 task = ro_task["tasks"][task_index]
554 task_id = task["task_id"]
555 target_vim = self.my_vims[ro_task["target_id"]]
556 db_task_update = {"retries": 0}
557 retries = task.get("retries", 0)
558
559 try:
560 params = task["params"]
561 params_copy = deepcopy(params)
562 params_copy["ro_key"] = self.db.decrypt(
563 params_copy.pop("private_key"),
564 params_copy.pop("schema_version"),
565 params_copy.pop("salt"),
566 )
567 params_copy["ip_addr"] = params_copy.pop("ip_address")
568 target_vim.inject_user_key(**params_copy)
569 self.logger.debug(
570 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
571 )
572
573 return (
574 "DONE",
575 None,
576 db_task_update,
577 ) # params_copy["key"]
578 except (vimconn.VimConnException, NsWorkerException) as e:
579 retries += 1
580
581 if retries < self.max_retries_inject_ssh_key:
582 return (
583 "BUILD",
584 None,
585 {
586 "retries": retries,
587 "next_retry": self.time_retries_inject_ssh_key,
588 },
589 )
590
591 self.logger.error(
592 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
593 )
594 ro_vim_item_update = {"vim_details": str(e)}
595
596 return "FAILED", ro_vim_item_update, db_task_update
597
598
599 class VimInteractionImage(VimInteractionBase):
600 def new(self, ro_task, task_index, task_depends):
601 task = ro_task["tasks"][task_index]
602 task_id = task["task_id"]
603 created = False
604 created_items = {}
605 target_vim = self.my_vims[ro_task["target_id"]]
606
607 try:
608 # FIND
609 if task.get("find_params"):
610 vim_images = target_vim.get_image_list(**task["find_params"])
611
612 if not vim_images:
613 raise NsWorkerExceptionNotFound(
614 "Image not found with this criteria: '{}'".format(
615 task["find_params"]
616 )
617 )
618 elif len(vim_images) > 1:
619 raise NsWorkerException(
620 "More than one image found with this criteria: '{}'".format(
621 task["find_params"]
622 )
623 )
624 else:
625 vim_image_id = vim_images[0]["id"]
626
627 ro_vim_item_update = {
628 "vim_id": vim_image_id,
629 "vim_status": "DONE",
630 "created": created,
631 "created_items": created_items,
632 "vim_details": None,
633 }
634 self.logger.debug(
635 "task={} {} new-image={} created={}".format(
636 task_id, ro_task["target_id"], vim_image_id, created
637 )
638 )
639
640 return "DONE", ro_vim_item_update
641 except (NsWorkerException, vimconn.VimConnException) as e:
642 self.logger.error(
643 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
644 )
645 ro_vim_item_update = {
646 "vim_status": "VIM_ERROR",
647 "created": created,
648 "vim_details": str(e),
649 }
650
651 return "FAILED", ro_vim_item_update
652
653
654 class VimInteractionFlavor(VimInteractionBase):
655 def delete(self, ro_task, task_index):
656 task = ro_task["tasks"][task_index]
657 task_id = task["task_id"]
658 flavor_vim_id = ro_task["vim_info"]["vim_id"]
659 ro_vim_item_update_ok = {
660 "vim_status": "DELETED",
661 "created": False,
662 "vim_details": "DELETED",
663 "vim_id": None,
664 }
665
666 try:
667 if flavor_vim_id:
668 target_vim = self.my_vims[ro_task["target_id"]]
669 target_vim.delete_flavor(flavor_vim_id)
670 except vimconn.VimConnNotFoundException:
671 ro_vim_item_update_ok["vim_details"] = "already deleted"
672 except vimconn.VimConnException as e:
673 self.logger.error(
674 "ro_task={} vim={} del-flavor={}: {}".format(
675 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
676 )
677 )
678 ro_vim_item_update = {
679 "vim_status": "VIM_ERROR",
680 "vim_details": "Error while deleting: {}".format(e),
681 }
682
683 return "FAILED", ro_vim_item_update
684
685 self.logger.debug(
686 "task={} {} del-flavor={} {}".format(
687 task_id,
688 ro_task["target_id"],
689 flavor_vim_id,
690 ro_vim_item_update_ok.get("vim_details", ""),
691 )
692 )
693
694 return "DONE", ro_vim_item_update_ok
695
696 def new(self, ro_task, task_index, task_depends):
697 task = ro_task["tasks"][task_index]
698 task_id = task["task_id"]
699 created = False
700 created_items = {}
701 target_vim = self.my_vims[ro_task["target_id"]]
702
703 try:
704 # FIND
705 vim_flavor_id = None
706
707 if task.get("find_params"):
708 try:
709 flavor_data = task["find_params"]["flavor_data"]
710 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
711 except vimconn.VimConnNotFoundException:
712 pass
713
714 if not vim_flavor_id and task.get("params"):
715 # CREATE
716 flavor_data = task["params"]["flavor_data"]
717 vim_flavor_id = target_vim.new_flavor(flavor_data)
718 created = True
719
720 ro_vim_item_update = {
721 "vim_id": vim_flavor_id,
722 "vim_status": "DONE",
723 "created": created,
724 "created_items": created_items,
725 "vim_details": None,
726 }
727 self.logger.debug(
728 "task={} {} new-flavor={} created={}".format(
729 task_id, ro_task["target_id"], vim_flavor_id, created
730 )
731 )
732
733 return "DONE", ro_vim_item_update
734 except (vimconn.VimConnException, NsWorkerException) as e:
735 self.logger.error(
736 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
737 )
738 ro_vim_item_update = {
739 "vim_status": "VIM_ERROR",
740 "created": created,
741 "vim_details": str(e),
742 }
743
744 return "FAILED", ro_vim_item_update
745
746
747 class VimInteractionSdnNet(VimInteractionBase):
748 @staticmethod
749 def _match_pci(port_pci, mapping):
750 """
751 Check if port_pci matches with mapping
752 mapping can have brackets to indicate that several chars are accepted. e.g
753 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
754 :param port_pci: text
755 :param mapping: text, can contain brackets to indicate several chars are available
756 :return: True if matches, False otherwise
757 """
758 if not port_pci or not mapping:
759 return False
760 if port_pci == mapping:
761 return True
762
763 mapping_index = 0
764 pci_index = 0
765 while True:
766 bracket_start = mapping.find("[", mapping_index)
767
768 if bracket_start == -1:
769 break
770
771 bracket_end = mapping.find("]", bracket_start)
772 if bracket_end == -1:
773 break
774
775 length = bracket_start - mapping_index
776 if (
777 length
778 and port_pci[pci_index : pci_index + length]
779 != mapping[mapping_index:bracket_start]
780 ):
781 return False
782
783 if (
784 port_pci[pci_index + length]
785 not in mapping[bracket_start + 1 : bracket_end]
786 ):
787 return False
788
789 pci_index += length + 1
790 mapping_index = bracket_end + 1
791
792 if port_pci[pci_index:] != mapping[mapping_index:]:
793 return False
794
795 return True
796
797 def _get_interfaces(self, vlds_to_connect, vim_account_id):
798 """
799 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
800 :param vim_account_id:
801 :return:
802 """
803 interfaces = []
804
805 for vld in vlds_to_connect:
806 table, _, db_id = vld.partition(":")
807 db_id, _, vld = db_id.partition(":")
808 _, _, vld_id = vld.partition(".")
809
810 if table == "vnfrs":
811 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
812 iface_key = "vnf-vld-id"
813 else: # table == "nsrs"
814 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
815 iface_key = "ns-vld-id"
816
817 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
818
819 for db_vnfr in db_vnfrs:
820 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
821 for iface_index, interface in enumerate(vdur["interfaces"]):
822 if interface.get(iface_key) == vld_id and interface.get(
823 "type"
824 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
825 # only SR-IOV o PT
826 interface_ = interface.copy()
827 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
828 db_vnfr["_id"], vdu_index, iface_index
829 )
830
831 if vdur.get("status") == "ERROR":
832 interface_["status"] = "ERROR"
833
834 interfaces.append(interface_)
835
836 return interfaces
837
838 def refresh(self, ro_task):
839 # look for task create
840 task_create_index, _ = next(
841 i_t
842 for i_t in enumerate(ro_task["tasks"])
843 if i_t[1]
844 and i_t[1]["action"] == "CREATE"
845 and i_t[1]["status"] != "FINISHED"
846 )
847
848 return self.new(ro_task, task_create_index, None)
849
850 def new(self, ro_task, task_index, task_depends):
851
852 task = ro_task["tasks"][task_index]
853 task_id = task["task_id"]
854 target_vim = self.my_vims[ro_task["target_id"]]
855
856 sdn_net_id = ro_task["vim_info"]["vim_id"]
857
858 created_items = ro_task["vim_info"].get("created_items")
859 connected_ports = ro_task["vim_info"].get("connected_ports", [])
860 new_connected_ports = []
861 last_update = ro_task["vim_info"].get("last_update", 0)
862 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
863 error_list = []
864 created = ro_task["vim_info"].get("created", False)
865
866 try:
867 # CREATE
868 params = task["params"]
869 vlds_to_connect = params["vlds"]
870 associated_vim = params["target_vim"]
871 # external additional ports
872 additional_ports = params.get("sdn-ports") or ()
873 _, _, vim_account_id = associated_vim.partition(":")
874
875 if associated_vim:
876 # get associated VIM
877 if associated_vim not in self.db_vims:
878 self.db_vims[associated_vim] = self.db.get_one(
879 "vim_accounts", {"_id": vim_account_id}
880 )
881
882 db_vim = self.db_vims[associated_vim]
883
884 # look for ports to connect
885 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
886 # print(ports)
887
888 sdn_ports = []
889 pending_ports = error_ports = 0
890 vlan_used = None
891 sdn_need_update = False
892
893 for port in ports:
894 vlan_used = port.get("vlan") or vlan_used
895
896 # TODO. Do not connect if already done
897 if not port.get("compute_node") or not port.get("pci"):
898 if port.get("status") == "ERROR":
899 error_ports += 1
900 else:
901 pending_ports += 1
902 continue
903
904 pmap = None
905 compute_node_mappings = next(
906 (
907 c
908 for c in db_vim["config"].get("sdn-port-mapping", ())
909 if c and c["compute_node"] == port["compute_node"]
910 ),
911 None,
912 )
913
914 if compute_node_mappings:
915 # process port_mapping pci of type 0000:af:1[01].[1357]
916 pmap = next(
917 (
918 p
919 for p in compute_node_mappings["ports"]
920 if self._match_pci(port["pci"], p.get("pci"))
921 ),
922 None,
923 )
924
925 if not pmap:
926 if not db_vim["config"].get("mapping_not_needed"):
927 error_list.append(
928 "Port mapping not found for compute_node={} pci={}".format(
929 port["compute_node"], port["pci"]
930 )
931 )
932 continue
933
934 pmap = {}
935
936 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
937 new_port = {
938 "service_endpoint_id": pmap.get("service_endpoint_id")
939 or service_endpoint_id,
940 "service_endpoint_encapsulation_type": "dot1q"
941 if port["type"] == "SR-IOV"
942 else None,
943 "service_endpoint_encapsulation_info": {
944 "vlan": port.get("vlan"),
945 "mac": port.get("mac-address"),
946 "device_id": pmap.get("device_id") or port["compute_node"],
947 "device_interface_id": pmap.get("device_interface_id")
948 or port["pci"],
949 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
950 "switch_port": pmap.get("switch_port"),
951 "service_mapping_info": pmap.get("service_mapping_info"),
952 },
953 }
954
955 # TODO
956 # if port["modified_at"] > last_update:
957 # sdn_need_update = True
958 new_connected_ports.append(port["id"]) # TODO
959 sdn_ports.append(new_port)
960
961 if error_ports:
962 error_list.append(
963 "{} interfaces have not been created as VDU is on ERROR status".format(
964 error_ports
965 )
966 )
967
968 # connect external ports
969 for index, additional_port in enumerate(additional_ports):
970 additional_port_id = additional_port.get(
971 "service_endpoint_id"
972 ) or "external-{}".format(index)
973 sdn_ports.append(
974 {
975 "service_endpoint_id": additional_port_id,
976 "service_endpoint_encapsulation_type": additional_port.get(
977 "service_endpoint_encapsulation_type", "dot1q"
978 ),
979 "service_endpoint_encapsulation_info": {
980 "vlan": additional_port.get("vlan") or vlan_used,
981 "mac": additional_port.get("mac_address"),
982 "device_id": additional_port.get("device_id"),
983 "device_interface_id": additional_port.get(
984 "device_interface_id"
985 ),
986 "switch_dpid": additional_port.get("switch_dpid")
987 or additional_port.get("switch_id"),
988 "switch_port": additional_port.get("switch_port"),
989 "service_mapping_info": additional_port.get(
990 "service_mapping_info"
991 ),
992 },
993 }
994 )
995 new_connected_ports.append(additional_port_id)
996 sdn_info = ""
997
998 # if there are more ports to connect or they have been modified, call create/update
999 if error_list:
1000 sdn_status = "ERROR"
1001 sdn_info = "; ".join(error_list)
1002 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1003 last_update = time.time()
1004
1005 if not sdn_net_id:
1006 if len(sdn_ports) < 2:
1007 sdn_status = "ACTIVE"
1008
1009 if not pending_ports:
1010 self.logger.debug(
1011 "task={} {} new-sdn-net done, less than 2 ports".format(
1012 task_id, ro_task["target_id"]
1013 )
1014 )
1015 else:
1016 net_type = params.get("type") or "ELAN"
1017 (
1018 sdn_net_id,
1019 created_items,
1020 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1021 created = True
1022 self.logger.debug(
1023 "task={} {} new-sdn-net={} created={}".format(
1024 task_id, ro_task["target_id"], sdn_net_id, created
1025 )
1026 )
1027 else:
1028 created_items = target_vim.edit_connectivity_service(
1029 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1030 )
1031 created = True
1032 self.logger.debug(
1033 "task={} {} update-sdn-net={} created={}".format(
1034 task_id, ro_task["target_id"], sdn_net_id, created
1035 )
1036 )
1037
1038 connected_ports = new_connected_ports
1039 elif sdn_net_id:
1040 wim_status_dict = target_vim.get_connectivity_service_status(
1041 sdn_net_id, conn_info=created_items
1042 )
1043 sdn_status = wim_status_dict["sdn_status"]
1044
1045 if wim_status_dict.get("sdn_info"):
1046 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1047
1048 if wim_status_dict.get("error_msg"):
1049 sdn_info = wim_status_dict.get("error_msg") or ""
1050
1051 if pending_ports:
1052 if sdn_status != "ERROR":
1053 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1054 len(ports) - pending_ports, len(ports)
1055 )
1056
1057 if sdn_status == "ACTIVE":
1058 sdn_status = "BUILD"
1059
1060 ro_vim_item_update = {
1061 "vim_id": sdn_net_id,
1062 "vim_status": sdn_status,
1063 "created": created,
1064 "created_items": created_items,
1065 "connected_ports": connected_ports,
1066 "vim_details": sdn_info,
1067 "last_update": last_update,
1068 }
1069
1070 return sdn_status, ro_vim_item_update
1071 except Exception as e:
1072 self.logger.error(
1073 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1074 exc_info=not isinstance(
1075 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1076 ),
1077 )
1078 ro_vim_item_update = {
1079 "vim_status": "VIM_ERROR",
1080 "created": created,
1081 "vim_details": str(e),
1082 }
1083
1084 return "FAILED", ro_vim_item_update
1085
1086 def delete(self, ro_task, task_index):
1087 task = ro_task["tasks"][task_index]
1088 task_id = task["task_id"]
1089 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1090 ro_vim_item_update_ok = {
1091 "vim_status": "DELETED",
1092 "created": False,
1093 "vim_details": "DELETED",
1094 "vim_id": None,
1095 }
1096
1097 try:
1098 if sdn_vim_id:
1099 target_vim = self.my_vims[ro_task["target_id"]]
1100 target_vim.delete_connectivity_service(
1101 sdn_vim_id, ro_task["vim_info"].get("created_items")
1102 )
1103
1104 except Exception as e:
1105 if (
1106 isinstance(e, sdnconn.SdnConnectorError)
1107 and e.http_code == HTTPStatus.NOT_FOUND.value
1108 ):
1109 ro_vim_item_update_ok["vim_details"] = "already deleted"
1110 else:
1111 self.logger.error(
1112 "ro_task={} vim={} del-sdn-net={}: {}".format(
1113 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1114 ),
1115 exc_info=not isinstance(
1116 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1117 ),
1118 )
1119 ro_vim_item_update = {
1120 "vim_status": "VIM_ERROR",
1121 "vim_details": "Error while deleting: {}".format(e),
1122 }
1123
1124 return "FAILED", ro_vim_item_update
1125
1126 self.logger.debug(
1127 "task={} {} del-sdn-net={} {}".format(
1128 task_id,
1129 ro_task["target_id"],
1130 sdn_vim_id,
1131 ro_vim_item_update_ok.get("vim_details", ""),
1132 )
1133 )
1134
1135 return "DONE", ro_vim_item_update_ok
1136
1137
1138 class NsWorker(threading.Thread):
1139 REFRESH_BUILD = 5 # 5 seconds
1140 REFRESH_ACTIVE = 60 # 1 minute
1141 REFRESH_ERROR = 600
1142 REFRESH_IMAGE = 3600 * 10
1143 REFRESH_DELETE = 3600 * 10
1144 QUEUE_SIZE = 100
1145 terminate = False
1146
1147 def __init__(self, worker_index, config, plugins, db):
1148 """
1149
1150 :param worker_index: thread index
1151 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1152 :param plugins: global shared dict with the loaded plugins
1153 :param db: database class instance to use
1154 """
1155 threading.Thread.__init__(self)
1156 self.config = config
1157 self.plugins = plugins
1158 self.plugin_name = "unknown"
1159 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1160 self.worker_index = worker_index
1161 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1162 # targetvim: vimplugin class
1163 self.my_vims = {}
1164 # targetvim: vim information from database
1165 self.db_vims = {}
1166 # targetvim list
1167 self.vim_targets = []
1168 self.my_id = config["process_id"] + ":" + str(worker_index)
1169 self.db = db
1170 self.item2class = {
1171 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1172 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1173 "image": VimInteractionImage(
1174 self.db, self.my_vims, self.db_vims, self.logger
1175 ),
1176 "flavor": VimInteractionFlavor(
1177 self.db, self.my_vims, self.db_vims, self.logger
1178 ),
1179 "sdn_net": VimInteractionSdnNet(
1180 self.db, self.my_vims, self.db_vims, self.logger
1181 ),
1182 }
1183 self.time_last_task_processed = None
1184 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1185 self.tasks_to_delete = []
1186 # it is idle when there are not vim_targets associated
1187 self.idle = True
1188 self.task_locked_time = config["global"]["task_locked_time"]
1189
1190 def insert_task(self, task):
1191 try:
1192 self.task_queue.put(task, False)
1193 return None
1194 except queue.Full:
1195 raise NsWorkerException("timeout inserting a task")
1196
1197 def terminate(self):
1198 self.insert_task("exit")
1199
1200 def del_task(self, task):
1201 with self.task_lock:
1202 if task["status"] == "SCHEDULED":
1203 task["status"] = "SUPERSEDED"
1204 return True
1205 else: # task["status"] == "processing"
1206 self.task_lock.release()
1207 return False
1208
1209 def _process_vim_config(self, target_id, db_vim):
1210 """
1211 Process vim config, creating vim configuration files as ca_cert
1212 :param target_id: vim/sdn/wim + id
1213 :param db_vim: Vim dictionary obtained from database
1214 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1215 """
1216 if not db_vim.get("config"):
1217 return
1218
1219 file_name = ""
1220
1221 try:
1222 if db_vim["config"].get("ca_cert_content"):
1223 file_name = "{}:{}".format(target_id, self.worker_index)
1224
1225 try:
1226 mkdir(file_name)
1227 except FileExistsError:
1228 pass
1229
1230 file_name = file_name + "/ca_cert"
1231
1232 with open(file_name, "w") as f:
1233 f.write(db_vim["config"]["ca_cert_content"])
1234 del db_vim["config"]["ca_cert_content"]
1235 db_vim["config"]["ca_cert"] = file_name
1236 except Exception as e:
1237 raise NsWorkerException(
1238 "Error writing to file '{}': {}".format(file_name, e)
1239 )
1240
1241 def _load_plugin(self, name, type="vim"):
1242 # type can be vim or sdn
1243 if "rovim_dummy" not in self.plugins:
1244 self.plugins["rovim_dummy"] = VimDummyConnector
1245
1246 if "rosdn_dummy" not in self.plugins:
1247 self.plugins["rosdn_dummy"] = SdnDummyConnector
1248
1249 if name in self.plugins:
1250 return self.plugins[name]
1251
1252 try:
1253 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1254 self.plugins[name] = ep.load()
1255 except Exception as e:
1256 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1257
1258 if name and name not in self.plugins:
1259 raise NsWorkerException(
1260 "Plugin 'osm_{n}' has not been installed".format(n=name)
1261 )
1262
1263 return self.plugins[name]
1264
1265 def _unload_vim(self, target_id):
1266 """
1267 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1268 :param target_id: Contains type:_id; where type can be 'vim', ...
1269 :return: None.
1270 """
1271 try:
1272 self.db_vims.pop(target_id, None)
1273 self.my_vims.pop(target_id, None)
1274
1275 if target_id in self.vim_targets:
1276 self.vim_targets.remove(target_id)
1277
1278 self.logger.info("Unloaded {}".format(target_id))
1279 rmtree("{}:{}".format(target_id, self.worker_index))
1280 except FileNotFoundError:
1281 pass # this is raised by rmtree if folder does not exist
1282 except Exception as e:
1283 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1284
1285 def _check_vim(self, target_id):
1286 """
1287 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1288 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1289 :return: None.
1290 """
1291 target, _, _id = target_id.partition(":")
1292 now = time.time()
1293 update_dict = {}
1294 unset_dict = {}
1295 op_text = ""
1296 step = ""
1297 loaded = target_id in self.vim_targets
1298 target_database = (
1299 "vim_accounts"
1300 if target == "vim"
1301 else "wim_accounts"
1302 if target == "wim"
1303 else "sdns"
1304 )
1305
1306 try:
1307 step = "Getting {} from db".format(target_id)
1308 db_vim = self.db.get_one(target_database, {"_id": _id})
1309
1310 for op_index, operation in enumerate(
1311 db_vim["_admin"].get("operations", ())
1312 ):
1313 if operation["operationState"] != "PROCESSING":
1314 continue
1315
1316 locked_at = operation.get("locked_at")
1317
1318 if locked_at is not None and locked_at >= now - self.task_locked_time:
1319 # some other thread is doing this operation
1320 return
1321
1322 # lock
1323 op_text = "_admin.operations.{}.".format(op_index)
1324
1325 if not self.db.set_one(
1326 target_database,
1327 q_filter={
1328 "_id": _id,
1329 op_text + "operationState": "PROCESSING",
1330 op_text + "locked_at": locked_at,
1331 },
1332 update_dict={
1333 op_text + "locked_at": now,
1334 "admin.current_operation": op_index,
1335 },
1336 fail_on_empty=False,
1337 ):
1338 return
1339
1340 unset_dict[op_text + "locked_at"] = None
1341 unset_dict["current_operation"] = None
1342 step = "Loading " + target_id
1343 error_text = self._load_vim(target_id)
1344
1345 if not error_text:
1346 step = "Checking connectivity"
1347
1348 if target == "vim":
1349 self.my_vims[target_id].check_vim_connectivity()
1350 else:
1351 self.my_vims[target_id].check_credentials()
1352
1353 update_dict["_admin.operationalState"] = "ENABLED"
1354 update_dict["_admin.detailed-status"] = ""
1355 unset_dict[op_text + "detailed-status"] = None
1356 update_dict[op_text + "operationState"] = "COMPLETED"
1357
1358 return
1359
1360 except Exception as e:
1361 error_text = "{}: {}".format(step, e)
1362 self.logger.error("{} for {}: {}".format(step, target_id, e))
1363
1364 finally:
1365 if update_dict or unset_dict:
1366 if error_text:
1367 update_dict[op_text + "operationState"] = "FAILED"
1368 update_dict[op_text + "detailed-status"] = error_text
1369 unset_dict.pop(op_text + "detailed-status", None)
1370 update_dict["_admin.operationalState"] = "ERROR"
1371 update_dict["_admin.detailed-status"] = error_text
1372
1373 if op_text:
1374 update_dict[op_text + "statusEnteredTime"] = now
1375
1376 self.db.set_one(
1377 target_database,
1378 q_filter={"_id": _id},
1379 update_dict=update_dict,
1380 unset=unset_dict,
1381 fail_on_empty=False,
1382 )
1383
1384 if not loaded:
1385 self._unload_vim(target_id)
1386
1387 def _reload_vim(self, target_id):
1388 if target_id in self.vim_targets:
1389 self._load_vim(target_id)
1390 else:
1391 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1392 # just remove it to force load again next time it is needed
1393 self.db_vims.pop(target_id, None)
1394
1395 def _load_vim(self, target_id):
1396 """
1397 Load or reload a vim_account, sdn_controller or wim_account.
1398 Read content from database, load the plugin if not loaded.
1399 In case of error loading the plugin, it load a failing VIM_connector
1400 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1401 :param target_id: Contains type:_id; where type can be 'vim', ...
1402 :return: None if ok, descriptive text if error
1403 """
1404 target, _, _id = target_id.partition(":")
1405 target_database = (
1406 "vim_accounts"
1407 if target == "vim"
1408 else "wim_accounts"
1409 if target == "wim"
1410 else "sdns"
1411 )
1412 plugin_name = ""
1413 vim = None
1414
1415 try:
1416 step = "Getting {}={} from db".format(target, _id)
1417 # TODO process for wim, sdnc, ...
1418 vim = self.db.get_one(target_database, {"_id": _id})
1419
1420 # if deep_get(vim, "config", "sdn-controller"):
1421 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1422 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1423
1424 step = "Decrypting password"
1425 schema_version = vim.get("schema_version")
1426 self.db.encrypt_decrypt_fields(
1427 vim,
1428 "decrypt",
1429 fields=("password", "secret"),
1430 schema_version=schema_version,
1431 salt=_id,
1432 )
1433 self._process_vim_config(target_id, vim)
1434
1435 if target == "vim":
1436 plugin_name = "rovim_" + vim["vim_type"]
1437 step = "Loading plugin '{}'".format(plugin_name)
1438 vim_module_conn = self._load_plugin(plugin_name)
1439 step = "Loading {}'".format(target_id)
1440 self.my_vims[target_id] = vim_module_conn(
1441 uuid=vim["_id"],
1442 name=vim["name"],
1443 tenant_id=vim.get("vim_tenant_id"),
1444 tenant_name=vim.get("vim_tenant_name"),
1445 url=vim["vim_url"],
1446 url_admin=None,
1447 user=vim["vim_user"],
1448 passwd=vim["vim_password"],
1449 config=vim.get("config") or {},
1450 persistent_info={},
1451 )
1452 else: # sdn
1453 plugin_name = "rosdn_" + vim["type"]
1454 step = "Loading plugin '{}'".format(plugin_name)
1455 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1456 step = "Loading {}'".format(target_id)
1457 wim = deepcopy(vim)
1458 wim_config = wim.pop("config", {}) or {}
1459 wim["uuid"] = wim["_id"]
1460 wim["wim_url"] = wim["url"]
1461
1462 if wim.get("dpid"):
1463 wim_config["dpid"] = wim.pop("dpid")
1464
1465 if wim.get("switch_id"):
1466 wim_config["switch_id"] = wim.pop("switch_id")
1467
1468 # wim, wim_account, config
1469 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1470 self.db_vims[target_id] = vim
1471 self.error_status = None
1472
1473 self.logger.info(
1474 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1475 )
1476 except Exception as e:
1477 self.logger.error(
1478 "Cannot load {} plugin={}: {} {}".format(
1479 target_id, plugin_name, step, e
1480 )
1481 )
1482
1483 self.db_vims[target_id] = vim or {}
1484 self.db_vims[target_id] = FailingConnector(str(e))
1485 error_status = "{} Error: {}".format(step, e)
1486
1487 return error_status
1488 finally:
1489 if target_id not in self.vim_targets:
1490 self.vim_targets.append(target_id)
1491
1492 def _get_db_task(self):
1493 """
1494 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1495 :return: None
1496 """
1497 now = time.time()
1498
1499 if not self.time_last_task_processed:
1500 self.time_last_task_processed = now
1501
1502 try:
1503 while True:
1504 """
1505 # Log RO tasks only when loglevel is DEBUG
1506 if self.logger.getEffectiveLevel() == logging.DEBUG:
1507 self._log_ro_task(
1508 None,
1509 None,
1510 None,
1511 "TASK_WF",
1512 "task_locked_time="
1513 + str(self.task_locked_time)
1514 + " "
1515 + "time_last_task_processed="
1516 + str(self.time_last_task_processed)
1517 + " "
1518 + "now="
1519 + str(now),
1520 )
1521 """
1522 locked = self.db.set_one(
1523 "ro_tasks",
1524 q_filter={
1525 "target_id": self.vim_targets,
1526 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1527 "locked_at.lt": now - self.task_locked_time,
1528 "to_check_at.lt": self.time_last_task_processed,
1529 },
1530 update_dict={"locked_by": self.my_id, "locked_at": now},
1531 fail_on_empty=False,
1532 )
1533
1534 if locked:
1535 # read and return
1536 ro_task = self.db.get_one(
1537 "ro_tasks",
1538 q_filter={
1539 "target_id": self.vim_targets,
1540 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1541 "locked_at": now,
1542 },
1543 )
1544 return ro_task
1545
1546 if self.time_last_task_processed == now:
1547 self.time_last_task_processed = None
1548 return None
1549 else:
1550 self.time_last_task_processed = now
1551 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1552
1553 except DbException as e:
1554 self.logger.error("Database exception at _get_db_task: {}".format(e))
1555 except Exception as e:
1556 self.logger.critical(
1557 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1558 )
1559
1560 return None
1561
1562 def _get_db_all_tasks(self):
1563 """
1564 Read all content of table ro_tasks to log it
1565 :return: None
1566 """
1567 try:
1568 # Checking the content of the BD:
1569
1570 # read and return
1571 ro_task = self.db.get_list("ro_tasks")
1572 for rt in ro_task:
1573 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1574 return ro_task
1575
1576 except DbException as e:
1577 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1578 except Exception as e:
1579 self.logger.critical(
1580 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1581 )
1582
1583 return None
1584
1585 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1586 """
1587 Generate a log with the following format:
1588
1589 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1590 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1591 task_array_index;task_id;task_action;task_item;task_args
1592
1593 Example:
1594
1595 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1596 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1597 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1598 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1599 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1600 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1601 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1602 """
1603 try:
1604 line = []
1605 i = 0
1606 if ro_task is not None and isinstance(ro_task, dict):
1607 for t in ro_task["tasks"]:
1608 line.clear()
1609 line.append(mark)
1610 line.append(event)
1611 line.append(ro_task.get("_id", ""))
1612 line.append(str(ro_task.get("locked_at", "")))
1613 line.append(str(ro_task.get("modified_at", "")))
1614 line.append(str(ro_task.get("created_at", "")))
1615 line.append(str(ro_task.get("to_check_at", "")))
1616 line.append(str(ro_task.get("locked_by", "")))
1617 line.append(str(ro_task.get("target_id", "")))
1618 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1619 line.append(str(ro_task.get("vim_info", "")))
1620 line.append(str(ro_task.get("tasks", "")))
1621 if isinstance(t, dict):
1622 line.append(str(t.get("status", "")))
1623 line.append(str(t.get("action_id", "")))
1624 line.append(str(i))
1625 line.append(str(t.get("task_id", "")))
1626 line.append(str(t.get("action", "")))
1627 line.append(str(t.get("item", "")))
1628 line.append(str(t.get("find_params", "")))
1629 line.append(str(t.get("params", "")))
1630 else:
1631 line.extend([""] * 2)
1632 line.append(str(i))
1633 line.extend([""] * 5)
1634
1635 i += 1
1636 self.logger.debug(";".join(line))
1637 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1638 i = 0
1639 while True:
1640 st = "tasks.{}.status".format(i)
1641 if st not in db_ro_task_update:
1642 break
1643 line.clear()
1644 line.append(mark)
1645 line.append(event)
1646 line.append(db_ro_task_update.get("_id", ""))
1647 line.append(str(db_ro_task_update.get("locked_at", "")))
1648 line.append(str(db_ro_task_update.get("modified_at", "")))
1649 line.append("")
1650 line.append(str(db_ro_task_update.get("to_check_at", "")))
1651 line.append(str(db_ro_task_update.get("locked_by", "")))
1652 line.append("")
1653 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1654 line.append("")
1655 line.append(str(db_ro_task_update.get("vim_info", "")))
1656 line.append(str(str(db_ro_task_update).count(".status")))
1657 line.append(db_ro_task_update.get(st, ""))
1658 line.append("")
1659 line.append(str(i))
1660 line.extend([""] * 3)
1661 i += 1
1662 self.logger.debug(";".join(line))
1663
1664 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1665 line.clear()
1666 line.append(mark)
1667 line.append(event)
1668 line.append(db_ro_task_delete.get("_id", ""))
1669 line.append("")
1670 line.append(db_ro_task_delete.get("modified_at", ""))
1671 line.extend([""] * 13)
1672 self.logger.debug(";".join(line))
1673
1674 else:
1675 line.clear()
1676 line.append(mark)
1677 line.append(event)
1678 line.extend([""] * 16)
1679 self.logger.debug(";".join(line))
1680
1681 except Exception as e:
1682 self.logger.error("Error logging ro_task: {}".format(e))
1683
1684 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1685 """
1686 Determine if this task need to be done or superseded
1687 :return: None
1688 """
1689 my_task = ro_task["tasks"][task_index]
1690 task_id = my_task["task_id"]
1691 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1692 "created_items", False
1693 )
1694
1695 if my_task["status"] == "FAILED":
1696 return None, None # TODO need to be retry??
1697
1698 try:
1699 for index, task in enumerate(ro_task["tasks"]):
1700 if index == task_index or not task:
1701 continue # own task
1702
1703 if (
1704 my_task["target_record"] == task["target_record"]
1705 and task["action"] == "CREATE"
1706 ):
1707 # set to finished
1708 db_update["tasks.{}.status".format(index)] = task[
1709 "status"
1710 ] = "FINISHED"
1711 elif task["action"] == "CREATE" and task["status"] not in (
1712 "FINISHED",
1713 "SUPERSEDED",
1714 ):
1715 needed_delete = False
1716
1717 if needed_delete:
1718 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1719 else:
1720 return "SUPERSEDED", None
1721 except Exception as e:
1722 if not isinstance(e, NsWorkerException):
1723 self.logger.critical(
1724 "Unexpected exception at _delete_task task={}: {}".format(
1725 task_id, e
1726 ),
1727 exc_info=True,
1728 )
1729
1730 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1731
1732 def _create_task(self, ro_task, task_index, task_depends, db_update):
1733 """
1734 Determine if this task need to create something at VIM
1735 :return: None
1736 """
1737 my_task = ro_task["tasks"][task_index]
1738 task_id = my_task["task_id"]
1739 task_status = None
1740
1741 if my_task["status"] == "FAILED":
1742 return None, None # TODO need to be retry??
1743 elif my_task["status"] == "SCHEDULED":
1744 # check if already created by another task
1745 for index, task in enumerate(ro_task["tasks"]):
1746 if index == task_index or not task:
1747 continue # own task
1748
1749 if task["action"] == "CREATE" and task["status"] not in (
1750 "SCHEDULED",
1751 "FINISHED",
1752 "SUPERSEDED",
1753 ):
1754 return task["status"], "COPY_VIM_INFO"
1755
1756 try:
1757 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1758 ro_task, task_index, task_depends
1759 )
1760 # TODO update other CREATE tasks
1761 except Exception as e:
1762 if not isinstance(e, NsWorkerException):
1763 self.logger.error(
1764 "Error executing task={}: {}".format(task_id, e), exc_info=True
1765 )
1766
1767 task_status = "FAILED"
1768 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1769 # TODO update ro_vim_item_update
1770
1771 return task_status, ro_vim_item_update
1772 else:
1773 return None, None
1774
1775 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1776 """
1777 Look for dependency task
1778 :param task_id: Can be one of
1779 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1780 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1781 3. task.task_id: "<action_id>:number"
1782 :param ro_task:
1783 :param target_id:
1784 :return: database ro_task plus index of task
1785 """
1786 if (
1787 task_id.startswith("vim:")
1788 or task_id.startswith("sdn:")
1789 or task_id.startswith("wim:")
1790 ):
1791 target_id, _, task_id = task_id.partition(" ")
1792
1793 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1794 ro_task_dependency = self.db.get_one(
1795 "ro_tasks",
1796 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1797 fail_on_empty=False,
1798 )
1799
1800 if ro_task_dependency:
1801 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1802 if task["target_record_id"] == task_id:
1803 return ro_task_dependency, task_index
1804
1805 else:
1806 if ro_task:
1807 for task_index, task in enumerate(ro_task["tasks"]):
1808 if task and task["task_id"] == task_id:
1809 return ro_task, task_index
1810
1811 ro_task_dependency = self.db.get_one(
1812 "ro_tasks",
1813 q_filter={
1814 "tasks.ANYINDEX.task_id": task_id,
1815 "tasks.ANYINDEX.target_record.ne": None,
1816 },
1817 fail_on_empty=False,
1818 )
1819
1820 if ro_task_dependency:
1821 for task_index, task in ro_task_dependency["tasks"]:
1822 if task["task_id"] == task_id:
1823 return ro_task_dependency, task_index
1824 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1825
1826 def _process_pending_tasks(self, ro_task):
1827 ro_task_id = ro_task["_id"]
1828 now = time.time()
1829 # one day
1830 next_check_at = now + (24 * 60 * 60)
1831 db_ro_task_update = {}
1832
1833 def _update_refresh(new_status):
1834 # compute next_refresh
1835 nonlocal task
1836 nonlocal next_check_at
1837 nonlocal db_ro_task_update
1838 nonlocal ro_task
1839
1840 next_refresh = time.time()
1841
1842 if task["item"] in ("image", "flavor"):
1843 next_refresh += self.REFRESH_IMAGE
1844 elif new_status == "BUILD":
1845 next_refresh += self.REFRESH_BUILD
1846 elif new_status == "DONE":
1847 next_refresh += self.REFRESH_ACTIVE
1848 else:
1849 next_refresh += self.REFRESH_ERROR
1850
1851 next_check_at = min(next_check_at, next_refresh)
1852 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1853 ro_task["vim_info"]["refresh_at"] = next_refresh
1854
1855 try:
1856 """
1857 # Log RO tasks only when loglevel is DEBUG
1858 if self.logger.getEffectiveLevel() == logging.DEBUG:
1859 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1860 """
1861 # 0: get task_status_create
1862 lock_object = None
1863 task_status_create = None
1864 task_create = next(
1865 (
1866 t
1867 for t in ro_task["tasks"]
1868 if t
1869 and t["action"] == "CREATE"
1870 and t["status"] in ("BUILD", "DONE")
1871 ),
1872 None,
1873 )
1874
1875 if task_create:
1876 task_status_create = task_create["status"]
1877
1878 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1879 for task_action in ("DELETE", "CREATE", "EXEC"):
1880 db_vim_update = None
1881 new_status = None
1882
1883 for task_index, task in enumerate(ro_task["tasks"]):
1884 if not task:
1885 continue # task deleted
1886
1887 task_depends = {}
1888 target_update = None
1889
1890 if (
1891 (
1892 task_action in ("DELETE", "EXEC")
1893 and task["status"] not in ("SCHEDULED", "BUILD")
1894 )
1895 or task["action"] != task_action
1896 or (
1897 task_action == "CREATE"
1898 and task["status"] in ("FINISHED", "SUPERSEDED")
1899 )
1900 ):
1901 continue
1902
1903 task_path = "tasks.{}.status".format(task_index)
1904 try:
1905 db_vim_info_update = None
1906
1907 if task["status"] == "SCHEDULED":
1908 # check if tasks that this depends on have been completed
1909 dependency_not_completed = False
1910
1911 for dependency_task_id in task.get("depends_on") or ():
1912 (
1913 dependency_ro_task,
1914 dependency_task_index,
1915 ) = self._get_dependency(
1916 dependency_task_id, target_id=ro_task["target_id"]
1917 )
1918 dependency_task = dependency_ro_task["tasks"][
1919 dependency_task_index
1920 ]
1921
1922 if dependency_task["status"] == "SCHEDULED":
1923 dependency_not_completed = True
1924 next_check_at = min(
1925 next_check_at, dependency_ro_task["to_check_at"]
1926 )
1927 # must allow dependent task to be processed first
1928 # to do this set time after last_task_processed
1929 next_check_at = max(
1930 self.time_last_task_processed, next_check_at
1931 )
1932 break
1933 elif dependency_task["status"] == "FAILED":
1934 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1935 task["action"],
1936 task["item"],
1937 dependency_task["action"],
1938 dependency_task["item"],
1939 dependency_task_id,
1940 dependency_ro_task["vim_info"].get(
1941 "vim_details"
1942 ),
1943 )
1944 self.logger.error(
1945 "task={} {}".format(task["task_id"], error_text)
1946 )
1947 raise NsWorkerException(error_text)
1948
1949 task_depends[dependency_task_id] = dependency_ro_task[
1950 "vim_info"
1951 ]["vim_id"]
1952 task_depends[
1953 "TASK-{}".format(dependency_task_id)
1954 ] = dependency_ro_task["vim_info"]["vim_id"]
1955
1956 if dependency_not_completed:
1957 # TODO set at vim_info.vim_details that it is waiting
1958 continue
1959
1960 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1961 # the task of renew this locking. It will update database locket_at periodically
1962 if not lock_object:
1963 lock_object = LockRenew.add_lock_object(
1964 "ro_tasks", ro_task, self
1965 )
1966
1967 if task["action"] == "DELETE":
1968 (new_status, db_vim_info_update,) = self._delete_task(
1969 ro_task, task_index, task_depends, db_ro_task_update
1970 )
1971 new_status = (
1972 "FINISHED" if new_status == "DONE" else new_status
1973 )
1974 # ^with FINISHED instead of DONE it will not be refreshing
1975
1976 if new_status in ("FINISHED", "SUPERSEDED"):
1977 target_update = "DELETE"
1978 elif task["action"] == "EXEC":
1979 (
1980 new_status,
1981 db_vim_info_update,
1982 db_task_update,
1983 ) = self.item2class[task["item"]].exec(
1984 ro_task, task_index, task_depends
1985 )
1986 new_status = (
1987 "FINISHED" if new_status == "DONE" else new_status
1988 )
1989 # ^with FINISHED instead of DONE it will not be refreshing
1990
1991 if db_task_update:
1992 # load into database the modified db_task_update "retries" and "next_retry"
1993 if db_task_update.get("retries"):
1994 db_ro_task_update[
1995 "tasks.{}.retries".format(task_index)
1996 ] = db_task_update["retries"]
1997
1998 next_check_at = time.time() + db_task_update.get(
1999 "next_retry", 60
2000 )
2001 target_update = None
2002 elif task["action"] == "CREATE":
2003 if task["status"] == "SCHEDULED":
2004 if task_status_create:
2005 new_status = task_status_create
2006 target_update = "COPY_VIM_INFO"
2007 else:
2008 new_status, db_vim_info_update = self.item2class[
2009 task["item"]
2010 ].new(ro_task, task_index, task_depends)
2011 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2012 _update_refresh(new_status)
2013 else:
2014 if (
2015 ro_task["vim_info"]["refresh_at"]
2016 and now > ro_task["vim_info"]["refresh_at"]
2017 ):
2018 new_status, db_vim_info_update = self.item2class[
2019 task["item"]
2020 ].refresh(ro_task)
2021 _update_refresh(new_status)
2022 else:
2023 # The refresh is updated to avoid set the value of "refresh_at" to
2024 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2025 # because it can happen that in this case the task is never processed
2026 _update_refresh(task["status"])
2027
2028 except Exception as e:
2029 new_status = "FAILED"
2030 db_vim_info_update = {
2031 "vim_status": "VIM_ERROR",
2032 "vim_details": str(e),
2033 }
2034
2035 if not isinstance(
2036 e, (NsWorkerException, vimconn.VimConnException)
2037 ):
2038 self.logger.error(
2039 "Unexpected exception at _delete_task task={}: {}".format(
2040 task["task_id"], e
2041 ),
2042 exc_info=True,
2043 )
2044
2045 try:
2046 if db_vim_info_update:
2047 db_vim_update = db_vim_info_update.copy()
2048 db_ro_task_update.update(
2049 {
2050 "vim_info." + k: v
2051 for k, v in db_vim_info_update.items()
2052 }
2053 )
2054 ro_task["vim_info"].update(db_vim_info_update)
2055
2056 if new_status:
2057 if task_action == "CREATE":
2058 task_status_create = new_status
2059 db_ro_task_update[task_path] = new_status
2060
2061 if target_update or db_vim_update:
2062 if target_update == "DELETE":
2063 self._update_target(task, None)
2064 elif target_update == "COPY_VIM_INFO":
2065 self._update_target(task, ro_task["vim_info"])
2066 else:
2067 self._update_target(task, db_vim_update)
2068
2069 except Exception as e:
2070 if (
2071 isinstance(e, DbException)
2072 and e.http_code == HTTPStatus.NOT_FOUND
2073 ):
2074 # if the vnfrs or nsrs has been removed from database, this task must be removed
2075 self.logger.debug(
2076 "marking to delete task={}".format(task["task_id"])
2077 )
2078 self.tasks_to_delete.append(task)
2079 else:
2080 self.logger.error(
2081 "Unexpected exception at _update_target task={}: {}".format(
2082 task["task_id"], e
2083 ),
2084 exc_info=True,
2085 )
2086
2087 locked_at = ro_task["locked_at"]
2088
2089 if lock_object:
2090 locked_at = [
2091 lock_object["locked_at"],
2092 lock_object["locked_at"] + self.task_locked_time,
2093 ]
2094 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2095 # contain exactly locked_at + self.task_locked_time
2096 LockRenew.remove_lock_object(lock_object)
2097
2098 q_filter = {
2099 "_id": ro_task["_id"],
2100 "to_check_at": ro_task["to_check_at"],
2101 "locked_at": locked_at,
2102 }
2103 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2104 # outside this task (by ro_nbi) do not update it
2105 db_ro_task_update["locked_by"] = None
2106 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2107 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2108 db_ro_task_update["modified_at"] = now
2109 db_ro_task_update["to_check_at"] = next_check_at
2110
2111 """
2112 # Log RO tasks only when loglevel is DEBUG
2113 if self.logger.getEffectiveLevel() == logging.DEBUG:
2114 db_ro_task_update_log = db_ro_task_update.copy()
2115 db_ro_task_update_log["_id"] = q_filter["_id"]
2116 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2117 """
2118
2119 if not self.db.set_one(
2120 "ro_tasks",
2121 update_dict=db_ro_task_update,
2122 q_filter=q_filter,
2123 fail_on_empty=False,
2124 ):
2125 del db_ro_task_update["to_check_at"]
2126 del q_filter["to_check_at"]
2127 """
2128 # Log RO tasks only when loglevel is DEBUG
2129 if self.logger.getEffectiveLevel() == logging.DEBUG:
2130 self._log_ro_task(
2131 None,
2132 db_ro_task_update_log,
2133 None,
2134 "TASK_WF",
2135 "SET_TASK " + str(q_filter),
2136 )
2137 """
2138 self.db.set_one(
2139 "ro_tasks",
2140 q_filter=q_filter,
2141 update_dict=db_ro_task_update,
2142 fail_on_empty=True,
2143 )
2144 except DbException as e:
2145 self.logger.error(
2146 "ro_task={} Error updating database {}".format(ro_task_id, e)
2147 )
2148 except Exception as e:
2149 self.logger.error(
2150 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2151 )
2152
2153 def _update_target(self, task, ro_vim_item_update):
2154 table, _, temp = task["target_record"].partition(":")
2155 _id, _, path_vim_status = temp.partition(":")
2156 path_item = path_vim_status[: path_vim_status.rfind(".")]
2157 path_item = path_item[: path_item.rfind(".")]
2158 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2159 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2160
2161 if ro_vim_item_update:
2162 update_dict = {
2163 path_vim_status + "." + k: v
2164 for k, v in ro_vim_item_update.items()
2165 if k
2166 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2167 }
2168
2169 if path_vim_status.startswith("vdur."):
2170 # for backward compatibility, add vdur.name apart from vdur.vim_name
2171 if ro_vim_item_update.get("vim_name"):
2172 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2173
2174 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2175 if ro_vim_item_update.get("vim_id"):
2176 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2177
2178 # update general status
2179 if ro_vim_item_update.get("vim_status"):
2180 update_dict[path_item + ".status"] = ro_vim_item_update[
2181 "vim_status"
2182 ]
2183
2184 if ro_vim_item_update.get("interfaces"):
2185 path_interfaces = path_item + ".interfaces"
2186
2187 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2188 if iface:
2189 update_dict.update(
2190 {
2191 path_interfaces + ".{}.".format(i) + k: v
2192 for k, v in iface.items()
2193 if k in ("vlan", "compute_node", "pci")
2194 }
2195 )
2196
2197 # put ip_address and mac_address with ip-address and mac-address
2198 if iface.get("ip_address"):
2199 update_dict[
2200 path_interfaces + ".{}.".format(i) + "ip-address"
2201 ] = iface["ip_address"]
2202
2203 if iface.get("mac_address"):
2204 update_dict[
2205 path_interfaces + ".{}.".format(i) + "mac-address"
2206 ] = iface["mac_address"]
2207
2208 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2209 update_dict["ip-address"] = iface.get("ip_address").split(
2210 ";"
2211 )[0]
2212
2213 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2214 update_dict[path_item + ".ip-address"] = iface.get(
2215 "ip_address"
2216 ).split(";")[0]
2217
2218 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2219 else:
2220 update_dict = {path_item + ".status": "DELETED"}
2221 self.db.set_one(
2222 table,
2223 q_filter={"_id": _id},
2224 update_dict=update_dict,
2225 unset={path_vim_status: None},
2226 )
2227
2228 def _process_delete_db_tasks(self):
2229 """
2230 Delete task from database because vnfrs or nsrs or both have been deleted
2231 :return: None. Uses and modify self.tasks_to_delete
2232 """
2233 while self.tasks_to_delete:
2234 task = self.tasks_to_delete[0]
2235 vnfrs_deleted = None
2236 nsr_id = task["nsr_id"]
2237
2238 if task["target_record"].startswith("vnfrs:"):
2239 # check if nsrs is present
2240 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2241 vnfrs_deleted = task["target_record"].split(":")[1]
2242
2243 try:
2244 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2245 except Exception as e:
2246 self.logger.error(
2247 "Error deleting task={}: {}".format(task["task_id"], e)
2248 )
2249 self.tasks_to_delete.pop(0)
2250
2251 @staticmethod
2252 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2253 """
2254 Static method because it is called from osm_ng_ro.ns
2255 :param db: instance of database to use
2256 :param nsr_id: affected nsrs id
2257 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2258 :return: None, exception is fails
2259 """
2260 retries = 5
2261 for retry in range(retries):
2262 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2263 now = time.time()
2264 conflict = False
2265
2266 for ro_task in ro_tasks:
2267 db_update = {}
2268 to_delete_ro_task = True
2269
2270 for index, task in enumerate(ro_task["tasks"]):
2271 if not task:
2272 pass
2273 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2274 vnfrs_deleted
2275 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2276 ):
2277 db_update["tasks.{}".format(index)] = None
2278 else:
2279 # used by other nsr, ro_task cannot be deleted
2280 to_delete_ro_task = False
2281
2282 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2283 if to_delete_ro_task:
2284 if not db.del_one(
2285 "ro_tasks",
2286 q_filter={
2287 "_id": ro_task["_id"],
2288 "modified_at": ro_task["modified_at"],
2289 },
2290 fail_on_empty=False,
2291 ):
2292 conflict = True
2293 elif db_update:
2294 db_update["modified_at"] = now
2295 if not db.set_one(
2296 "ro_tasks",
2297 q_filter={
2298 "_id": ro_task["_id"],
2299 "modified_at": ro_task["modified_at"],
2300 },
2301 update_dict=db_update,
2302 fail_on_empty=False,
2303 ):
2304 conflict = True
2305 if not conflict:
2306 return
2307 else:
2308 raise NsWorkerException("Exceeded {} retries".format(retries))
2309
2310 def run(self):
2311 # load database
2312 self.logger.info("Starting")
2313 while True:
2314 # step 1: get commands from queue
2315 try:
2316 if self.vim_targets:
2317 task = self.task_queue.get(block=False)
2318 else:
2319 if not self.idle:
2320 self.logger.debug("enters in idle state")
2321 self.idle = True
2322 task = self.task_queue.get(block=True)
2323 self.idle = False
2324
2325 if task[0] == "terminate":
2326 break
2327 elif task[0] == "load_vim":
2328 self.logger.info("order to load vim {}".format(task[1]))
2329 self._load_vim(task[1])
2330 elif task[0] == "unload_vim":
2331 self.logger.info("order to unload vim {}".format(task[1]))
2332 self._unload_vim(task[1])
2333 elif task[0] == "reload_vim":
2334 self._reload_vim(task[1])
2335 elif task[0] == "check_vim":
2336 self.logger.info("order to check vim {}".format(task[1]))
2337 self._check_vim(task[1])
2338 continue
2339 except Exception as e:
2340 if isinstance(e, queue.Empty):
2341 pass
2342 else:
2343 self.logger.critical(
2344 "Error processing task: {}".format(e), exc_info=True
2345 )
2346
2347 # step 2: process pending_tasks, delete not needed tasks
2348 try:
2349 if self.tasks_to_delete:
2350 self._process_delete_db_tasks()
2351 busy = False
2352 """
2353 # Log RO tasks only when loglevel is DEBUG
2354 if self.logger.getEffectiveLevel() == logging.DEBUG:
2355 _ = self._get_db_all_tasks()
2356 """
2357 ro_task = self._get_db_task()
2358 if ro_task:
2359 self._process_pending_tasks(ro_task)
2360 busy = True
2361 if not busy:
2362 time.sleep(5)
2363 except Exception as e:
2364 self.logger.critical(
2365 "Unexpected exception at run: " + str(e), exc_info=True
2366 )
2367
2368 self.logger.info("Finishing")