Feature 10906: Support for Anti-Affinity groups
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 }
215 self.logger.debug(
216 "task={} {} new-net={} created={}".format(
217 task_id, ro_task["target_id"], vim_net_id, created
218 )
219 )
220
221 return "BUILD", ro_vim_item_update
222 except (vimconn.VimConnException, NsWorkerException) as e:
223 self.logger.error(
224 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
225 )
226 ro_vim_item_update = {
227 "vim_status": "VIM_ERROR",
228 "created": created,
229 "vim_details": str(e),
230 }
231
232 return "FAILED", ro_vim_item_update
233
234 def refresh(self, ro_task):
235 """Call VIM to get network status"""
236 ro_task_id = ro_task["_id"]
237 target_vim = self.my_vims[ro_task["target_id"]]
238 vim_id = ro_task["vim_info"]["vim_id"]
239 net_to_refresh_list = [vim_id]
240
241 try:
242 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
243 vim_info = vim_dict[vim_id]
244
245 if vim_info["status"] == "ACTIVE":
246 task_status = "DONE"
247 elif vim_info["status"] == "BUILD":
248 task_status = "BUILD"
249 else:
250 task_status = "FAILED"
251 except vimconn.VimConnException as e:
252 # Mark all tasks at VIM_ERROR status
253 self.logger.error(
254 "ro_task={} vim={} get-net={}: {}".format(
255 ro_task_id, ro_task["target_id"], vim_id, e
256 )
257 )
258 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
259 task_status = "FAILED"
260
261 ro_vim_item_update = {}
262 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
263 ro_vim_item_update["vim_status"] = vim_info["status"]
264
265 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
266 ro_vim_item_update["vim_name"] = vim_info.get("name")
267
268 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
269 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
270 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
271 elif vim_info["status"] == "DELETED":
272 ro_vim_item_update["vim_id"] = None
273 ro_vim_item_update["vim_details"] = "Deleted externally"
274 else:
275 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
276 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
277
278 if ro_vim_item_update:
279 self.logger.debug(
280 "ro_task={} {} get-net={}: status={} {}".format(
281 ro_task_id,
282 ro_task["target_id"],
283 vim_id,
284 ro_vim_item_update.get("vim_status"),
285 ro_vim_item_update.get("vim_details")
286 if ro_vim_item_update.get("vim_status") != "ACTIVE"
287 else "",
288 )
289 )
290
291 return task_status, ro_vim_item_update
292
293 def delete(self, ro_task, task_index):
294 task = ro_task["tasks"][task_index]
295 task_id = task["task_id"]
296 net_vim_id = ro_task["vim_info"]["vim_id"]
297 ro_vim_item_update_ok = {
298 "vim_status": "DELETED",
299 "created": False,
300 "vim_details": "DELETED",
301 "vim_id": None,
302 }
303
304 try:
305 if net_vim_id or ro_task["vim_info"]["created_items"]:
306 target_vim = self.my_vims[ro_task["target_id"]]
307 target_vim.delete_network(
308 net_vim_id, ro_task["vim_info"]["created_items"]
309 )
310 except vimconn.VimConnNotFoundException:
311 ro_vim_item_update_ok["vim_details"] = "already deleted"
312 except vimconn.VimConnException as e:
313 self.logger.error(
314 "ro_task={} vim={} del-net={}: {}".format(
315 ro_task["_id"], ro_task["target_id"], net_vim_id, e
316 )
317 )
318 ro_vim_item_update = {
319 "vim_status": "VIM_ERROR",
320 "vim_details": "Error while deleting: {}".format(e),
321 }
322
323 return "FAILED", ro_vim_item_update
324
325 self.logger.debug(
326 "task={} {} del-net={} {}".format(
327 task_id,
328 ro_task["target_id"],
329 net_vim_id,
330 ro_vim_item_update_ok.get("vim_details", ""),
331 )
332 )
333
334 return "DONE", ro_vim_item_update_ok
335
336
337 class VimInteractionVdu(VimInteractionBase):
338 max_retries_inject_ssh_key = 20 # 20 times
339 time_retries_inject_ssh_key = 30 # wevery 30 seconds
340
341 def new(self, ro_task, task_index, task_depends):
342 task = ro_task["tasks"][task_index]
343 task_id = task["task_id"]
344 created = False
345 created_items = {}
346 target_vim = self.my_vims[ro_task["target_id"]]
347
348 try:
349 created = True
350 params = task["params"]
351 params_copy = deepcopy(params)
352 net_list = params_copy["net_list"]
353
354 for net in net_list:
355 # change task_id into network_id
356 if "net_id" in net and net["net_id"].startswith("TASK-"):
357 network_id = task_depends[net["net_id"]]
358
359 if not network_id:
360 raise NsWorkerException(
361 "Cannot create VM because depends on a network not created or found "
362 "for {}".format(net["net_id"])
363 )
364
365 net["net_id"] = network_id
366
367 if params_copy["image_id"].startswith("TASK-"):
368 params_copy["image_id"] = task_depends[params_copy["image_id"]]
369
370 if params_copy["flavor_id"].startswith("TASK-"):
371 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
372
373 affinity_group_list = params_copy["affinity_group_list"]
374 for affinity_group in affinity_group_list:
375 # change task_id into affinity_group_id
376 if "affinity_group_id" in affinity_group and affinity_group[
377 "affinity_group_id"
378 ].startswith("TASK-"):
379 affinity_group_id = task_depends[
380 affinity_group["affinity_group_id"]
381 ]
382
383 if not affinity_group_id:
384 raise NsWorkerException(
385 "found for {}".format(affinity_group["affinity_group_id"])
386 )
387
388 affinity_group["affinity_group_id"] = affinity_group_id
389
390 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
391 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
392
393 ro_vim_item_update = {
394 "vim_id": vim_vm_id,
395 "vim_status": "BUILD",
396 "created": created,
397 "created_items": created_items,
398 "vim_details": None,
399 "interfaces_vim_ids": interfaces,
400 "interfaces": [],
401 }
402 self.logger.debug(
403 "task={} {} new-vm={} created={}".format(
404 task_id, ro_task["target_id"], vim_vm_id, created
405 )
406 )
407
408 return "BUILD", ro_vim_item_update
409 except (vimconn.VimConnException, NsWorkerException) as e:
410 self.logger.error(
411 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
412 )
413 ro_vim_item_update = {
414 "vim_status": "VIM_ERROR",
415 "created": created,
416 "vim_details": str(e),
417 }
418
419 return "FAILED", ro_vim_item_update
420
421 def delete(self, ro_task, task_index):
422 task = ro_task["tasks"][task_index]
423 task_id = task["task_id"]
424 vm_vim_id = ro_task["vim_info"]["vim_id"]
425 ro_vim_item_update_ok = {
426 "vim_status": "DELETED",
427 "created": False,
428 "vim_details": "DELETED",
429 "vim_id": None,
430 }
431
432 try:
433 if vm_vim_id or ro_task["vim_info"]["created_items"]:
434 target_vim = self.my_vims[ro_task["target_id"]]
435 target_vim.delete_vminstance(
436 vm_vim_id, ro_task["vim_info"]["created_items"]
437 )
438 except vimconn.VimConnNotFoundException:
439 ro_vim_item_update_ok["vim_details"] = "already deleted"
440 except vimconn.VimConnException as e:
441 self.logger.error(
442 "ro_task={} vim={} del-vm={}: {}".format(
443 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
444 )
445 )
446 ro_vim_item_update = {
447 "vim_status": "VIM_ERROR",
448 "vim_details": "Error while deleting: {}".format(e),
449 }
450
451 return "FAILED", ro_vim_item_update
452
453 self.logger.debug(
454 "task={} {} del-vm={} {}".format(
455 task_id,
456 ro_task["target_id"],
457 vm_vim_id,
458 ro_vim_item_update_ok.get("vim_details", ""),
459 )
460 )
461
462 return "DONE", ro_vim_item_update_ok
463
464 def refresh(self, ro_task):
465 """Call VIM to get vm status"""
466 ro_task_id = ro_task["_id"]
467 target_vim = self.my_vims[ro_task["target_id"]]
468 vim_id = ro_task["vim_info"]["vim_id"]
469
470 if not vim_id:
471 return None, None
472
473 vm_to_refresh_list = [vim_id]
474 try:
475 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
476 vim_info = vim_dict[vim_id]
477
478 if vim_info["status"] == "ACTIVE":
479 task_status = "DONE"
480 elif vim_info["status"] == "BUILD":
481 task_status = "BUILD"
482 else:
483 task_status = "FAILED"
484
485 # try to load and parse vim_information
486 try:
487 vim_info_info = yaml.safe_load(vim_info["vim_info"])
488 if vim_info_info.get("name"):
489 vim_info["name"] = vim_info_info["name"]
490 except Exception:
491 pass
492 except vimconn.VimConnException as e:
493 # Mark all tasks at VIM_ERROR status
494 self.logger.error(
495 "ro_task={} vim={} get-vm={}: {}".format(
496 ro_task_id, ro_task["target_id"], vim_id, e
497 )
498 )
499 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
500 task_status = "FAILED"
501
502 ro_vim_item_update = {}
503
504 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
505 vim_interfaces = []
506 if vim_info.get("interfaces"):
507 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
508 iface = next(
509 (
510 iface
511 for iface in vim_info["interfaces"]
512 if vim_iface_id == iface["vim_interface_id"]
513 ),
514 None,
515 )
516 # if iface:
517 # iface.pop("vim_info", None)
518 vim_interfaces.append(iface)
519
520 task_create = next(
521 t
522 for t in ro_task["tasks"]
523 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
524 )
525 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
526 vim_interfaces[task_create["mgmt_vnf_interface"]][
527 "mgmt_vnf_interface"
528 ] = True
529
530 mgmt_vdu_iface = task_create.get(
531 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
532 )
533 if vim_interfaces:
534 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
535
536 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
537 ro_vim_item_update["interfaces"] = vim_interfaces
538
539 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
540 ro_vim_item_update["vim_status"] = vim_info["status"]
541
542 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
543 ro_vim_item_update["vim_name"] = vim_info.get("name")
544
545 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
546 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
547 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
548 elif vim_info["status"] == "DELETED":
549 ro_vim_item_update["vim_id"] = None
550 ro_vim_item_update["vim_details"] = "Deleted externally"
551 else:
552 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
553 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
554
555 if ro_vim_item_update:
556 self.logger.debug(
557 "ro_task={} {} get-vm={}: status={} {}".format(
558 ro_task_id,
559 ro_task["target_id"],
560 vim_id,
561 ro_vim_item_update.get("vim_status"),
562 ro_vim_item_update.get("vim_details")
563 if ro_vim_item_update.get("vim_status") != "ACTIVE"
564 else "",
565 )
566 )
567
568 return task_status, ro_vim_item_update
569
570 def exec(self, ro_task, task_index, task_depends):
571 task = ro_task["tasks"][task_index]
572 task_id = task["task_id"]
573 target_vim = self.my_vims[ro_task["target_id"]]
574 db_task_update = {"retries": 0}
575 retries = task.get("retries", 0)
576
577 try:
578 params = task["params"]
579 params_copy = deepcopy(params)
580 params_copy["ro_key"] = self.db.decrypt(
581 params_copy.pop("private_key"),
582 params_copy.pop("schema_version"),
583 params_copy.pop("salt"),
584 )
585 params_copy["ip_addr"] = params_copy.pop("ip_address")
586 target_vim.inject_user_key(**params_copy)
587 self.logger.debug(
588 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
589 )
590
591 return (
592 "DONE",
593 None,
594 db_task_update,
595 ) # params_copy["key"]
596 except (vimconn.VimConnException, NsWorkerException) as e:
597 retries += 1
598
599 if retries < self.max_retries_inject_ssh_key:
600 return (
601 "BUILD",
602 None,
603 {
604 "retries": retries,
605 "next_retry": self.time_retries_inject_ssh_key,
606 },
607 )
608
609 self.logger.error(
610 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
611 )
612 ro_vim_item_update = {"vim_details": str(e)}
613
614 return "FAILED", ro_vim_item_update, db_task_update
615
616
617 class VimInteractionImage(VimInteractionBase):
618 def new(self, ro_task, task_index, task_depends):
619 task = ro_task["tasks"][task_index]
620 task_id = task["task_id"]
621 created = False
622 created_items = {}
623 target_vim = self.my_vims[ro_task["target_id"]]
624
625 try:
626 # FIND
627 if task.get("find_params"):
628 vim_images = target_vim.get_image_list(**task["find_params"])
629
630 if not vim_images:
631 raise NsWorkerExceptionNotFound(
632 "Image not found with this criteria: '{}'".format(
633 task["find_params"]
634 )
635 )
636 elif len(vim_images) > 1:
637 raise NsWorkerException(
638 "More than one image found with this criteria: '{}'".format(
639 task["find_params"]
640 )
641 )
642 else:
643 vim_image_id = vim_images[0]["id"]
644
645 ro_vim_item_update = {
646 "vim_id": vim_image_id,
647 "vim_status": "DONE",
648 "created": created,
649 "created_items": created_items,
650 "vim_details": None,
651 }
652 self.logger.debug(
653 "task={} {} new-image={} created={}".format(
654 task_id, ro_task["target_id"], vim_image_id, created
655 )
656 )
657
658 return "DONE", ro_vim_item_update
659 except (NsWorkerException, vimconn.VimConnException) as e:
660 self.logger.error(
661 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
662 )
663 ro_vim_item_update = {
664 "vim_status": "VIM_ERROR",
665 "created": created,
666 "vim_details": str(e),
667 }
668
669 return "FAILED", ro_vim_item_update
670
671
672 class VimInteractionFlavor(VimInteractionBase):
673 def delete(self, ro_task, task_index):
674 task = ro_task["tasks"][task_index]
675 task_id = task["task_id"]
676 flavor_vim_id = ro_task["vim_info"]["vim_id"]
677 ro_vim_item_update_ok = {
678 "vim_status": "DELETED",
679 "created": False,
680 "vim_details": "DELETED",
681 "vim_id": None,
682 }
683
684 try:
685 if flavor_vim_id:
686 target_vim = self.my_vims[ro_task["target_id"]]
687 target_vim.delete_flavor(flavor_vim_id)
688 except vimconn.VimConnNotFoundException:
689 ro_vim_item_update_ok["vim_details"] = "already deleted"
690 except vimconn.VimConnException as e:
691 self.logger.error(
692 "ro_task={} vim={} del-flavor={}: {}".format(
693 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
694 )
695 )
696 ro_vim_item_update = {
697 "vim_status": "VIM_ERROR",
698 "vim_details": "Error while deleting: {}".format(e),
699 }
700
701 return "FAILED", ro_vim_item_update
702
703 self.logger.debug(
704 "task={} {} del-flavor={} {}".format(
705 task_id,
706 ro_task["target_id"],
707 flavor_vim_id,
708 ro_vim_item_update_ok.get("vim_details", ""),
709 )
710 )
711
712 return "DONE", ro_vim_item_update_ok
713
714 def new(self, ro_task, task_index, task_depends):
715 task = ro_task["tasks"][task_index]
716 task_id = task["task_id"]
717 created = False
718 created_items = {}
719 target_vim = self.my_vims[ro_task["target_id"]]
720
721 try:
722 # FIND
723 vim_flavor_id = None
724
725 if task.get("find_params"):
726 try:
727 flavor_data = task["find_params"]["flavor_data"]
728 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
729 except vimconn.VimConnNotFoundException:
730 pass
731
732 if not vim_flavor_id and task.get("params"):
733 # CREATE
734 flavor_data = task["params"]["flavor_data"]
735 vim_flavor_id = target_vim.new_flavor(flavor_data)
736 created = True
737
738 ro_vim_item_update = {
739 "vim_id": vim_flavor_id,
740 "vim_status": "DONE",
741 "created": created,
742 "created_items": created_items,
743 "vim_details": None,
744 }
745 self.logger.debug(
746 "task={} {} new-flavor={} created={}".format(
747 task_id, ro_task["target_id"], vim_flavor_id, created
748 )
749 )
750
751 return "DONE", ro_vim_item_update
752 except (vimconn.VimConnException, NsWorkerException) as e:
753 self.logger.error(
754 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
755 )
756 ro_vim_item_update = {
757 "vim_status": "VIM_ERROR",
758 "created": created,
759 "vim_details": str(e),
760 }
761
762 return "FAILED", ro_vim_item_update
763
764
765 class VimInteractionAffinityGroup(VimInteractionBase):
766 def delete(self, ro_task, task_index):
767 task = ro_task["tasks"][task_index]
768 task_id = task["task_id"]
769 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
770 ro_vim_item_update_ok = {
771 "vim_status": "DELETED",
772 "created": False,
773 "vim_details": "DELETED",
774 "vim_id": None,
775 }
776
777 try:
778 if affinity_group_vim_id:
779 target_vim = self.my_vims[ro_task["target_id"]]
780 target_vim.delete_affinity_group(affinity_group_vim_id)
781 except vimconn.VimConnNotFoundException:
782 ro_vim_item_update_ok["vim_details"] = "already deleted"
783 except vimconn.VimConnException as e:
784 self.logger.error(
785 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
786 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
787 )
788 )
789 ro_vim_item_update = {
790 "vim_status": "VIM_ERROR",
791 "vim_details": "Error while deleting: {}".format(e),
792 }
793
794 return "FAILED", ro_vim_item_update
795
796 self.logger.debug(
797 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
798 task_id,
799 ro_task["target_id"],
800 affinity_group_vim_id,
801 ro_vim_item_update_ok.get("vim_details", ""),
802 )
803 )
804
805 return "DONE", ro_vim_item_update_ok
806
807 def new(self, ro_task, task_index, task_depends):
808 task = ro_task["tasks"][task_index]
809 task_id = task["task_id"]
810 created = False
811 created_items = {}
812 target_vim = self.my_vims[ro_task["target_id"]]
813
814 try:
815 affinity_group_vim_id = None
816
817 if task.get("params"):
818 affinity_group_data = task["params"]["affinity_group_data"]
819 affinity_group_vim_id = target_vim.new_affinity_group(
820 affinity_group_data
821 )
822 created = True
823
824 ro_vim_item_update = {
825 "vim_id": affinity_group_vim_id,
826 "vim_status": "DONE",
827 "created": created,
828 "created_items": created_items,
829 "vim_details": None,
830 }
831 self.logger.debug(
832 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
833 task_id, ro_task["target_id"], affinity_group_vim_id, created
834 )
835 )
836
837 return "DONE", ro_vim_item_update
838 except (vimconn.VimConnException, NsWorkerException) as e:
839 self.logger.error(
840 "task={} vim={} new-affinity-or-anti-affinity-group:"
841 " {}".format(task_id, ro_task["target_id"], e)
842 )
843 ro_vim_item_update = {
844 "vim_status": "VIM_ERROR",
845 "created": created,
846 "vim_details": str(e),
847 }
848
849 return "FAILED", ro_vim_item_update
850
851
852 class VimInteractionSdnNet(VimInteractionBase):
853 @staticmethod
854 def _match_pci(port_pci, mapping):
855 """
856 Check if port_pci matches with mapping
857 mapping can have brackets to indicate that several chars are accepted. e.g
858 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
859 :param port_pci: text
860 :param mapping: text, can contain brackets to indicate several chars are available
861 :return: True if matches, False otherwise
862 """
863 if not port_pci or not mapping:
864 return False
865 if port_pci == mapping:
866 return True
867
868 mapping_index = 0
869 pci_index = 0
870 while True:
871 bracket_start = mapping.find("[", mapping_index)
872
873 if bracket_start == -1:
874 break
875
876 bracket_end = mapping.find("]", bracket_start)
877 if bracket_end == -1:
878 break
879
880 length = bracket_start - mapping_index
881 if (
882 length
883 and port_pci[pci_index : pci_index + length]
884 != mapping[mapping_index:bracket_start]
885 ):
886 return False
887
888 if (
889 port_pci[pci_index + length]
890 not in mapping[bracket_start + 1 : bracket_end]
891 ):
892 return False
893
894 pci_index += length + 1
895 mapping_index = bracket_end + 1
896
897 if port_pci[pci_index:] != mapping[mapping_index:]:
898 return False
899
900 return True
901
902 def _get_interfaces(self, vlds_to_connect, vim_account_id):
903 """
904 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
905 :param vim_account_id:
906 :return:
907 """
908 interfaces = []
909
910 for vld in vlds_to_connect:
911 table, _, db_id = vld.partition(":")
912 db_id, _, vld = db_id.partition(":")
913 _, _, vld_id = vld.partition(".")
914
915 if table == "vnfrs":
916 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
917 iface_key = "vnf-vld-id"
918 else: # table == "nsrs"
919 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
920 iface_key = "ns-vld-id"
921
922 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
923
924 for db_vnfr in db_vnfrs:
925 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
926 for iface_index, interface in enumerate(vdur["interfaces"]):
927 if interface.get(iface_key) == vld_id and interface.get(
928 "type"
929 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
930 # only SR-IOV o PT
931 interface_ = interface.copy()
932 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
933 db_vnfr["_id"], vdu_index, iface_index
934 )
935
936 if vdur.get("status") == "ERROR":
937 interface_["status"] = "ERROR"
938
939 interfaces.append(interface_)
940
941 return interfaces
942
943 def refresh(self, ro_task):
944 # look for task create
945 task_create_index, _ = next(
946 i_t
947 for i_t in enumerate(ro_task["tasks"])
948 if i_t[1]
949 and i_t[1]["action"] == "CREATE"
950 and i_t[1]["status"] != "FINISHED"
951 )
952
953 return self.new(ro_task, task_create_index, None)
954
955 def new(self, ro_task, task_index, task_depends):
956
957 task = ro_task["tasks"][task_index]
958 task_id = task["task_id"]
959 target_vim = self.my_vims[ro_task["target_id"]]
960
961 sdn_net_id = ro_task["vim_info"]["vim_id"]
962
963 created_items = ro_task["vim_info"].get("created_items")
964 connected_ports = ro_task["vim_info"].get("connected_ports", [])
965 new_connected_ports = []
966 last_update = ro_task["vim_info"].get("last_update", 0)
967 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
968 error_list = []
969 created = ro_task["vim_info"].get("created", False)
970
971 try:
972 # CREATE
973 params = task["params"]
974 vlds_to_connect = params.get("vlds", [])
975 associated_vim = params.get("target_vim")
976 # external additional ports
977 additional_ports = params.get("sdn-ports") or ()
978 _, _, vim_account_id = (
979 (None, None, None)
980 if associated_vim is None
981 else associated_vim.partition(":")
982 )
983
984 if associated_vim:
985 # get associated VIM
986 if associated_vim not in self.db_vims:
987 self.db_vims[associated_vim] = self.db.get_one(
988 "vim_accounts", {"_id": vim_account_id}
989 )
990
991 db_vim = self.db_vims[associated_vim]
992
993 # look for ports to connect
994 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
995 # print(ports)
996
997 sdn_ports = []
998 pending_ports = error_ports = 0
999 vlan_used = None
1000 sdn_need_update = False
1001
1002 for port in ports:
1003 vlan_used = port.get("vlan") or vlan_used
1004
1005 # TODO. Do not connect if already done
1006 if not port.get("compute_node") or not port.get("pci"):
1007 if port.get("status") == "ERROR":
1008 error_ports += 1
1009 else:
1010 pending_ports += 1
1011 continue
1012
1013 pmap = None
1014 compute_node_mappings = next(
1015 (
1016 c
1017 for c in db_vim["config"].get("sdn-port-mapping", ())
1018 if c and c["compute_node"] == port["compute_node"]
1019 ),
1020 None,
1021 )
1022
1023 if compute_node_mappings:
1024 # process port_mapping pci of type 0000:af:1[01].[1357]
1025 pmap = next(
1026 (
1027 p
1028 for p in compute_node_mappings["ports"]
1029 if self._match_pci(port["pci"], p.get("pci"))
1030 ),
1031 None,
1032 )
1033
1034 if not pmap:
1035 if not db_vim["config"].get("mapping_not_needed"):
1036 error_list.append(
1037 "Port mapping not found for compute_node={} pci={}".format(
1038 port["compute_node"], port["pci"]
1039 )
1040 )
1041 continue
1042
1043 pmap = {}
1044
1045 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1046 new_port = {
1047 "service_endpoint_id": pmap.get("service_endpoint_id")
1048 or service_endpoint_id,
1049 "service_endpoint_encapsulation_type": "dot1q"
1050 if port["type"] == "SR-IOV"
1051 else None,
1052 "service_endpoint_encapsulation_info": {
1053 "vlan": port.get("vlan"),
1054 "mac": port.get("mac-address"),
1055 "device_id": pmap.get("device_id") or port["compute_node"],
1056 "device_interface_id": pmap.get("device_interface_id")
1057 or port["pci"],
1058 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1059 "switch_port": pmap.get("switch_port"),
1060 "service_mapping_info": pmap.get("service_mapping_info"),
1061 },
1062 }
1063
1064 # TODO
1065 # if port["modified_at"] > last_update:
1066 # sdn_need_update = True
1067 new_connected_ports.append(port["id"]) # TODO
1068 sdn_ports.append(new_port)
1069
1070 if error_ports:
1071 error_list.append(
1072 "{} interfaces have not been created as VDU is on ERROR status".format(
1073 error_ports
1074 )
1075 )
1076
1077 # connect external ports
1078 for index, additional_port in enumerate(additional_ports):
1079 additional_port_id = additional_port.get(
1080 "service_endpoint_id"
1081 ) or "external-{}".format(index)
1082 sdn_ports.append(
1083 {
1084 "service_endpoint_id": additional_port_id,
1085 "service_endpoint_encapsulation_type": additional_port.get(
1086 "service_endpoint_encapsulation_type", "dot1q"
1087 ),
1088 "service_endpoint_encapsulation_info": {
1089 "vlan": additional_port.get("vlan") or vlan_used,
1090 "mac": additional_port.get("mac_address"),
1091 "device_id": additional_port.get("device_id"),
1092 "device_interface_id": additional_port.get(
1093 "device_interface_id"
1094 ),
1095 "switch_dpid": additional_port.get("switch_dpid")
1096 or additional_port.get("switch_id"),
1097 "switch_port": additional_port.get("switch_port"),
1098 "service_mapping_info": additional_port.get(
1099 "service_mapping_info"
1100 ),
1101 },
1102 }
1103 )
1104 new_connected_ports.append(additional_port_id)
1105 sdn_info = ""
1106
1107 # if there are more ports to connect or they have been modified, call create/update
1108 if error_list:
1109 sdn_status = "ERROR"
1110 sdn_info = "; ".join(error_list)
1111 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1112 last_update = time.time()
1113
1114 if not sdn_net_id:
1115 if len(sdn_ports) < 2:
1116 sdn_status = "ACTIVE"
1117
1118 if not pending_ports:
1119 self.logger.debug(
1120 "task={} {} new-sdn-net done, less than 2 ports".format(
1121 task_id, ro_task["target_id"]
1122 )
1123 )
1124 else:
1125 net_type = params.get("type") or "ELAN"
1126 (
1127 sdn_net_id,
1128 created_items,
1129 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1130 created = True
1131 self.logger.debug(
1132 "task={} {} new-sdn-net={} created={}".format(
1133 task_id, ro_task["target_id"], sdn_net_id, created
1134 )
1135 )
1136 else:
1137 created_items = target_vim.edit_connectivity_service(
1138 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1139 )
1140 created = True
1141 self.logger.debug(
1142 "task={} {} update-sdn-net={} created={}".format(
1143 task_id, ro_task["target_id"], sdn_net_id, created
1144 )
1145 )
1146
1147 connected_ports = new_connected_ports
1148 elif sdn_net_id:
1149 wim_status_dict = target_vim.get_connectivity_service_status(
1150 sdn_net_id, conn_info=created_items
1151 )
1152 sdn_status = wim_status_dict["sdn_status"]
1153
1154 if wim_status_dict.get("sdn_info"):
1155 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1156
1157 if wim_status_dict.get("error_msg"):
1158 sdn_info = wim_status_dict.get("error_msg") or ""
1159
1160 if pending_ports:
1161 if sdn_status != "ERROR":
1162 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1163 len(ports) - pending_ports, len(ports)
1164 )
1165
1166 if sdn_status == "ACTIVE":
1167 sdn_status = "BUILD"
1168
1169 ro_vim_item_update = {
1170 "vim_id": sdn_net_id,
1171 "vim_status": sdn_status,
1172 "created": created,
1173 "created_items": created_items,
1174 "connected_ports": connected_ports,
1175 "vim_details": sdn_info,
1176 "last_update": last_update,
1177 }
1178
1179 return sdn_status, ro_vim_item_update
1180 except Exception as e:
1181 self.logger.error(
1182 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1183 exc_info=not isinstance(
1184 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1185 ),
1186 )
1187 ro_vim_item_update = {
1188 "vim_status": "VIM_ERROR",
1189 "created": created,
1190 "vim_details": str(e),
1191 }
1192
1193 return "FAILED", ro_vim_item_update
1194
1195 def delete(self, ro_task, task_index):
1196 task = ro_task["tasks"][task_index]
1197 task_id = task["task_id"]
1198 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1199 ro_vim_item_update_ok = {
1200 "vim_status": "DELETED",
1201 "created": False,
1202 "vim_details": "DELETED",
1203 "vim_id": None,
1204 }
1205
1206 try:
1207 if sdn_vim_id:
1208 target_vim = self.my_vims[ro_task["target_id"]]
1209 target_vim.delete_connectivity_service(
1210 sdn_vim_id, ro_task["vim_info"].get("created_items")
1211 )
1212
1213 except Exception as e:
1214 if (
1215 isinstance(e, sdnconn.SdnConnectorError)
1216 and e.http_code == HTTPStatus.NOT_FOUND.value
1217 ):
1218 ro_vim_item_update_ok["vim_details"] = "already deleted"
1219 else:
1220 self.logger.error(
1221 "ro_task={} vim={} del-sdn-net={}: {}".format(
1222 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1223 ),
1224 exc_info=not isinstance(
1225 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1226 ),
1227 )
1228 ro_vim_item_update = {
1229 "vim_status": "VIM_ERROR",
1230 "vim_details": "Error while deleting: {}".format(e),
1231 }
1232
1233 return "FAILED", ro_vim_item_update
1234
1235 self.logger.debug(
1236 "task={} {} del-sdn-net={} {}".format(
1237 task_id,
1238 ro_task["target_id"],
1239 sdn_vim_id,
1240 ro_vim_item_update_ok.get("vim_details", ""),
1241 )
1242 )
1243
1244 return "DONE", ro_vim_item_update_ok
1245
1246
1247 class NsWorker(threading.Thread):
1248 REFRESH_BUILD = 5 # 5 seconds
1249 REFRESH_ACTIVE = 60 # 1 minute
1250 REFRESH_ERROR = 600
1251 REFRESH_IMAGE = 3600 * 10
1252 REFRESH_DELETE = 3600 * 10
1253 QUEUE_SIZE = 100
1254 terminate = False
1255
1256 def __init__(self, worker_index, config, plugins, db):
1257 """
1258
1259 :param worker_index: thread index
1260 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1261 :param plugins: global shared dict with the loaded plugins
1262 :param db: database class instance to use
1263 """
1264 threading.Thread.__init__(self)
1265 self.config = config
1266 self.plugins = plugins
1267 self.plugin_name = "unknown"
1268 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1269 self.worker_index = worker_index
1270 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1271 # targetvim: vimplugin class
1272 self.my_vims = {}
1273 # targetvim: vim information from database
1274 self.db_vims = {}
1275 # targetvim list
1276 self.vim_targets = []
1277 self.my_id = config["process_id"] + ":" + str(worker_index)
1278 self.db = db
1279 self.item2class = {
1280 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1281 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1282 "image": VimInteractionImage(
1283 self.db, self.my_vims, self.db_vims, self.logger
1284 ),
1285 "flavor": VimInteractionFlavor(
1286 self.db, self.my_vims, self.db_vims, self.logger
1287 ),
1288 "sdn_net": VimInteractionSdnNet(
1289 self.db, self.my_vims, self.db_vims, self.logger
1290 ),
1291 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1292 self.db, self.my_vims, self.db_vims, self.logger
1293 ),
1294 }
1295 self.time_last_task_processed = None
1296 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1297 self.tasks_to_delete = []
1298 # it is idle when there are not vim_targets associated
1299 self.idle = True
1300 self.task_locked_time = config["global"]["task_locked_time"]
1301
1302 def insert_task(self, task):
1303 try:
1304 self.task_queue.put(task, False)
1305 return None
1306 except queue.Full:
1307 raise NsWorkerException("timeout inserting a task")
1308
1309 def terminate(self):
1310 self.insert_task("exit")
1311
1312 def del_task(self, task):
1313 with self.task_lock:
1314 if task["status"] == "SCHEDULED":
1315 task["status"] = "SUPERSEDED"
1316 return True
1317 else: # task["status"] == "processing"
1318 self.task_lock.release()
1319 return False
1320
1321 def _process_vim_config(self, target_id, db_vim):
1322 """
1323 Process vim config, creating vim configuration files as ca_cert
1324 :param target_id: vim/sdn/wim + id
1325 :param db_vim: Vim dictionary obtained from database
1326 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1327 """
1328 if not db_vim.get("config"):
1329 return
1330
1331 file_name = ""
1332
1333 try:
1334 if db_vim["config"].get("ca_cert_content"):
1335 file_name = "{}:{}".format(target_id, self.worker_index)
1336
1337 try:
1338 mkdir(file_name)
1339 except FileExistsError:
1340 pass
1341
1342 file_name = file_name + "/ca_cert"
1343
1344 with open(file_name, "w") as f:
1345 f.write(db_vim["config"]["ca_cert_content"])
1346 del db_vim["config"]["ca_cert_content"]
1347 db_vim["config"]["ca_cert"] = file_name
1348 except Exception as e:
1349 raise NsWorkerException(
1350 "Error writing to file '{}': {}".format(file_name, e)
1351 )
1352
1353 def _load_plugin(self, name, type="vim"):
1354 # type can be vim or sdn
1355 if "rovim_dummy" not in self.plugins:
1356 self.plugins["rovim_dummy"] = VimDummyConnector
1357
1358 if "rosdn_dummy" not in self.plugins:
1359 self.plugins["rosdn_dummy"] = SdnDummyConnector
1360
1361 if name in self.plugins:
1362 return self.plugins[name]
1363
1364 try:
1365 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1366 self.plugins[name] = ep.load()
1367 except Exception as e:
1368 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1369
1370 if name and name not in self.plugins:
1371 raise NsWorkerException(
1372 "Plugin 'osm_{n}' has not been installed".format(n=name)
1373 )
1374
1375 return self.plugins[name]
1376
1377 def _unload_vim(self, target_id):
1378 """
1379 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1380 :param target_id: Contains type:_id; where type can be 'vim', ...
1381 :return: None.
1382 """
1383 try:
1384 self.db_vims.pop(target_id, None)
1385 self.my_vims.pop(target_id, None)
1386
1387 if target_id in self.vim_targets:
1388 self.vim_targets.remove(target_id)
1389
1390 self.logger.info("Unloaded {}".format(target_id))
1391 rmtree("{}:{}".format(target_id, self.worker_index))
1392 except FileNotFoundError:
1393 pass # this is raised by rmtree if folder does not exist
1394 except Exception as e:
1395 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1396
1397 def _check_vim(self, target_id):
1398 """
1399 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1400 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1401 :return: None.
1402 """
1403 target, _, _id = target_id.partition(":")
1404 now = time.time()
1405 update_dict = {}
1406 unset_dict = {}
1407 op_text = ""
1408 step = ""
1409 loaded = target_id in self.vim_targets
1410 target_database = (
1411 "vim_accounts"
1412 if target == "vim"
1413 else "wim_accounts"
1414 if target == "wim"
1415 else "sdns"
1416 )
1417
1418 try:
1419 step = "Getting {} from db".format(target_id)
1420 db_vim = self.db.get_one(target_database, {"_id": _id})
1421
1422 for op_index, operation in enumerate(
1423 db_vim["_admin"].get("operations", ())
1424 ):
1425 if operation["operationState"] != "PROCESSING":
1426 continue
1427
1428 locked_at = operation.get("locked_at")
1429
1430 if locked_at is not None and locked_at >= now - self.task_locked_time:
1431 # some other thread is doing this operation
1432 return
1433
1434 # lock
1435 op_text = "_admin.operations.{}.".format(op_index)
1436
1437 if not self.db.set_one(
1438 target_database,
1439 q_filter={
1440 "_id": _id,
1441 op_text + "operationState": "PROCESSING",
1442 op_text + "locked_at": locked_at,
1443 },
1444 update_dict={
1445 op_text + "locked_at": now,
1446 "admin.current_operation": op_index,
1447 },
1448 fail_on_empty=False,
1449 ):
1450 return
1451
1452 unset_dict[op_text + "locked_at"] = None
1453 unset_dict["current_operation"] = None
1454 step = "Loading " + target_id
1455 error_text = self._load_vim(target_id)
1456
1457 if not error_text:
1458 step = "Checking connectivity"
1459
1460 if target == "vim":
1461 self.my_vims[target_id].check_vim_connectivity()
1462 else:
1463 self.my_vims[target_id].check_credentials()
1464
1465 update_dict["_admin.operationalState"] = "ENABLED"
1466 update_dict["_admin.detailed-status"] = ""
1467 unset_dict[op_text + "detailed-status"] = None
1468 update_dict[op_text + "operationState"] = "COMPLETED"
1469
1470 return
1471
1472 except Exception as e:
1473 error_text = "{}: {}".format(step, e)
1474 self.logger.error("{} for {}: {}".format(step, target_id, e))
1475
1476 finally:
1477 if update_dict or unset_dict:
1478 if error_text:
1479 update_dict[op_text + "operationState"] = "FAILED"
1480 update_dict[op_text + "detailed-status"] = error_text
1481 unset_dict.pop(op_text + "detailed-status", None)
1482 update_dict["_admin.operationalState"] = "ERROR"
1483 update_dict["_admin.detailed-status"] = error_text
1484
1485 if op_text:
1486 update_dict[op_text + "statusEnteredTime"] = now
1487
1488 self.db.set_one(
1489 target_database,
1490 q_filter={"_id": _id},
1491 update_dict=update_dict,
1492 unset=unset_dict,
1493 fail_on_empty=False,
1494 )
1495
1496 if not loaded:
1497 self._unload_vim(target_id)
1498
1499 def _reload_vim(self, target_id):
1500 if target_id in self.vim_targets:
1501 self._load_vim(target_id)
1502 else:
1503 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1504 # just remove it to force load again next time it is needed
1505 self.db_vims.pop(target_id, None)
1506
1507 def _load_vim(self, target_id):
1508 """
1509 Load or reload a vim_account, sdn_controller or wim_account.
1510 Read content from database, load the plugin if not loaded.
1511 In case of error loading the plugin, it load a failing VIM_connector
1512 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1513 :param target_id: Contains type:_id; where type can be 'vim', ...
1514 :return: None if ok, descriptive text if error
1515 """
1516 target, _, _id = target_id.partition(":")
1517 target_database = (
1518 "vim_accounts"
1519 if target == "vim"
1520 else "wim_accounts"
1521 if target == "wim"
1522 else "sdns"
1523 )
1524 plugin_name = ""
1525 vim = None
1526
1527 try:
1528 step = "Getting {}={} from db".format(target, _id)
1529 # TODO process for wim, sdnc, ...
1530 vim = self.db.get_one(target_database, {"_id": _id})
1531
1532 # if deep_get(vim, "config", "sdn-controller"):
1533 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1534 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1535
1536 step = "Decrypting password"
1537 schema_version = vim.get("schema_version")
1538 self.db.encrypt_decrypt_fields(
1539 vim,
1540 "decrypt",
1541 fields=("password", "secret"),
1542 schema_version=schema_version,
1543 salt=_id,
1544 )
1545 self._process_vim_config(target_id, vim)
1546
1547 if target == "vim":
1548 plugin_name = "rovim_" + vim["vim_type"]
1549 step = "Loading plugin '{}'".format(plugin_name)
1550 vim_module_conn = self._load_plugin(plugin_name)
1551 step = "Loading {}'".format(target_id)
1552 self.my_vims[target_id] = vim_module_conn(
1553 uuid=vim["_id"],
1554 name=vim["name"],
1555 tenant_id=vim.get("vim_tenant_id"),
1556 tenant_name=vim.get("vim_tenant_name"),
1557 url=vim["vim_url"],
1558 url_admin=None,
1559 user=vim["vim_user"],
1560 passwd=vim["vim_password"],
1561 config=vim.get("config") or {},
1562 persistent_info={},
1563 )
1564 else: # sdn
1565 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1566 step = "Loading plugin '{}'".format(plugin_name)
1567 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1568 step = "Loading {}'".format(target_id)
1569 wim = deepcopy(vim)
1570 wim_config = wim.pop("config", {}) or {}
1571 wim["uuid"] = wim["_id"]
1572 if "url" in wim and "wim_url" not in wim:
1573 wim["wim_url"] = wim["url"]
1574 elif "url" not in wim and "wim_url" in wim:
1575 wim["url"] = wim["wim_url"]
1576
1577 if wim.get("dpid"):
1578 wim_config["dpid"] = wim.pop("dpid")
1579
1580 if wim.get("switch_id"):
1581 wim_config["switch_id"] = wim.pop("switch_id")
1582
1583 # wim, wim_account, config
1584 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1585 self.db_vims[target_id] = vim
1586 self.error_status = None
1587
1588 self.logger.info(
1589 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1590 )
1591 except Exception as e:
1592 self.logger.error(
1593 "Cannot load {} plugin={}: {} {}".format(
1594 target_id, plugin_name, step, e
1595 )
1596 )
1597
1598 self.db_vims[target_id] = vim or {}
1599 self.db_vims[target_id] = FailingConnector(str(e))
1600 error_status = "{} Error: {}".format(step, e)
1601
1602 return error_status
1603 finally:
1604 if target_id not in self.vim_targets:
1605 self.vim_targets.append(target_id)
1606
1607 def _get_db_task(self):
1608 """
1609 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1610 :return: None
1611 """
1612 now = time.time()
1613
1614 if not self.time_last_task_processed:
1615 self.time_last_task_processed = now
1616
1617 try:
1618 while True:
1619 """
1620 # Log RO tasks only when loglevel is DEBUG
1621 if self.logger.getEffectiveLevel() == logging.DEBUG:
1622 self._log_ro_task(
1623 None,
1624 None,
1625 None,
1626 "TASK_WF",
1627 "task_locked_time="
1628 + str(self.task_locked_time)
1629 + " "
1630 + "time_last_task_processed="
1631 + str(self.time_last_task_processed)
1632 + " "
1633 + "now="
1634 + str(now),
1635 )
1636 """
1637 locked = self.db.set_one(
1638 "ro_tasks",
1639 q_filter={
1640 "target_id": self.vim_targets,
1641 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1642 "locked_at.lt": now - self.task_locked_time,
1643 "to_check_at.lt": self.time_last_task_processed,
1644 },
1645 update_dict={"locked_by": self.my_id, "locked_at": now},
1646 fail_on_empty=False,
1647 )
1648
1649 if locked:
1650 # read and return
1651 ro_task = self.db.get_one(
1652 "ro_tasks",
1653 q_filter={
1654 "target_id": self.vim_targets,
1655 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1656 "locked_at": now,
1657 },
1658 )
1659 return ro_task
1660
1661 if self.time_last_task_processed == now:
1662 self.time_last_task_processed = None
1663 return None
1664 else:
1665 self.time_last_task_processed = now
1666 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1667
1668 except DbException as e:
1669 self.logger.error("Database exception at _get_db_task: {}".format(e))
1670 except Exception as e:
1671 self.logger.critical(
1672 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1673 )
1674
1675 return None
1676
1677 def _get_db_all_tasks(self):
1678 """
1679 Read all content of table ro_tasks to log it
1680 :return: None
1681 """
1682 try:
1683 # Checking the content of the BD:
1684
1685 # read and return
1686 ro_task = self.db.get_list("ro_tasks")
1687 for rt in ro_task:
1688 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1689 return ro_task
1690
1691 except DbException as e:
1692 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1693 except Exception as e:
1694 self.logger.critical(
1695 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1696 )
1697
1698 return None
1699
1700 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1701 """
1702 Generate a log with the following format:
1703
1704 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1705 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1706 task_array_index;task_id;task_action;task_item;task_args
1707
1708 Example:
1709
1710 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1711 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1712 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1713 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1714 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1715 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1716 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1717 """
1718 try:
1719 line = []
1720 i = 0
1721 if ro_task is not None and isinstance(ro_task, dict):
1722 for t in ro_task["tasks"]:
1723 line.clear()
1724 line.append(mark)
1725 line.append(event)
1726 line.append(ro_task.get("_id", ""))
1727 line.append(str(ro_task.get("locked_at", "")))
1728 line.append(str(ro_task.get("modified_at", "")))
1729 line.append(str(ro_task.get("created_at", "")))
1730 line.append(str(ro_task.get("to_check_at", "")))
1731 line.append(str(ro_task.get("locked_by", "")))
1732 line.append(str(ro_task.get("target_id", "")))
1733 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1734 line.append(str(ro_task.get("vim_info", "")))
1735 line.append(str(ro_task.get("tasks", "")))
1736 if isinstance(t, dict):
1737 line.append(str(t.get("status", "")))
1738 line.append(str(t.get("action_id", "")))
1739 line.append(str(i))
1740 line.append(str(t.get("task_id", "")))
1741 line.append(str(t.get("action", "")))
1742 line.append(str(t.get("item", "")))
1743 line.append(str(t.get("find_params", "")))
1744 line.append(str(t.get("params", "")))
1745 else:
1746 line.extend([""] * 2)
1747 line.append(str(i))
1748 line.extend([""] * 5)
1749
1750 i += 1
1751 self.logger.debug(";".join(line))
1752 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1753 i = 0
1754 while True:
1755 st = "tasks.{}.status".format(i)
1756 if st not in db_ro_task_update:
1757 break
1758 line.clear()
1759 line.append(mark)
1760 line.append(event)
1761 line.append(db_ro_task_update.get("_id", ""))
1762 line.append(str(db_ro_task_update.get("locked_at", "")))
1763 line.append(str(db_ro_task_update.get("modified_at", "")))
1764 line.append("")
1765 line.append(str(db_ro_task_update.get("to_check_at", "")))
1766 line.append(str(db_ro_task_update.get("locked_by", "")))
1767 line.append("")
1768 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1769 line.append("")
1770 line.append(str(db_ro_task_update.get("vim_info", "")))
1771 line.append(str(str(db_ro_task_update).count(".status")))
1772 line.append(db_ro_task_update.get(st, ""))
1773 line.append("")
1774 line.append(str(i))
1775 line.extend([""] * 3)
1776 i += 1
1777 self.logger.debug(";".join(line))
1778
1779 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1780 line.clear()
1781 line.append(mark)
1782 line.append(event)
1783 line.append(db_ro_task_delete.get("_id", ""))
1784 line.append("")
1785 line.append(db_ro_task_delete.get("modified_at", ""))
1786 line.extend([""] * 13)
1787 self.logger.debug(";".join(line))
1788
1789 else:
1790 line.clear()
1791 line.append(mark)
1792 line.append(event)
1793 line.extend([""] * 16)
1794 self.logger.debug(";".join(line))
1795
1796 except Exception as e:
1797 self.logger.error("Error logging ro_task: {}".format(e))
1798
1799 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1800 """
1801 Determine if this task need to be done or superseded
1802 :return: None
1803 """
1804 my_task = ro_task["tasks"][task_index]
1805 task_id = my_task["task_id"]
1806 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1807 "created_items", False
1808 )
1809
1810 if my_task["status"] == "FAILED":
1811 return None, None # TODO need to be retry??
1812
1813 try:
1814 for index, task in enumerate(ro_task["tasks"]):
1815 if index == task_index or not task:
1816 continue # own task
1817
1818 if (
1819 my_task["target_record"] == task["target_record"]
1820 and task["action"] == "CREATE"
1821 ):
1822 # set to finished
1823 db_update["tasks.{}.status".format(index)] = task[
1824 "status"
1825 ] = "FINISHED"
1826 elif task["action"] == "CREATE" and task["status"] not in (
1827 "FINISHED",
1828 "SUPERSEDED",
1829 ):
1830 needed_delete = False
1831
1832 if needed_delete:
1833 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1834 else:
1835 return "SUPERSEDED", None
1836 except Exception as e:
1837 if not isinstance(e, NsWorkerException):
1838 self.logger.critical(
1839 "Unexpected exception at _delete_task task={}: {}".format(
1840 task_id, e
1841 ),
1842 exc_info=True,
1843 )
1844
1845 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1846
1847 def _create_task(self, ro_task, task_index, task_depends, db_update):
1848 """
1849 Determine if this task need to create something at VIM
1850 :return: None
1851 """
1852 my_task = ro_task["tasks"][task_index]
1853 task_id = my_task["task_id"]
1854 task_status = None
1855
1856 if my_task["status"] == "FAILED":
1857 return None, None # TODO need to be retry??
1858 elif my_task["status"] == "SCHEDULED":
1859 # check if already created by another task
1860 for index, task in enumerate(ro_task["tasks"]):
1861 if index == task_index or not task:
1862 continue # own task
1863
1864 if task["action"] == "CREATE" and task["status"] not in (
1865 "SCHEDULED",
1866 "FINISHED",
1867 "SUPERSEDED",
1868 ):
1869 return task["status"], "COPY_VIM_INFO"
1870
1871 try:
1872 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1873 ro_task, task_index, task_depends
1874 )
1875 # TODO update other CREATE tasks
1876 except Exception as e:
1877 if not isinstance(e, NsWorkerException):
1878 self.logger.error(
1879 "Error executing task={}: {}".format(task_id, e), exc_info=True
1880 )
1881
1882 task_status = "FAILED"
1883 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1884 # TODO update ro_vim_item_update
1885
1886 return task_status, ro_vim_item_update
1887 else:
1888 return None, None
1889
1890 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1891 """
1892 Look for dependency task
1893 :param task_id: Can be one of
1894 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1895 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1896 3. task.task_id: "<action_id>:number"
1897 :param ro_task:
1898 :param target_id:
1899 :return: database ro_task plus index of task
1900 """
1901 if (
1902 task_id.startswith("vim:")
1903 or task_id.startswith("sdn:")
1904 or task_id.startswith("wim:")
1905 ):
1906 target_id, _, task_id = task_id.partition(" ")
1907
1908 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1909 ro_task_dependency = self.db.get_one(
1910 "ro_tasks",
1911 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1912 fail_on_empty=False,
1913 )
1914
1915 if ro_task_dependency:
1916 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1917 if task["target_record_id"] == task_id:
1918 return ro_task_dependency, task_index
1919
1920 else:
1921 if ro_task:
1922 for task_index, task in enumerate(ro_task["tasks"]):
1923 if task and task["task_id"] == task_id:
1924 return ro_task, task_index
1925
1926 ro_task_dependency = self.db.get_one(
1927 "ro_tasks",
1928 q_filter={
1929 "tasks.ANYINDEX.task_id": task_id,
1930 "tasks.ANYINDEX.target_record.ne": None,
1931 },
1932 fail_on_empty=False,
1933 )
1934
1935 if ro_task_dependency:
1936 for task_index, task in ro_task_dependency["tasks"]:
1937 if task["task_id"] == task_id:
1938 return ro_task_dependency, task_index
1939 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1940
1941 def _process_pending_tasks(self, ro_task):
1942 ro_task_id = ro_task["_id"]
1943 now = time.time()
1944 # one day
1945 next_check_at = now + (24 * 60 * 60)
1946 db_ro_task_update = {}
1947
1948 def _update_refresh(new_status):
1949 # compute next_refresh
1950 nonlocal task
1951 nonlocal next_check_at
1952 nonlocal db_ro_task_update
1953 nonlocal ro_task
1954
1955 next_refresh = time.time()
1956
1957 if task["item"] in ("image", "flavor"):
1958 next_refresh += self.REFRESH_IMAGE
1959 elif new_status == "BUILD":
1960 next_refresh += self.REFRESH_BUILD
1961 elif new_status == "DONE":
1962 next_refresh += self.REFRESH_ACTIVE
1963 else:
1964 next_refresh += self.REFRESH_ERROR
1965
1966 next_check_at = min(next_check_at, next_refresh)
1967 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1968 ro_task["vim_info"]["refresh_at"] = next_refresh
1969
1970 try:
1971 """
1972 # Log RO tasks only when loglevel is DEBUG
1973 if self.logger.getEffectiveLevel() == logging.DEBUG:
1974 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1975 """
1976 # 0: get task_status_create
1977 lock_object = None
1978 task_status_create = None
1979 task_create = next(
1980 (
1981 t
1982 for t in ro_task["tasks"]
1983 if t
1984 and t["action"] == "CREATE"
1985 and t["status"] in ("BUILD", "DONE")
1986 ),
1987 None,
1988 )
1989
1990 if task_create:
1991 task_status_create = task_create["status"]
1992
1993 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1994 for task_action in ("DELETE", "CREATE", "EXEC"):
1995 db_vim_update = None
1996 new_status = None
1997
1998 for task_index, task in enumerate(ro_task["tasks"]):
1999 if not task:
2000 continue # task deleted
2001
2002 task_depends = {}
2003 target_update = None
2004
2005 if (
2006 (
2007 task_action in ("DELETE", "EXEC")
2008 and task["status"] not in ("SCHEDULED", "BUILD")
2009 )
2010 or task["action"] != task_action
2011 or (
2012 task_action == "CREATE"
2013 and task["status"] in ("FINISHED", "SUPERSEDED")
2014 )
2015 ):
2016 continue
2017
2018 task_path = "tasks.{}.status".format(task_index)
2019 try:
2020 db_vim_info_update = None
2021
2022 if task["status"] == "SCHEDULED":
2023 # check if tasks that this depends on have been completed
2024 dependency_not_completed = False
2025
2026 for dependency_task_id in task.get("depends_on") or ():
2027 (
2028 dependency_ro_task,
2029 dependency_task_index,
2030 ) = self._get_dependency(
2031 dependency_task_id, target_id=ro_task["target_id"]
2032 )
2033 dependency_task = dependency_ro_task["tasks"][
2034 dependency_task_index
2035 ]
2036
2037 if dependency_task["status"] == "SCHEDULED":
2038 dependency_not_completed = True
2039 next_check_at = min(
2040 next_check_at, dependency_ro_task["to_check_at"]
2041 )
2042 # must allow dependent task to be processed first
2043 # to do this set time after last_task_processed
2044 next_check_at = max(
2045 self.time_last_task_processed, next_check_at
2046 )
2047 break
2048 elif dependency_task["status"] == "FAILED":
2049 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2050 task["action"],
2051 task["item"],
2052 dependency_task["action"],
2053 dependency_task["item"],
2054 dependency_task_id,
2055 dependency_ro_task["vim_info"].get(
2056 "vim_details"
2057 ),
2058 )
2059 self.logger.error(
2060 "task={} {}".format(task["task_id"], error_text)
2061 )
2062 raise NsWorkerException(error_text)
2063
2064 task_depends[dependency_task_id] = dependency_ro_task[
2065 "vim_info"
2066 ]["vim_id"]
2067 task_depends[
2068 "TASK-{}".format(dependency_task_id)
2069 ] = dependency_ro_task["vim_info"]["vim_id"]
2070
2071 if dependency_not_completed:
2072 # TODO set at vim_info.vim_details that it is waiting
2073 continue
2074
2075 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2076 # the task of renew this locking. It will update database locket_at periodically
2077 if not lock_object:
2078 lock_object = LockRenew.add_lock_object(
2079 "ro_tasks", ro_task, self
2080 )
2081
2082 if task["action"] == "DELETE":
2083 (new_status, db_vim_info_update,) = self._delete_task(
2084 ro_task, task_index, task_depends, db_ro_task_update
2085 )
2086 new_status = (
2087 "FINISHED" if new_status == "DONE" else new_status
2088 )
2089 # ^with FINISHED instead of DONE it will not be refreshing
2090
2091 if new_status in ("FINISHED", "SUPERSEDED"):
2092 target_update = "DELETE"
2093 elif task["action"] == "EXEC":
2094 (
2095 new_status,
2096 db_vim_info_update,
2097 db_task_update,
2098 ) = self.item2class[task["item"]].exec(
2099 ro_task, task_index, task_depends
2100 )
2101 new_status = (
2102 "FINISHED" if new_status == "DONE" else new_status
2103 )
2104 # ^with FINISHED instead of DONE it will not be refreshing
2105
2106 if db_task_update:
2107 # load into database the modified db_task_update "retries" and "next_retry"
2108 if db_task_update.get("retries"):
2109 db_ro_task_update[
2110 "tasks.{}.retries".format(task_index)
2111 ] = db_task_update["retries"]
2112
2113 next_check_at = time.time() + db_task_update.get(
2114 "next_retry", 60
2115 )
2116 target_update = None
2117 elif task["action"] == "CREATE":
2118 if task["status"] == "SCHEDULED":
2119 if task_status_create:
2120 new_status = task_status_create
2121 target_update = "COPY_VIM_INFO"
2122 else:
2123 new_status, db_vim_info_update = self.item2class[
2124 task["item"]
2125 ].new(ro_task, task_index, task_depends)
2126 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2127 _update_refresh(new_status)
2128 else:
2129 if (
2130 ro_task["vim_info"]["refresh_at"]
2131 and now > ro_task["vim_info"]["refresh_at"]
2132 ):
2133 new_status, db_vim_info_update = self.item2class[
2134 task["item"]
2135 ].refresh(ro_task)
2136 _update_refresh(new_status)
2137 else:
2138 # The refresh is updated to avoid set the value of "refresh_at" to
2139 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2140 # because it can happen that in this case the task is never processed
2141 _update_refresh(task["status"])
2142
2143 except Exception as e:
2144 new_status = "FAILED"
2145 db_vim_info_update = {
2146 "vim_status": "VIM_ERROR",
2147 "vim_details": str(e),
2148 }
2149
2150 if not isinstance(
2151 e, (NsWorkerException, vimconn.VimConnException)
2152 ):
2153 self.logger.error(
2154 "Unexpected exception at _delete_task task={}: {}".format(
2155 task["task_id"], e
2156 ),
2157 exc_info=True,
2158 )
2159
2160 try:
2161 if db_vim_info_update:
2162 db_vim_update = db_vim_info_update.copy()
2163 db_ro_task_update.update(
2164 {
2165 "vim_info." + k: v
2166 for k, v in db_vim_info_update.items()
2167 }
2168 )
2169 ro_task["vim_info"].update(db_vim_info_update)
2170
2171 if new_status:
2172 if task_action == "CREATE":
2173 task_status_create = new_status
2174 db_ro_task_update[task_path] = new_status
2175
2176 if target_update or db_vim_update:
2177 if target_update == "DELETE":
2178 self._update_target(task, None)
2179 elif target_update == "COPY_VIM_INFO":
2180 self._update_target(task, ro_task["vim_info"])
2181 else:
2182 self._update_target(task, db_vim_update)
2183
2184 except Exception as e:
2185 if (
2186 isinstance(e, DbException)
2187 and e.http_code == HTTPStatus.NOT_FOUND
2188 ):
2189 # if the vnfrs or nsrs has been removed from database, this task must be removed
2190 self.logger.debug(
2191 "marking to delete task={}".format(task["task_id"])
2192 )
2193 self.tasks_to_delete.append(task)
2194 else:
2195 self.logger.error(
2196 "Unexpected exception at _update_target task={}: {}".format(
2197 task["task_id"], e
2198 ),
2199 exc_info=True,
2200 )
2201
2202 locked_at = ro_task["locked_at"]
2203
2204 if lock_object:
2205 locked_at = [
2206 lock_object["locked_at"],
2207 lock_object["locked_at"] + self.task_locked_time,
2208 ]
2209 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2210 # contain exactly locked_at + self.task_locked_time
2211 LockRenew.remove_lock_object(lock_object)
2212
2213 q_filter = {
2214 "_id": ro_task["_id"],
2215 "to_check_at": ro_task["to_check_at"],
2216 "locked_at": locked_at,
2217 }
2218 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2219 # outside this task (by ro_nbi) do not update it
2220 db_ro_task_update["locked_by"] = None
2221 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2222 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2223 db_ro_task_update["modified_at"] = now
2224 db_ro_task_update["to_check_at"] = next_check_at
2225
2226 """
2227 # Log RO tasks only when loglevel is DEBUG
2228 if self.logger.getEffectiveLevel() == logging.DEBUG:
2229 db_ro_task_update_log = db_ro_task_update.copy()
2230 db_ro_task_update_log["_id"] = q_filter["_id"]
2231 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2232 """
2233
2234 if not self.db.set_one(
2235 "ro_tasks",
2236 update_dict=db_ro_task_update,
2237 q_filter=q_filter,
2238 fail_on_empty=False,
2239 ):
2240 del db_ro_task_update["to_check_at"]
2241 del q_filter["to_check_at"]
2242 """
2243 # Log RO tasks only when loglevel is DEBUG
2244 if self.logger.getEffectiveLevel() == logging.DEBUG:
2245 self._log_ro_task(
2246 None,
2247 db_ro_task_update_log,
2248 None,
2249 "TASK_WF",
2250 "SET_TASK " + str(q_filter),
2251 )
2252 """
2253 self.db.set_one(
2254 "ro_tasks",
2255 q_filter=q_filter,
2256 update_dict=db_ro_task_update,
2257 fail_on_empty=True,
2258 )
2259 except DbException as e:
2260 self.logger.error(
2261 "ro_task={} Error updating database {}".format(ro_task_id, e)
2262 )
2263 except Exception as e:
2264 self.logger.error(
2265 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2266 )
2267
2268 def _update_target(self, task, ro_vim_item_update):
2269 table, _, temp = task["target_record"].partition(":")
2270 _id, _, path_vim_status = temp.partition(":")
2271 path_item = path_vim_status[: path_vim_status.rfind(".")]
2272 path_item = path_item[: path_item.rfind(".")]
2273 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2274 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2275
2276 if ro_vim_item_update:
2277 update_dict = {
2278 path_vim_status + "." + k: v
2279 for k, v in ro_vim_item_update.items()
2280 if k
2281 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2282 }
2283
2284 if path_vim_status.startswith("vdur."):
2285 # for backward compatibility, add vdur.name apart from vdur.vim_name
2286 if ro_vim_item_update.get("vim_name"):
2287 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2288
2289 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2290 if ro_vim_item_update.get("vim_id"):
2291 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2292
2293 # update general status
2294 if ro_vim_item_update.get("vim_status"):
2295 update_dict[path_item + ".status"] = ro_vim_item_update[
2296 "vim_status"
2297 ]
2298
2299 if ro_vim_item_update.get("interfaces"):
2300 path_interfaces = path_item + ".interfaces"
2301
2302 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2303 if iface:
2304 update_dict.update(
2305 {
2306 path_interfaces + ".{}.".format(i) + k: v
2307 for k, v in iface.items()
2308 if k in ("vlan", "compute_node", "pci")
2309 }
2310 )
2311
2312 # put ip_address and mac_address with ip-address and mac-address
2313 if iface.get("ip_address"):
2314 update_dict[
2315 path_interfaces + ".{}.".format(i) + "ip-address"
2316 ] = iface["ip_address"]
2317
2318 if iface.get("mac_address"):
2319 update_dict[
2320 path_interfaces + ".{}.".format(i) + "mac-address"
2321 ] = iface["mac_address"]
2322
2323 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2324 update_dict["ip-address"] = iface.get("ip_address").split(
2325 ";"
2326 )[0]
2327
2328 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2329 update_dict[path_item + ".ip-address"] = iface.get(
2330 "ip_address"
2331 ).split(";")[0]
2332
2333 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2334 else:
2335 update_dict = {path_item + ".status": "DELETED"}
2336 self.db.set_one(
2337 table,
2338 q_filter={"_id": _id},
2339 update_dict=update_dict,
2340 unset={path_vim_status: None},
2341 )
2342
2343 def _process_delete_db_tasks(self):
2344 """
2345 Delete task from database because vnfrs or nsrs or both have been deleted
2346 :return: None. Uses and modify self.tasks_to_delete
2347 """
2348 while self.tasks_to_delete:
2349 task = self.tasks_to_delete[0]
2350 vnfrs_deleted = None
2351 nsr_id = task["nsr_id"]
2352
2353 if task["target_record"].startswith("vnfrs:"):
2354 # check if nsrs is present
2355 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2356 vnfrs_deleted = task["target_record"].split(":")[1]
2357
2358 try:
2359 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2360 except Exception as e:
2361 self.logger.error(
2362 "Error deleting task={}: {}".format(task["task_id"], e)
2363 )
2364 self.tasks_to_delete.pop(0)
2365
2366 @staticmethod
2367 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2368 """
2369 Static method because it is called from osm_ng_ro.ns
2370 :param db: instance of database to use
2371 :param nsr_id: affected nsrs id
2372 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2373 :return: None, exception is fails
2374 """
2375 retries = 5
2376 for retry in range(retries):
2377 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2378 now = time.time()
2379 conflict = False
2380
2381 for ro_task in ro_tasks:
2382 db_update = {}
2383 to_delete_ro_task = True
2384
2385 for index, task in enumerate(ro_task["tasks"]):
2386 if not task:
2387 pass
2388 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2389 vnfrs_deleted
2390 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2391 ):
2392 db_update["tasks.{}".format(index)] = None
2393 else:
2394 # used by other nsr, ro_task cannot be deleted
2395 to_delete_ro_task = False
2396
2397 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2398 if to_delete_ro_task:
2399 if not db.del_one(
2400 "ro_tasks",
2401 q_filter={
2402 "_id": ro_task["_id"],
2403 "modified_at": ro_task["modified_at"],
2404 },
2405 fail_on_empty=False,
2406 ):
2407 conflict = True
2408 elif db_update:
2409 db_update["modified_at"] = now
2410 if not db.set_one(
2411 "ro_tasks",
2412 q_filter={
2413 "_id": ro_task["_id"],
2414 "modified_at": ro_task["modified_at"],
2415 },
2416 update_dict=db_update,
2417 fail_on_empty=False,
2418 ):
2419 conflict = True
2420 if not conflict:
2421 return
2422 else:
2423 raise NsWorkerException("Exceeded {} retries".format(retries))
2424
2425 def run(self):
2426 # load database
2427 self.logger.info("Starting")
2428 while True:
2429 # step 1: get commands from queue
2430 try:
2431 if self.vim_targets:
2432 task = self.task_queue.get(block=False)
2433 else:
2434 if not self.idle:
2435 self.logger.debug("enters in idle state")
2436 self.idle = True
2437 task = self.task_queue.get(block=True)
2438 self.idle = False
2439
2440 if task[0] == "terminate":
2441 break
2442 elif task[0] == "load_vim":
2443 self.logger.info("order to load vim {}".format(task[1]))
2444 self._load_vim(task[1])
2445 elif task[0] == "unload_vim":
2446 self.logger.info("order to unload vim {}".format(task[1]))
2447 self._unload_vim(task[1])
2448 elif task[0] == "reload_vim":
2449 self._reload_vim(task[1])
2450 elif task[0] == "check_vim":
2451 self.logger.info("order to check vim {}".format(task[1]))
2452 self._check_vim(task[1])
2453 continue
2454 except Exception as e:
2455 if isinstance(e, queue.Empty):
2456 pass
2457 else:
2458 self.logger.critical(
2459 "Error processing task: {}".format(e), exc_info=True
2460 )
2461
2462 # step 2: process pending_tasks, delete not needed tasks
2463 try:
2464 if self.tasks_to_delete:
2465 self._process_delete_db_tasks()
2466 busy = False
2467 """
2468 # Log RO tasks only when loglevel is DEBUG
2469 if self.logger.getEffectiveLevel() == logging.DEBUG:
2470 _ = self._get_db_all_tasks()
2471 """
2472 ro_task = self._get_db_task()
2473 if ro_task:
2474 self._process_pending_tasks(ro_task)
2475 busy = True
2476 if not busy:
2477 time.sleep(5)
2478 except Exception as e:
2479 self.logger.critical(
2480 "Unexpected exception at run: " + str(e), exc_info=True
2481 )
2482
2483 self.logger.info("Finishing")