106830de3b5902d120c463ea4b7f04e3dad58f0c
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target where the results are stored.
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
def deep_get(target_dict, *args, **kwargs):
    """
    Follow a path of keys through nested dictionaries and return what lies at the end of it.

    Example: with target_dict={a: {b: 5}} the keys (a, b) give 5, whereas the keys (a, b, c)
    and (f, h) cannot be followed and give None (or the supplied default).

    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may contain default=value, returned when the path cannot be followed
    :return: the value found at the end of the path; the default (None if not given) otherwise
    """
    node = target_dict

    for step in args:
        if isinstance(node, dict) and step in node:
            node = node[step]
        else:
            # either a non-dict was reached before the path ended or the key is missing
            return kwargs.get("default")

    return node
64
65
class NsWorkerException(Exception):
    """Error raised by the worker when a task cannot be carried out."""
68
69
class FailingConnector:
    """Stand-in connector on which every public VIM/SDN connector method raises an error."""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exception raised by every public method
        """
        self.error_msg = error_msg

        # VIM methods are installed first, so SDN methods win when a name is shared
        failing_interfaces = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in failing_interfaces:
            for name in dir(connector_class):
                if name.startswith("_"):
                    continue

                setattr(self, name, Mock(side_effect=error_class(error_msg)))
85
86
class NsWorkerExceptionNotFound(NsWorkerException):
    """Worker error for a task whose search criteria are invalid or match nothing at the VIM."""
89
90
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    The methods implemented here do nothing and report success."""

    def __init__(self, db, my_vims, db_vims, logger):
        """Keep the database handle, the connector and VIM-record dictionaries and the logger."""
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing; report the item as being built, with no changes to store."""
        nothing_to_store = {}

        return "BUILD", nothing_to_store

    def refresh(self, ro_task):
        """Skip calling the VIM (e.g. for image or flavor status); only a stored VIM_ERROR fails."""
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if in_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling the VIM to delete the item (e.g. an image) and assume it went fine."""
        nothing_to_store = {}

        return "DONE", nothing_to_store

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing; report the action as done with neither VIM nor task changes."""
        return ("DONE", None, None)
117
118
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create them, poll their status and delete them."""

    def new(self, ro_task, task_index, task_depends):
        """
        Locate an existing network at the VIM or create a new one.

        With 'find_params' the network is searched for, either with the given 'filter_dict' or,
        for a management network, with the id/name configured at the VIM account (falling back
        to the name given in the task). A management network that is not found, is not mapped
        in the VIM account config and has no creation 'params' is created with that name.
        Without 'find_params' the network is created from the task 'params'.

        :param ro_task: ro_task record; 'target_id' selects the VIM connector
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("BUILD", vim_info update) on success; ("FAILED", vim_info update) when the
            search or the VIM connector fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt_net = False
        mgmt_net_in_vim_config = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: first decide which filter is sent to the VIM
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM account configuration takes precedence
                    is_mgmt_net = True
                    db_vim = self.db_vims[ro_task["target_id"]]
                    configured_id = deep_get(db_vim, "config", "management_network_id")
                    configured_name = deep_get(
                        db_vim, "config", "management_network_name"
                    )

                    if configured_id:
                        mgmt_net_in_vim_config = True
                        vim_filter = {"id": configured_id}
                    elif configured_name:
                        mgmt_net_in_vim_config = True
                        vim_filter = {"name": configured_name}
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if not is_mgmt_net or mgmt_net_in_vim_config:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

    def refresh(self, ro_task):
        """
        Call the VIM to get the network status and work out what changed since the last poll.

        :param ro_task: ro_task record whose 'vim_info' holds the last known state
        :return: (task status, dictionary with the vim_info fields that changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            # anything other than ACTIVE or BUILD counts as a failure of the task
            task_status = {"ACTIVE": "DONE", "BUILD": "BUILD"}.get(
                vim_info["status"], "FAILED"
            )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        stored = ro_task["vim_info"]
        status_now = vim_info["status"]
        changes = {}

        if stored["vim_status"] != status_now:
            changes["vim_status"] = status_now

        name_now = vim_info.get("name")
        if stored["vim_name"] != name_now:
            changes["vim_name"] = name_now

        if status_now in ("ERROR", "VIM_ERROR"):
            error_msg = vim_info.get("error_msg")
            if stored["vim_details"] != error_msg:
                changes["vim_details"] = error_msg
        elif status_now == "DELETED":
            changes["vim_id"] = None
            changes["vim_details"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            changes["vim_details"] = vim_info["vim_info"]

        if changes:
            new_status = changes.get("vim_status")
            # details are left out of the log line when the network has just become ACTIVE
            shown_details = "" if new_status == "ACTIVE" else changes.get("vim_details")
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, new_status, shown_details
                )
            )

        return task_status, changes

    def delete(self, ro_task, task_index):
        """
        Delete the network (and the items created along with it) at the VIM.

        :param ro_task: ro_task record; 'vim_info' holds the VIM id and the created items
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) also when the VIM reports the network as not found;
            ("FAILED", vim_info update) on any other VIM connector error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        stored = ro_task["vim_info"]
        net_vim_id = stored["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or stored["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, stored["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id, ro_task["target_id"], net_vim_id, deleted_update["vim_details"]
            )
        )

        return "DONE", deleted_update
335
336
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VM instances (VDUs): create, refresh, delete and inject ssh keys."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM instance at the VIM.

        References of the form "TASK-..." found in the networks, image, flavor and affinity
        groups of the task params are replaced by the VIM ids available in task_depends before
        calling the connector. The stored task params are not modified; a deep copy is used.

        :param ro_task: ro_task record; 'target_id' selects the VIM connector
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: dictionary mapping "TASK-..." references to VIM ids
        :return: ("BUILD", vim_info update) on success; ("FAILED", vim_info update) when a
            dependency is not available or the VIM connector fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # flagged before calling the VIM, so a failed attempt is also reported as created
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            # a missing or empty list simply means that there is nothing to translate
            for affinity_group in params_copy.get("affinity_group_list") or ():
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not "
                            "created or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # reads the "vim_id" of every net_list entry once the connector call has returned
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM instance (and the items created along with it) at the VIM.

        :param ro_task: ro_task record; 'vim_info' holds the VIM id and the created items
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) also when the VIM reports the VM as not found;
            ("FAILED", vim_info update) on any other VIM connector error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call the VIM to get the VM status and work out what changed since the last poll.

        :param ro_task: ro_task record whose 'vim_info' holds the last known state
        :return: (task status, dictionary with the vim_info fields that changed), or
            (None, None) when the ro_task has no VIM id yet
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                # best effort: the name is only informative, so the refresh carries on without it
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # None is stored for an interface the VIM does not report, which keeps the
                # positions aligned with 'interfaces_vim_ids'
                vim_interfaces.append(iface)

        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )

        if vim_interfaces:
            mgmt_vnf_iface = task_create.get("mgmt_vnf_interface")
            # an interface not reported by the VIM is None and cannot be flagged
            if mgmt_vnf_iface is not None and vim_interfaces[mgmt_vnf_iface]:
                vim_interfaces[mgmt_vnf_iface]["mgmt_vnf_interface"] = True

            mgmt_vdu_iface = task_create.get(
                "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
            )
            if vim_interfaces[mgmt_vdu_iface]:
                vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_details")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject an ssh key into the VM through the VIM connector.

        The private key stored in the task params is decrypted with the database helper and
        passed to the connector as 'ro_key'; 'ip_address' is passed as 'ip_addr'.

        :param ro_task: ro_task record; 'target_id' selects the VIM connector
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: (task status, vim_info update or None, task update). On failure the status is
            "BUILD" with an increased 'retries' and a 'next_retry' delay until
            max_retries_inject_ssh_key is reached; after that it is "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_details": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
615
616
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images: they are only looked up at the VIM, never created here."""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for the image at the VIM using the task 'find_params'.

        :param ro_task: ro_task record; 'target_id' selects the VIM connector
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) when exactly one image matches, or when the task has
            no 'find_params' (the VIM id is then None); ("FAILED", vim_info update) when no
            image or several images match, or the VIM connector fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Set up front: a task without 'find_params' would otherwise reach the update
            # dictionary below with the name unbound. None matches what the flavor and
            # affinity group classes report when there is nothing to do.
            vim_image_id = None

            # FIND
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
670
671
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find or create them and delete them."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at the VIM.

        :param ro_task: ro_task record; 'vim_info' holds the VIM id of the flavor
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) also when the VIM reports the flavor as not found;
            ("FAILED", vim_info update) on any other VIM connector error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id, ro_task["target_id"], flavor_vim_id, deleted_update["vim_details"]
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor matching the task 'find_params'; create it from 'params' if not found.

        :param ro_task: ro_task record; 'target_id' selects the VIM connector
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) on success; ("FAILED", vim_info update) when the VIM
            connector fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        created = False
        created_items = {}

        try:
            vim_flavor_id = None
            find_params = task.get("find_params")

            # FIND
            if find_params:
                try:
                    wanted = find_params["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(wanted)
                except vimconn.VimConnNotFoundException:
                    # not at the VIM: it is created below when the task carries 'params'
                    pass

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
763
764
class VimInteractionAffinityGroup(VimInteractionBase):
    """VIM interaction for affinity or anti-affinity groups: create them and delete them."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity or anti-affinity group at the VIM.

        :param ro_task: ro_task record; 'vim_info' holds the VIM id of the group
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) also when the VIM reports the group as not found;
            ("FAILED", vim_info update) on any other VIM connector error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                self.my_vims[ro_task["target_id"]].delete_affinity_group(
                    affinity_group_vim_id
                )
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                deleted_update["vim_details"],
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Create the affinity or anti-affinity group at the VIM when the task carries 'params'.

        :param ro_task: ro_task record; 'target_id' selects the VIM connector
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) on success, with a None VIM id when there are no
            'params'; ("FAILED", vim_info update) when the VIM connector fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        created = False
        created_items = {}

        try:
            affinity_group_vim_id = None

            if task.get("params"):
                group_data = task["params"]["affinity_group_data"]
                affinity_group_vim_id = target_vim.new_affinity_group(group_data)
                created = True

            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group: {}".format(
                    task_id, ro_task["target_id"], e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
850
851
852 class VimInteractionSdnNet(VimInteractionBase):
853 @staticmethod
854 def _match_pci(port_pci, mapping):
855 """
856 Check if port_pci matches with mapping
857 mapping can have brackets to indicate that several chars are accepted. e.g
858 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
859 :param port_pci: text
860 :param mapping: text, can contain brackets to indicate several chars are available
861 :return: True if matches, False otherwise
862 """
863 if not port_pci or not mapping:
864 return False
865 if port_pci == mapping:
866 return True
867
868 mapping_index = 0
869 pci_index = 0
870 while True:
871 bracket_start = mapping.find("[", mapping_index)
872
873 if bracket_start == -1:
874 break
875
876 bracket_end = mapping.find("]", bracket_start)
877 if bracket_end == -1:
878 break
879
880 length = bracket_start - mapping_index
881 if (
882 length
883 and port_pci[pci_index : pci_index + length]
884 != mapping[mapping_index:bracket_start]
885 ):
886 return False
887
888 if (
889 port_pci[pci_index + length]
890 not in mapping[bracket_start + 1 : bracket_end]
891 ):
892 return False
893
894 pci_index += length + 1
895 mapping_index = bracket_end + 1
896
897 if port_pci[pci_index:] != mapping[mapping_index:]:
898 return False
899
900 return True
901
902 def _get_interfaces(self, vlds_to_connect, vim_account_id):
903 """
904 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
905 :param vim_account_id:
906 :return:
907 """
908 interfaces = []
909
910 for vld in vlds_to_connect:
911 table, _, db_id = vld.partition(":")
912 db_id, _, vld = db_id.partition(":")
913 _, _, vld_id = vld.partition(".")
914
915 if table == "vnfrs":
916 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
917 iface_key = "vnf-vld-id"
918 else: # table == "nsrs"
919 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
920 iface_key = "ns-vld-id"
921
922 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
923
924 for db_vnfr in db_vnfrs:
925 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
926 for iface_index, interface in enumerate(vdur["interfaces"]):
927 if interface.get(iface_key) == vld_id and interface.get(
928 "type"
929 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
930 # only SR-IOV o PT
931 interface_ = interface.copy()
932 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
933 db_vnfr["_id"], vdu_index, iface_index
934 )
935
936 if vdur.get("status") == "ERROR":
937 interface_["status"] = "ERROR"
938
939 interfaces.append(interface_)
940
941 return interfaces
942
943 def refresh(self, ro_task):
944 # look for task create
945 task_create_index, _ = next(
946 i_t
947 for i_t in enumerate(ro_task["tasks"])
948 if i_t[1]
949 and i_t[1]["action"] == "CREATE"
950 and i_t[1]["status"] != "FINISHED"
951 )
952
953 return self.new(ro_task, task_create_index, None)
954
955 def new(self, ro_task, task_index, task_depends):
956
957 task = ro_task["tasks"][task_index]
958 task_id = task["task_id"]
959 target_vim = self.my_vims[ro_task["target_id"]]
960
961 sdn_net_id = ro_task["vim_info"]["vim_id"]
962
963 created_items = ro_task["vim_info"].get("created_items")
964 connected_ports = ro_task["vim_info"].get("connected_ports", [])
965 new_connected_ports = []
966 last_update = ro_task["vim_info"].get("last_update", 0)
967 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
968 error_list = []
969 created = ro_task["vim_info"].get("created", False)
970
971 try:
972 # CREATE
973 params = task["params"]
974 vlds_to_connect = params["vlds"]
975 associated_vim = params["target_vim"]
976 # external additional ports
977 additional_ports = params.get("sdn-ports") or ()
978 _, _, vim_account_id = associated_vim.partition(":")
979
980 if associated_vim:
981 # get associated VIM
982 if associated_vim not in self.db_vims:
983 self.db_vims[associated_vim] = self.db.get_one(
984 "vim_accounts", {"_id": vim_account_id}
985 )
986
987 db_vim = self.db_vims[associated_vim]
988
989 # look for ports to connect
990 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
991 # print(ports)
992
993 sdn_ports = []
994 pending_ports = error_ports = 0
995 vlan_used = None
996 sdn_need_update = False
997
998 for port in ports:
999 vlan_used = port.get("vlan") or vlan_used
1000
1001 # TODO. Do not connect if already done
1002 if not port.get("compute_node") or not port.get("pci"):
1003 if port.get("status") == "ERROR":
1004 error_ports += 1
1005 else:
1006 pending_ports += 1
1007 continue
1008
1009 pmap = None
1010 compute_node_mappings = next(
1011 (
1012 c
1013 for c in db_vim["config"].get("sdn-port-mapping", ())
1014 if c and c["compute_node"] == port["compute_node"]
1015 ),
1016 None,
1017 )
1018
1019 if compute_node_mappings:
1020 # process port_mapping pci of type 0000:af:1[01].[1357]
1021 pmap = next(
1022 (
1023 p
1024 for p in compute_node_mappings["ports"]
1025 if self._match_pci(port["pci"], p.get("pci"))
1026 ),
1027 None,
1028 )
1029
1030 if not pmap:
1031 if not db_vim["config"].get("mapping_not_needed"):
1032 error_list.append(
1033 "Port mapping not found for compute_node={} pci={}".format(
1034 port["compute_node"], port["pci"]
1035 )
1036 )
1037 continue
1038
1039 pmap = {}
1040
1041 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1042 new_port = {
1043 "service_endpoint_id": pmap.get("service_endpoint_id")
1044 or service_endpoint_id,
1045 "service_endpoint_encapsulation_type": "dot1q"
1046 if port["type"] == "SR-IOV"
1047 else None,
1048 "service_endpoint_encapsulation_info": {
1049 "vlan": port.get("vlan"),
1050 "mac": port.get("mac-address"),
1051 "device_id": pmap.get("device_id") or port["compute_node"],
1052 "device_interface_id": pmap.get("device_interface_id")
1053 or port["pci"],
1054 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1055 "switch_port": pmap.get("switch_port"),
1056 "service_mapping_info": pmap.get("service_mapping_info"),
1057 },
1058 }
1059
1060 # TODO
1061 # if port["modified_at"] > last_update:
1062 # sdn_need_update = True
1063 new_connected_ports.append(port["id"]) # TODO
1064 sdn_ports.append(new_port)
1065
1066 if error_ports:
1067 error_list.append(
1068 "{} interfaces have not been created as VDU is on ERROR status".format(
1069 error_ports
1070 )
1071 )
1072
1073 # connect external ports
1074 for index, additional_port in enumerate(additional_ports):
1075 additional_port_id = additional_port.get(
1076 "service_endpoint_id"
1077 ) or "external-{}".format(index)
1078 sdn_ports.append(
1079 {
1080 "service_endpoint_id": additional_port_id,
1081 "service_endpoint_encapsulation_type": additional_port.get(
1082 "service_endpoint_encapsulation_type", "dot1q"
1083 ),
1084 "service_endpoint_encapsulation_info": {
1085 "vlan": additional_port.get("vlan") or vlan_used,
1086 "mac": additional_port.get("mac_address"),
1087 "device_id": additional_port.get("device_id"),
1088 "device_interface_id": additional_port.get(
1089 "device_interface_id"
1090 ),
1091 "switch_dpid": additional_port.get("switch_dpid")
1092 or additional_port.get("switch_id"),
1093 "switch_port": additional_port.get("switch_port"),
1094 "service_mapping_info": additional_port.get(
1095 "service_mapping_info"
1096 ),
1097 },
1098 }
1099 )
1100 new_connected_ports.append(additional_port_id)
1101 sdn_info = ""
1102
1103 # if there are more ports to connect or they have been modified, call create/update
1104 if error_list:
1105 sdn_status = "ERROR"
1106 sdn_info = "; ".join(error_list)
1107 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1108 last_update = time.time()
1109
1110 if not sdn_net_id:
1111 if len(sdn_ports) < 2:
1112 sdn_status = "ACTIVE"
1113
1114 if not pending_ports:
1115 self.logger.debug(
1116 "task={} {} new-sdn-net done, less than 2 ports".format(
1117 task_id, ro_task["target_id"]
1118 )
1119 )
1120 else:
1121 net_type = params.get("type") or "ELAN"
1122 (
1123 sdn_net_id,
1124 created_items,
1125 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1126 created = True
1127 self.logger.debug(
1128 "task={} {} new-sdn-net={} created={}".format(
1129 task_id, ro_task["target_id"], sdn_net_id, created
1130 )
1131 )
1132 else:
1133 created_items = target_vim.edit_connectivity_service(
1134 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1135 )
1136 created = True
1137 self.logger.debug(
1138 "task={} {} update-sdn-net={} created={}".format(
1139 task_id, ro_task["target_id"], sdn_net_id, created
1140 )
1141 )
1142
1143 connected_ports = new_connected_ports
1144 elif sdn_net_id:
1145 wim_status_dict = target_vim.get_connectivity_service_status(
1146 sdn_net_id, conn_info=created_items
1147 )
1148 sdn_status = wim_status_dict["sdn_status"]
1149
1150 if wim_status_dict.get("sdn_info"):
1151 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1152
1153 if wim_status_dict.get("error_msg"):
1154 sdn_info = wim_status_dict.get("error_msg") or ""
1155
1156 if pending_ports:
1157 if sdn_status != "ERROR":
1158 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1159 len(ports) - pending_ports, len(ports)
1160 )
1161
1162 if sdn_status == "ACTIVE":
1163 sdn_status = "BUILD"
1164
1165 ro_vim_item_update = {
1166 "vim_id": sdn_net_id,
1167 "vim_status": sdn_status,
1168 "created": created,
1169 "created_items": created_items,
1170 "connected_ports": connected_ports,
1171 "vim_details": sdn_info,
1172 "last_update": last_update,
1173 }
1174
1175 return sdn_status, ro_vim_item_update
1176 except Exception as e:
1177 self.logger.error(
1178 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1179 exc_info=not isinstance(
1180 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1181 ),
1182 )
1183 ro_vim_item_update = {
1184 "vim_status": "VIM_ERROR",
1185 "created": created,
1186 "vim_details": str(e),
1187 }
1188
1189 return "FAILED", ro_vim_item_update
1190
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service referenced by a ro_task, if it has one.

    :param ro_task: ro_task dictionary; "tasks", "vim_info", "target_id" and "_id" are read
    :param task_index: position inside ro_task["tasks"] of the task being processed
    :return: tuple (status, vim_info update). ("DONE", {...}) when nothing had to be deleted,
        the deletion succeeded or the connector reported NOT_FOUND; ("FAILED", {...}) on any
        other error raised by the connector
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    vim_info = ro_task["vim_info"]
    sdn_vim_id = vim_info.get("vim_id")
    deleted_update = {
        "vim_status": "DELETED",
        "created": False,
        "vim_details": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, vim_info.get("created_items")
            )
    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            # the stack trace is only logged for exceptions that are not connector errors
            is_connector_error = isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not is_connector_error,
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        # a NOT_FOUND answer is treated as a successful deletion
        deleted_update["vim_details"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            deleted_update.get("vim_details", ""),
        )
    )

    return "DONE", deleted_update
1241
1242
class NsWorker(threading.Thread):
    # Delays, in seconds, that _process_pending_tasks adds to the current time to compute
    # the next refresh of a ro_task ("vim_info.refresh_at")
    REFRESH_BUILD = 5  # 5 seconds; applied when the new status is BUILD
    REFRESH_ACTIVE = 60  # 1 minute; applied when the new status is DONE
    REFRESH_ERROR = 600  # applied for any other status
    REFRESH_IMAGE = 3600 * 10  # applied to "image" and "flavor" items, whatever the status
    # NOTE(review): no use of REFRESH_DELETE is visible in this part of the file - confirm
    REFRESH_DELETE = 3600 * 10
    # maximum number of entries of the task_queue created at __init__
    QUEUE_SIZE = 100
    # NOTE(review): this attribute is shadowed by the terminate() method defined below in
    # this class, so the False value is never visible - confirm whether it can be removed
    terminate = False
1251
def __init__(self, worker_index, config, plugins, db):
    """
    Create the worker thread. No VIM/SDN/WIM target is loaded at this point.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs.
        config["global"]["task_locked_time"] is also read
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # lock used by del_task; it was never created, so del_task raised AttributeError
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # every interaction class shares the same db, connectors, vim records and logger
    interaction_args = (self.db, self.my_vims, self.db_vims, self.logger)
    self.item2class = {
        "net": VimInteractionNet(*interaction_args),
        "vdu": VimInteractionVdu(*interaction_args),
        "image": VimInteractionImage(*interaction_args),
        "flavor": VimInteractionFlavor(*interaction_args),
        "sdn_net": VimInteractionSdnNet(*interaction_args),
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
            *interaction_args
        ),
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1297
def insert_task(self, task):
    """
    Put a task into the worker queue without blocking.

    :param task: element to enqueue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1304
def terminate(self):
    """
    Enqueue the "exit" marker in the task queue by means of insert_task.

    :return: None
    """
    exit_marker = "exit"
    self.insert_task(exit_marker)
1307
def del_task(self, task):
    """
    Mark a task as SUPERSEDED if it is still in SCHEDULED status.

    :param task: task dictionary; its "status" is read and possibly modified
    :return: True if the task has been superseded, False if it was in another status
    """
    with self.task_lock:
        if task["status"] != "SCHEDULED":  # e.g. task["status"] == "processing"
            # the "with" statement releases the lock; releasing it here as well made the
            # exit of the "with" block raise RuntimeError (release of an unlocked lock)
            return False

        task["status"] = "SUPERSEDED"
        return True
1316
1317 def _process_vim_config(self, target_id, db_vim):
1318 """
1319 Process vim config, creating vim configuration files as ca_cert
1320 :param target_id: vim/sdn/wim + id
1321 :param db_vim: Vim dictionary obtained from database
1322 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1323 """
1324 if not db_vim.get("config"):
1325 return
1326
1327 file_name = ""
1328
1329 try:
1330 if db_vim["config"].get("ca_cert_content"):
1331 file_name = "{}:{}".format(target_id, self.worker_index)
1332
1333 try:
1334 mkdir(file_name)
1335 except FileExistsError:
1336 pass
1337
1338 file_name = file_name + "/ca_cert"
1339
1340 with open(file_name, "w") as f:
1341 f.write(db_vim["config"]["ca_cert_content"])
1342 del db_vim["config"]["ca_cert_content"]
1343 db_vim["config"]["ca_cert"] = file_name
1344 except Exception as e:
1345 raise NsWorkerException(
1346 "Error writing to file '{}': {}".format(file_name, e)
1347 )
1348
1349 def _load_plugin(self, name, type="vim"):
1350 # type can be vim or sdn
1351 if "rovim_dummy" not in self.plugins:
1352 self.plugins["rovim_dummy"] = VimDummyConnector
1353
1354 if "rosdn_dummy" not in self.plugins:
1355 self.plugins["rosdn_dummy"] = SdnDummyConnector
1356
1357 if name in self.plugins:
1358 return self.plugins[name]
1359
1360 try:
1361 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1362 self.plugins[name] = ep.load()
1363 except Exception as e:
1364 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1365
1366 if name and name not in self.plugins:
1367 raise NsWorkerException(
1368 "Plugin 'osm_{n}' has not been installed".format(n=name)
1369 )
1370
1371 return self.plugins[name]
1372
1373 def _unload_vim(self, target_id):
1374 """
1375 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1376 :param target_id: Contains type:_id; where type can be 'vim', ...
1377 :return: None.
1378 """
1379 try:
1380 self.db_vims.pop(target_id, None)
1381 self.my_vims.pop(target_id, None)
1382
1383 if target_id in self.vim_targets:
1384 self.vim_targets.remove(target_id)
1385
1386 self.logger.info("Unloaded {}".format(target_id))
1387 rmtree("{}:{}".format(target_id, self.worker_index))
1388 except FileNotFoundError:
1389 pass # this is raised by rmtree if folder does not exist
1390 except Exception as e:
1391 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1392
1393 def _check_vim(self, target_id):
1394 """
1395 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1396 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1397 :return: None.
1398 """
1399 target, _, _id = target_id.partition(":")
1400 now = time.time()
1401 update_dict = {}
1402 unset_dict = {}
1403 op_text = ""
1404 step = ""
1405 loaded = target_id in self.vim_targets
1406 target_database = (
1407 "vim_accounts"
1408 if target == "vim"
1409 else "wim_accounts"
1410 if target == "wim"
1411 else "sdns"
1412 )
1413
1414 try:
1415 step = "Getting {} from db".format(target_id)
1416 db_vim = self.db.get_one(target_database, {"_id": _id})
1417
1418 for op_index, operation in enumerate(
1419 db_vim["_admin"].get("operations", ())
1420 ):
1421 if operation["operationState"] != "PROCESSING":
1422 continue
1423
1424 locked_at = operation.get("locked_at")
1425
1426 if locked_at is not None and locked_at >= now - self.task_locked_time:
1427 # some other thread is doing this operation
1428 return
1429
1430 # lock
1431 op_text = "_admin.operations.{}.".format(op_index)
1432
1433 if not self.db.set_one(
1434 target_database,
1435 q_filter={
1436 "_id": _id,
1437 op_text + "operationState": "PROCESSING",
1438 op_text + "locked_at": locked_at,
1439 },
1440 update_dict={
1441 op_text + "locked_at": now,
1442 "admin.current_operation": op_index,
1443 },
1444 fail_on_empty=False,
1445 ):
1446 return
1447
1448 unset_dict[op_text + "locked_at"] = None
1449 unset_dict["current_operation"] = None
1450 step = "Loading " + target_id
1451 error_text = self._load_vim(target_id)
1452
1453 if not error_text:
1454 step = "Checking connectivity"
1455
1456 if target == "vim":
1457 self.my_vims[target_id].check_vim_connectivity()
1458 else:
1459 self.my_vims[target_id].check_credentials()
1460
1461 update_dict["_admin.operationalState"] = "ENABLED"
1462 update_dict["_admin.detailed-status"] = ""
1463 unset_dict[op_text + "detailed-status"] = None
1464 update_dict[op_text + "operationState"] = "COMPLETED"
1465
1466 return
1467
1468 except Exception as e:
1469 error_text = "{}: {}".format(step, e)
1470 self.logger.error("{} for {}: {}".format(step, target_id, e))
1471
1472 finally:
1473 if update_dict or unset_dict:
1474 if error_text:
1475 update_dict[op_text + "operationState"] = "FAILED"
1476 update_dict[op_text + "detailed-status"] = error_text
1477 unset_dict.pop(op_text + "detailed-status", None)
1478 update_dict["_admin.operationalState"] = "ERROR"
1479 update_dict["_admin.detailed-status"] = error_text
1480
1481 if op_text:
1482 update_dict[op_text + "statusEnteredTime"] = now
1483
1484 self.db.set_one(
1485 target_database,
1486 q_filter={"_id": _id},
1487 update_dict=update_dict,
1488 unset=unset_dict,
1489 fail_on_empty=False,
1490 )
1491
1492 if not loaded:
1493 self._unload_vim(target_id)
1494
1495 def _reload_vim(self, target_id):
1496 if target_id in self.vim_targets:
1497 self._load_vim(target_id)
1498 else:
1499 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1500 # just remove it to force load again next time it is needed
1501 self.db_vims.pop(target_id, None)
1502
1503 def _load_vim(self, target_id):
1504 """
1505 Load or reload a vim_account, sdn_controller or wim_account.
1506 Read content from database, load the plugin if not loaded.
1507 In case of error loading the plugin, it load a failing VIM_connector
1508 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1509 :param target_id: Contains type:_id; where type can be 'vim', ...
1510 :return: None if ok, descriptive text if error
1511 """
1512 target, _, _id = target_id.partition(":")
1513 target_database = (
1514 "vim_accounts"
1515 if target == "vim"
1516 else "wim_accounts"
1517 if target == "wim"
1518 else "sdns"
1519 )
1520 plugin_name = ""
1521 vim = None
1522
1523 try:
1524 step = "Getting {}={} from db".format(target, _id)
1525 # TODO process for wim, sdnc, ...
1526 vim = self.db.get_one(target_database, {"_id": _id})
1527
1528 # if deep_get(vim, "config", "sdn-controller"):
1529 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1530 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1531
1532 step = "Decrypting password"
1533 schema_version = vim.get("schema_version")
1534 self.db.encrypt_decrypt_fields(
1535 vim,
1536 "decrypt",
1537 fields=("password", "secret"),
1538 schema_version=schema_version,
1539 salt=_id,
1540 )
1541 self._process_vim_config(target_id, vim)
1542
1543 if target == "vim":
1544 plugin_name = "rovim_" + vim["vim_type"]
1545 step = "Loading plugin '{}'".format(plugin_name)
1546 vim_module_conn = self._load_plugin(plugin_name)
1547 step = "Loading {}'".format(target_id)
1548 self.my_vims[target_id] = vim_module_conn(
1549 uuid=vim["_id"],
1550 name=vim["name"],
1551 tenant_id=vim.get("vim_tenant_id"),
1552 tenant_name=vim.get("vim_tenant_name"),
1553 url=vim["vim_url"],
1554 url_admin=None,
1555 user=vim["vim_user"],
1556 passwd=vim["vim_password"],
1557 config=vim.get("config") or {},
1558 persistent_info={},
1559 )
1560 else: # sdn
1561 plugin_name = "rosdn_" + vim["type"]
1562 step = "Loading plugin '{}'".format(plugin_name)
1563 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1564 step = "Loading {}'".format(target_id)
1565 wim = deepcopy(vim)
1566 wim_config = wim.pop("config", {}) or {}
1567 wim["uuid"] = wim["_id"]
1568 wim["wim_url"] = wim["url"]
1569
1570 if wim.get("dpid"):
1571 wim_config["dpid"] = wim.pop("dpid")
1572
1573 if wim.get("switch_id"):
1574 wim_config["switch_id"] = wim.pop("switch_id")
1575
1576 # wim, wim_account, config
1577 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1578 self.db_vims[target_id] = vim
1579 self.error_status = None
1580
1581 self.logger.info(
1582 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1583 )
1584 except Exception as e:
1585 self.logger.error(
1586 "Cannot load {} plugin={}: {} {}".format(
1587 target_id, plugin_name, step, e
1588 )
1589 )
1590
1591 self.db_vims[target_id] = vim or {}
1592 self.db_vims[target_id] = FailingConnector(str(e))
1593 error_status = "{} Error: {}".format(step, e)
1594
1595 return error_status
1596 finally:
1597 if target_id not in self.vim_targets:
1598 self.vim_targets.append(target_id)
1599
1600 def _get_db_task(self):
1601 """
1602 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1603 :return: None
1604 """
1605 now = time.time()
1606
1607 if not self.time_last_task_processed:
1608 self.time_last_task_processed = now
1609
1610 try:
1611 while True:
1612 """
1613 # Log RO tasks only when loglevel is DEBUG
1614 if self.logger.getEffectiveLevel() == logging.DEBUG:
1615 self._log_ro_task(
1616 None,
1617 None,
1618 None,
1619 "TASK_WF",
1620 "task_locked_time="
1621 + str(self.task_locked_time)
1622 + " "
1623 + "time_last_task_processed="
1624 + str(self.time_last_task_processed)
1625 + " "
1626 + "now="
1627 + str(now),
1628 )
1629 """
1630 locked = self.db.set_one(
1631 "ro_tasks",
1632 q_filter={
1633 "target_id": self.vim_targets,
1634 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1635 "locked_at.lt": now - self.task_locked_time,
1636 "to_check_at.lt": self.time_last_task_processed,
1637 },
1638 update_dict={"locked_by": self.my_id, "locked_at": now},
1639 fail_on_empty=False,
1640 )
1641
1642 if locked:
1643 # read and return
1644 ro_task = self.db.get_one(
1645 "ro_tasks",
1646 q_filter={
1647 "target_id": self.vim_targets,
1648 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1649 "locked_at": now,
1650 },
1651 )
1652 return ro_task
1653
1654 if self.time_last_task_processed == now:
1655 self.time_last_task_processed = None
1656 return None
1657 else:
1658 self.time_last_task_processed = now
1659 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1660
1661 except DbException as e:
1662 self.logger.error("Database exception at _get_db_task: {}".format(e))
1663 except Exception as e:
1664 self.logger.critical(
1665 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1666 )
1667
1668 return None
1669
1670 def _get_db_all_tasks(self):
1671 """
1672 Read all content of table ro_tasks to log it
1673 :return: None
1674 """
1675 try:
1676 # Checking the content of the BD:
1677
1678 # read and return
1679 ro_task = self.db.get_list("ro_tasks")
1680 for rt in ro_task:
1681 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1682 return ro_task
1683
1684 except DbException as e:
1685 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1686 except Exception as e:
1687 self.logger.critical(
1688 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1689 )
1690
1691 return None
1692
1693 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1694 """
1695 Generate a log with the following format:
1696
1697 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1698 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1699 task_array_index;task_id;task_action;task_item;task_args
1700
1701 Example:
1702
1703 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1704 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1705 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1706 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1707 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1708 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1709 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1710 """
1711 try:
1712 line = []
1713 i = 0
1714 if ro_task is not None and isinstance(ro_task, dict):
1715 for t in ro_task["tasks"]:
1716 line.clear()
1717 line.append(mark)
1718 line.append(event)
1719 line.append(ro_task.get("_id", ""))
1720 line.append(str(ro_task.get("locked_at", "")))
1721 line.append(str(ro_task.get("modified_at", "")))
1722 line.append(str(ro_task.get("created_at", "")))
1723 line.append(str(ro_task.get("to_check_at", "")))
1724 line.append(str(ro_task.get("locked_by", "")))
1725 line.append(str(ro_task.get("target_id", "")))
1726 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1727 line.append(str(ro_task.get("vim_info", "")))
1728 line.append(str(ro_task.get("tasks", "")))
1729 if isinstance(t, dict):
1730 line.append(str(t.get("status", "")))
1731 line.append(str(t.get("action_id", "")))
1732 line.append(str(i))
1733 line.append(str(t.get("task_id", "")))
1734 line.append(str(t.get("action", "")))
1735 line.append(str(t.get("item", "")))
1736 line.append(str(t.get("find_params", "")))
1737 line.append(str(t.get("params", "")))
1738 else:
1739 line.extend([""] * 2)
1740 line.append(str(i))
1741 line.extend([""] * 5)
1742
1743 i += 1
1744 self.logger.debug(";".join(line))
1745 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1746 i = 0
1747 while True:
1748 st = "tasks.{}.status".format(i)
1749 if st not in db_ro_task_update:
1750 break
1751 line.clear()
1752 line.append(mark)
1753 line.append(event)
1754 line.append(db_ro_task_update.get("_id", ""))
1755 line.append(str(db_ro_task_update.get("locked_at", "")))
1756 line.append(str(db_ro_task_update.get("modified_at", "")))
1757 line.append("")
1758 line.append(str(db_ro_task_update.get("to_check_at", "")))
1759 line.append(str(db_ro_task_update.get("locked_by", "")))
1760 line.append("")
1761 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1762 line.append("")
1763 line.append(str(db_ro_task_update.get("vim_info", "")))
1764 line.append(str(str(db_ro_task_update).count(".status")))
1765 line.append(db_ro_task_update.get(st, ""))
1766 line.append("")
1767 line.append(str(i))
1768 line.extend([""] * 3)
1769 i += 1
1770 self.logger.debug(";".join(line))
1771
1772 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1773 line.clear()
1774 line.append(mark)
1775 line.append(event)
1776 line.append(db_ro_task_delete.get("_id", ""))
1777 line.append("")
1778 line.append(db_ro_task_delete.get("modified_at", ""))
1779 line.extend([""] * 13)
1780 self.logger.debug(";".join(line))
1781
1782 else:
1783 line.clear()
1784 line.append(mark)
1785 line.append(event)
1786 line.extend([""] * 16)
1787 self.logger.debug(";".join(line))
1788
1789 except Exception as e:
1790 self.logger.error("Error logging ro_task: {}".format(e))
1791
1792 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1793 """
1794 Determine if this task need to be done or superseded
1795 :return: None
1796 """
1797 my_task = ro_task["tasks"][task_index]
1798 task_id = my_task["task_id"]
1799 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1800 "created_items", False
1801 )
1802
1803 if my_task["status"] == "FAILED":
1804 return None, None # TODO need to be retry??
1805
1806 try:
1807 for index, task in enumerate(ro_task["tasks"]):
1808 if index == task_index or not task:
1809 continue # own task
1810
1811 if (
1812 my_task["target_record"] == task["target_record"]
1813 and task["action"] == "CREATE"
1814 ):
1815 # set to finished
1816 db_update["tasks.{}.status".format(index)] = task[
1817 "status"
1818 ] = "FINISHED"
1819 elif task["action"] == "CREATE" and task["status"] not in (
1820 "FINISHED",
1821 "SUPERSEDED",
1822 ):
1823 needed_delete = False
1824
1825 if needed_delete:
1826 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1827 else:
1828 return "SUPERSEDED", None
1829 except Exception as e:
1830 if not isinstance(e, NsWorkerException):
1831 self.logger.critical(
1832 "Unexpected exception at _delete_task task={}: {}".format(
1833 task_id, e
1834 ),
1835 exc_info=True,
1836 )
1837
1838 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1839
1840 def _create_task(self, ro_task, task_index, task_depends, db_update):
1841 """
1842 Determine if this task need to create something at VIM
1843 :return: None
1844 """
1845 my_task = ro_task["tasks"][task_index]
1846 task_id = my_task["task_id"]
1847 task_status = None
1848
1849 if my_task["status"] == "FAILED":
1850 return None, None # TODO need to be retry??
1851 elif my_task["status"] == "SCHEDULED":
1852 # check if already created by another task
1853 for index, task in enumerate(ro_task["tasks"]):
1854 if index == task_index or not task:
1855 continue # own task
1856
1857 if task["action"] == "CREATE" and task["status"] not in (
1858 "SCHEDULED",
1859 "FINISHED",
1860 "SUPERSEDED",
1861 ):
1862 return task["status"], "COPY_VIM_INFO"
1863
1864 try:
1865 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1866 ro_task, task_index, task_depends
1867 )
1868 # TODO update other CREATE tasks
1869 except Exception as e:
1870 if not isinstance(e, NsWorkerException):
1871 self.logger.error(
1872 "Error executing task={}: {}".format(task_id, e), exc_info=True
1873 )
1874
1875 task_status = "FAILED"
1876 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1877 # TODO update ro_vim_item_update
1878
1879 return task_status, ro_vim_item_update
1880 else:
1881 return None, None
1882
1883 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1884 """
1885 Look for dependency task
1886 :param task_id: Can be one of
1887 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1888 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1889 3. task.task_id: "<action_id>:number"
1890 :param ro_task:
1891 :param target_id:
1892 :return: database ro_task plus index of task
1893 """
1894 if (
1895 task_id.startswith("vim:")
1896 or task_id.startswith("sdn:")
1897 or task_id.startswith("wim:")
1898 ):
1899 target_id, _, task_id = task_id.partition(" ")
1900
1901 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1902 ro_task_dependency = self.db.get_one(
1903 "ro_tasks",
1904 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1905 fail_on_empty=False,
1906 )
1907
1908 if ro_task_dependency:
1909 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1910 if task["target_record_id"] == task_id:
1911 return ro_task_dependency, task_index
1912
1913 else:
1914 if ro_task:
1915 for task_index, task in enumerate(ro_task["tasks"]):
1916 if task and task["task_id"] == task_id:
1917 return ro_task, task_index
1918
1919 ro_task_dependency = self.db.get_one(
1920 "ro_tasks",
1921 q_filter={
1922 "tasks.ANYINDEX.task_id": task_id,
1923 "tasks.ANYINDEX.target_record.ne": None,
1924 },
1925 fail_on_empty=False,
1926 )
1927
1928 if ro_task_dependency:
1929 for task_index, task in ro_task_dependency["tasks"]:
1930 if task["task_id"] == task_id:
1931 return ro_task_dependency, task_index
1932 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1933
1934 def _process_pending_tasks(self, ro_task):
1935 ro_task_id = ro_task["_id"]
1936 now = time.time()
1937 # one day
1938 next_check_at = now + (24 * 60 * 60)
1939 db_ro_task_update = {}
1940
1941 def _update_refresh(new_status):
1942 # compute next_refresh
1943 nonlocal task
1944 nonlocal next_check_at
1945 nonlocal db_ro_task_update
1946 nonlocal ro_task
1947
1948 next_refresh = time.time()
1949
1950 if task["item"] in ("image", "flavor"):
1951 next_refresh += self.REFRESH_IMAGE
1952 elif new_status == "BUILD":
1953 next_refresh += self.REFRESH_BUILD
1954 elif new_status == "DONE":
1955 next_refresh += self.REFRESH_ACTIVE
1956 else:
1957 next_refresh += self.REFRESH_ERROR
1958
1959 next_check_at = min(next_check_at, next_refresh)
1960 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1961 ro_task["vim_info"]["refresh_at"] = next_refresh
1962
1963 try:
1964 """
1965 # Log RO tasks only when loglevel is DEBUG
1966 if self.logger.getEffectiveLevel() == logging.DEBUG:
1967 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1968 """
1969 # 0: get task_status_create
1970 lock_object = None
1971 task_status_create = None
1972 task_create = next(
1973 (
1974 t
1975 for t in ro_task["tasks"]
1976 if t
1977 and t["action"] == "CREATE"
1978 and t["status"] in ("BUILD", "DONE")
1979 ),
1980 None,
1981 )
1982
1983 if task_create:
1984 task_status_create = task_create["status"]
1985
1986 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1987 for task_action in ("DELETE", "CREATE", "EXEC"):
1988 db_vim_update = None
1989 new_status = None
1990
1991 for task_index, task in enumerate(ro_task["tasks"]):
1992 if not task:
1993 continue # task deleted
1994
1995 task_depends = {}
1996 target_update = None
1997
1998 if (
1999 (
2000 task_action in ("DELETE", "EXEC")
2001 and task["status"] not in ("SCHEDULED", "BUILD")
2002 )
2003 or task["action"] != task_action
2004 or (
2005 task_action == "CREATE"
2006 and task["status"] in ("FINISHED", "SUPERSEDED")
2007 )
2008 ):
2009 continue
2010
2011 task_path = "tasks.{}.status".format(task_index)
2012 try:
2013 db_vim_info_update = None
2014
2015 if task["status"] == "SCHEDULED":
2016 # check if tasks that this depends on have been completed
2017 dependency_not_completed = False
2018
2019 for dependency_task_id in task.get("depends_on") or ():
2020 (
2021 dependency_ro_task,
2022 dependency_task_index,
2023 ) = self._get_dependency(
2024 dependency_task_id, target_id=ro_task["target_id"]
2025 )
2026 dependency_task = dependency_ro_task["tasks"][
2027 dependency_task_index
2028 ]
2029
2030 if dependency_task["status"] == "SCHEDULED":
2031 dependency_not_completed = True
2032 next_check_at = min(
2033 next_check_at, dependency_ro_task["to_check_at"]
2034 )
2035 # must allow dependent task to be processed first
2036 # to do this set time after last_task_processed
2037 next_check_at = max(
2038 self.time_last_task_processed, next_check_at
2039 )
2040 break
2041 elif dependency_task["status"] == "FAILED":
2042 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2043 task["action"],
2044 task["item"],
2045 dependency_task["action"],
2046 dependency_task["item"],
2047 dependency_task_id,
2048 dependency_ro_task["vim_info"].get(
2049 "vim_details"
2050 ),
2051 )
2052 self.logger.error(
2053 "task={} {}".format(task["task_id"], error_text)
2054 )
2055 raise NsWorkerException(error_text)
2056
2057 task_depends[dependency_task_id] = dependency_ro_task[
2058 "vim_info"
2059 ]["vim_id"]
2060 task_depends[
2061 "TASK-{}".format(dependency_task_id)
2062 ] = dependency_ro_task["vim_info"]["vim_id"]
2063
2064 if dependency_not_completed:
2065 # TODO set at vim_info.vim_details that it is waiting
2066 continue
2067
2068 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2069 # the task of renew this locking. It will update database locket_at periodically
2070 if not lock_object:
2071 lock_object = LockRenew.add_lock_object(
2072 "ro_tasks", ro_task, self
2073 )
2074
2075 if task["action"] == "DELETE":
2076 (new_status, db_vim_info_update,) = self._delete_task(
2077 ro_task, task_index, task_depends, db_ro_task_update
2078 )
2079 new_status = (
2080 "FINISHED" if new_status == "DONE" else new_status
2081 )
2082 # ^with FINISHED instead of DONE it will not be refreshing
2083
2084 if new_status in ("FINISHED", "SUPERSEDED"):
2085 target_update = "DELETE"
2086 elif task["action"] == "EXEC":
2087 (
2088 new_status,
2089 db_vim_info_update,
2090 db_task_update,
2091 ) = self.item2class[task["item"]].exec(
2092 ro_task, task_index, task_depends
2093 )
2094 new_status = (
2095 "FINISHED" if new_status == "DONE" else new_status
2096 )
2097 # ^with FINISHED instead of DONE it will not be refreshing
2098
2099 if db_task_update:
2100 # load into database the modified db_task_update "retries" and "next_retry"
2101 if db_task_update.get("retries"):
2102 db_ro_task_update[
2103 "tasks.{}.retries".format(task_index)
2104 ] = db_task_update["retries"]
2105
2106 next_check_at = time.time() + db_task_update.get(
2107 "next_retry", 60
2108 )
2109 target_update = None
2110 elif task["action"] == "CREATE":
2111 if task["status"] == "SCHEDULED":
2112 if task_status_create:
2113 new_status = task_status_create
2114 target_update = "COPY_VIM_INFO"
2115 else:
2116 new_status, db_vim_info_update = self.item2class[
2117 task["item"]
2118 ].new(ro_task, task_index, task_depends)
2119 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2120 _update_refresh(new_status)
2121 else:
2122 if (
2123 ro_task["vim_info"]["refresh_at"]
2124 and now > ro_task["vim_info"]["refresh_at"]
2125 ):
2126 new_status, db_vim_info_update = self.item2class[
2127 task["item"]
2128 ].refresh(ro_task)
2129 _update_refresh(new_status)
2130 else:
2131 # The refresh is updated to avoid set the value of "refresh_at" to
2132 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2133 # because it can happen that in this case the task is never processed
2134 _update_refresh(task["status"])
2135
2136 except Exception as e:
2137 new_status = "FAILED"
2138 db_vim_info_update = {
2139 "vim_status": "VIM_ERROR",
2140 "vim_details": str(e),
2141 }
2142
2143 if not isinstance(
2144 e, (NsWorkerException, vimconn.VimConnException)
2145 ):
2146 self.logger.error(
2147 "Unexpected exception at _delete_task task={}: {}".format(
2148 task["task_id"], e
2149 ),
2150 exc_info=True,
2151 )
2152
2153 try:
2154 if db_vim_info_update:
2155 db_vim_update = db_vim_info_update.copy()
2156 db_ro_task_update.update(
2157 {
2158 "vim_info." + k: v
2159 for k, v in db_vim_info_update.items()
2160 }
2161 )
2162 ro_task["vim_info"].update(db_vim_info_update)
2163
2164 if new_status:
2165 if task_action == "CREATE":
2166 task_status_create = new_status
2167 db_ro_task_update[task_path] = new_status
2168
2169 if target_update or db_vim_update:
2170 if target_update == "DELETE":
2171 self._update_target(task, None)
2172 elif target_update == "COPY_VIM_INFO":
2173 self._update_target(task, ro_task["vim_info"])
2174 else:
2175 self._update_target(task, db_vim_update)
2176
2177 except Exception as e:
2178 if (
2179 isinstance(e, DbException)
2180 and e.http_code == HTTPStatus.NOT_FOUND
2181 ):
2182 # if the vnfrs or nsrs has been removed from database, this task must be removed
2183 self.logger.debug(
2184 "marking to delete task={}".format(task["task_id"])
2185 )
2186 self.tasks_to_delete.append(task)
2187 else:
2188 self.logger.error(
2189 "Unexpected exception at _update_target task={}: {}".format(
2190 task["task_id"], e
2191 ),
2192 exc_info=True,
2193 )
2194
2195 locked_at = ro_task["locked_at"]
2196
2197 if lock_object:
2198 locked_at = [
2199 lock_object["locked_at"],
2200 lock_object["locked_at"] + self.task_locked_time,
2201 ]
2202 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2203 # contain exactly locked_at + self.task_locked_time
2204 LockRenew.remove_lock_object(lock_object)
2205
2206 q_filter = {
2207 "_id": ro_task["_id"],
2208 "to_check_at": ro_task["to_check_at"],
2209 "locked_at": locked_at,
2210 }
2211 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2212 # outside this task (by ro_nbi) do not update it
2213 db_ro_task_update["locked_by"] = None
2214 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2215 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2216 db_ro_task_update["modified_at"] = now
2217 db_ro_task_update["to_check_at"] = next_check_at
2218
2219 """
2220 # Log RO tasks only when loglevel is DEBUG
2221 if self.logger.getEffectiveLevel() == logging.DEBUG:
2222 db_ro_task_update_log = db_ro_task_update.copy()
2223 db_ro_task_update_log["_id"] = q_filter["_id"]
2224 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2225 """
2226
2227 if not self.db.set_one(
2228 "ro_tasks",
2229 update_dict=db_ro_task_update,
2230 q_filter=q_filter,
2231 fail_on_empty=False,
2232 ):
2233 del db_ro_task_update["to_check_at"]
2234 del q_filter["to_check_at"]
2235 """
2236 # Log RO tasks only when loglevel is DEBUG
2237 if self.logger.getEffectiveLevel() == logging.DEBUG:
2238 self._log_ro_task(
2239 None,
2240 db_ro_task_update_log,
2241 None,
2242 "TASK_WF",
2243 "SET_TASK " + str(q_filter),
2244 )
2245 """
2246 self.db.set_one(
2247 "ro_tasks",
2248 q_filter=q_filter,
2249 update_dict=db_ro_task_update,
2250 fail_on_empty=True,
2251 )
2252 except DbException as e:
2253 self.logger.error(
2254 "ro_task={} Error updating database {}".format(ro_task_id, e)
2255 )
2256 except Exception as e:
2257 self.logger.error(
2258 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2259 )
2260
def _update_target(self, task, ro_vim_item_update):
    """
    Copy the vim information of a task into the database record referenced by the task.

    task["target_record"] is read as "<table>:<_id>:<path_vim_status>":
      - path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
      - path_item: dot separated list targeting record information, e.g. "vdur.10". It is obtained by
        removing the last two dot separated components of path_vim_status
    :param task: task dictionary. Only its "target_record" entry is used here
    :param ro_vim_item_update: dictionary with the vim information to store. When it is empty or None the
        item is marked with status "DELETED" and its vim information is unset
    :return: None. Exceptions raised by self.db.set_one are not captured here
    """
    table, _, remainder = task["target_record"].partition(":")
    _id, _, path_vim_status = remainder.partition(":")

    # drop the two trailing components of the vim information path to get the item path
    path_item = path_vim_status
    for _ in range(2):
        path_item = path_item[: path_item.rfind(".")]

    if not ro_vim_item_update:
        # nothing to copy: flag the item as deleted and remove its vim information
        self.db.set_one(
            table,
            q_filter={"_id": _id},
            update_dict={path_item + ".status": "DELETED"},
            unset={path_vim_status: None},
        )
        return

    # only these entries of the vim information are stored at the target record
    copied_keys = ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
    update_dict = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            update_dict[path_vim_status + "." + key] = value

    if path_vim_status.startswith("vdur."):
        # for backward compatibility, vdur.name and vdur.vim-id are filled apart from vdur.vim_name and
        # vdur.vim_id; the general vdur.status is updated too
        for vim_key, item_suffix in (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        ):
            vim_value = ro_vim_item_update.get(vim_key)
            if vim_value:
                update_dict[path_item + item_suffix] = vim_value

    for index, iface in enumerate(ro_vim_item_update.get("interfaces") or ()):
        if not iface:
            continue

        iface_prefix = "{}.interfaces.{}.".format(path_item, index)
        for key in ("vlan", "compute_node", "pci"):
            if key in iface:
                update_dict[iface_prefix + key] = iface[key]

        # put ip_address and mac_address with ip-address and mac-address
        ip_address = iface.get("ip_address")
        if ip_address:
            update_dict[iface_prefix + "ip-address"] = ip_address

            # management interfaces also provide the ip-address of the whole record and of the item;
            # only the first of the ";" separated addresses is used
            if iface.get("mgmt_vnf_interface"):
                update_dict["ip-address"] = ip_address.split(";")[0]

            if iface.get("mgmt_vdu_interface"):
                update_dict[path_item + ".ip-address"] = ip_address.split(";")[0]

        mac_address = iface.get("mac_address")
        if mac_address:
            update_dict[iface_prefix + "mac-address"] = mac_address

    self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2335
def _process_delete_db_tasks(self):
    """
    Delete task from database because vnfrs or nsrs or both have been deleted

    Every task queued at self.tasks_to_delete is handled in order:
      - if its target record is a vnfr and the nsr is still present at database, only the tasks of that
        vnfr are deleted
      - otherwise all the tasks of the nsr are deleted
    Any error handling one task, including a failure of the "nsrs" lookup, is logged and the task is
    dropped from the list. In this way a failing entry cannot stay at the head of the list, where it
    would be retried without pause every time this method is called again.
    :return: None. Uses and modify self.tasks_to_delete
    """
    while self.tasks_to_delete:
        task = self.tasks_to_delete[0]

        try:
            nsr_id = task["nsr_id"]
            vnfrs_deleted = None

            # when the nsr is still present only the vnfr has been removed, so restrict the deletion to
            # the tasks of that vnfr (second field of "vnfrs:<vnfr id>:<path>")
            if task["target_record"].startswith("vnfrs:") and self.db.get_one(
                "nsrs", {"_id": nsr_id}, fail_on_empty=False
            ):
                vnfrs_deleted = task["target_record"].split(":")[1]

            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            self.logger.error(
                "Error deleting task={}: {}".format(task["task_id"], e)
            )

        # processed or failed, the task is not retried
        self.tasks_to_delete.pop(0)
2358
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from table "ro_tasks" the tasks of a deleted nsr or vnfr.
    Static method because it is called from osm_ng_ro.ns

    The ro_tasks containing some task of nsr_id are read. A ro_task where all the non empty tasks are
    affected is deleted; otherwise only the affected tasks are set to None. Both operations are filtered
    by the "modified_at" that was read, so they do nothing if the ro_task has changed meanwhile; in
    that case the whole procedure is repeated, up to 5 times.
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None
    :raises NsWorkerException: when the retries are exceeded
    """
    retries = 5

    def is_affected(task):
        # with a vnfr id, only the tasks targeting that vnfr; without it, every task of the nsr
        if vnfrs_deleted:
            return task["target_record"].startswith("vnfrs:" + vnfrs_deleted)

        return task["nsr_id"] == nsr_id

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            tasks_to_clean = {}
            used_by_others = False

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue

                if is_affected(task):
                    tasks_to_clean["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    used_by_others = True

            # delete or update only if nobody has changed ro_task meanwhile; "modified_at" is used to
            # know if it has changed
            if not used_by_others:
                done = db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                )
            elif tasks_to_clean:
                tasks_to_clean["modified_at"] = now
                done = db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=tasks_to_clean,
                    fail_on_empty=False,
                )
            else:
                # nothing of this ro_task is affected
                continue

            if not done:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2417
def run(self):
    """
    Main loop of the thread. Each iteration performs two steps:
      1. Get one command from self.task_queue. If self.vim_targets is not empty the queue is read
         without blocking; otherwise the thread enters in idle state and blocks until a command
         arrives. Command "terminate" ends the loop; "load_vim", "unload_vim", "reload_vim" and
         "check_vim" are dispatched with task[1] as argument; any other command is ignored. After a
         command the loop restarts, so step 2 is only reached when the queue is empty or the command
         has failed.
      2. Delete the tasks queued at self.tasks_to_delete and process one pending ro_task obtained from
         self._get_db_task(). If there is none, it sleeps 5 seconds.
    Exceptions of both steps are logged and the loop continues.
    :return: None
    """
    # (command, name of the method that handles it, message logged before handling it or None)
    vim_commands = (
        ("load_vim", "_load_vim", "order to load vim {}"),
        ("unload_vim", "_unload_vim", "order to unload vim {}"),
        ("reload_vim", "_reload_vim", None),
        ("check_vim", "_check_vim", "order to check vim {}"),
    )

    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                # there is work to poll at database, so do not wait for commands
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            if task[0] == "terminate":
                break

            for command, handler_name, message in vim_commands:
                if task[0] == command:
                    if message:
                        self.logger.info(message.format(task[1]))

                    getattr(self, handler_name)(task[1])
                    break

            # attend all the queued commands before processing tasks
            continue
        except queue.Empty:
            # no commands pending, go on with step 2
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled: log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before looking for tasks again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")