c381595e6274f075eace0edde3bae4fa29cc3905
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 }
215 self.logger.debug(
216 "task={} {} new-net={} created={}".format(
217 task_id, ro_task["target_id"], vim_net_id, created
218 )
219 )
220
221 return "BUILD", ro_vim_item_update
222 except (vimconn.VimConnException, NsWorkerException) as e:
223 self.logger.error(
224 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
225 )
226 ro_vim_item_update = {
227 "vim_status": "VIM_ERROR",
228 "created": created,
229 "vim_details": str(e),
230 }
231
232 return "FAILED", ro_vim_item_update
233
234 def refresh(self, ro_task):
235 """Call VIM to get network status"""
236 ro_task_id = ro_task["_id"]
237 target_vim = self.my_vims[ro_task["target_id"]]
238 vim_id = ro_task["vim_info"]["vim_id"]
239 net_to_refresh_list = [vim_id]
240
241 try:
242 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
243 vim_info = vim_dict[vim_id]
244
245 if vim_info["status"] == "ACTIVE":
246 task_status = "DONE"
247 elif vim_info["status"] == "BUILD":
248 task_status = "BUILD"
249 else:
250 task_status = "FAILED"
251 except vimconn.VimConnException as e:
252 # Mark all tasks at VIM_ERROR status
253 self.logger.error(
254 "ro_task={} vim={} get-net={}: {}".format(
255 ro_task_id, ro_task["target_id"], vim_id, e
256 )
257 )
258 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
259 task_status = "FAILED"
260
261 ro_vim_item_update = {}
262 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
263 ro_vim_item_update["vim_status"] = vim_info["status"]
264
265 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
266 ro_vim_item_update["vim_name"] = vim_info.get("name")
267
268 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
269 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
270 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
271 elif vim_info["status"] == "DELETED":
272 ro_vim_item_update["vim_id"] = None
273 ro_vim_item_update["vim_details"] = "Deleted externally"
274 else:
275 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
276 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
277
278 if ro_vim_item_update:
279 self.logger.debug(
280 "ro_task={} {} get-net={}: status={} {}".format(
281 ro_task_id,
282 ro_task["target_id"],
283 vim_id,
284 ro_vim_item_update.get("vim_status"),
285 ro_vim_item_update.get("vim_details")
286 if ro_vim_item_update.get("vim_status") != "ACTIVE"
287 else "",
288 )
289 )
290
291 return task_status, ro_vim_item_update
292
293 def delete(self, ro_task, task_index):
294 task = ro_task["tasks"][task_index]
295 task_id = task["task_id"]
296 net_vim_id = ro_task["vim_info"]["vim_id"]
297 ro_vim_item_update_ok = {
298 "vim_status": "DELETED",
299 "created": False,
300 "vim_details": "DELETED",
301 "vim_id": None,
302 }
303
304 try:
305 if net_vim_id or ro_task["vim_info"]["created_items"]:
306 target_vim = self.my_vims[ro_task["target_id"]]
307 target_vim.delete_network(
308 net_vim_id, ro_task["vim_info"]["created_items"]
309 )
310 except vimconn.VimConnNotFoundException:
311 ro_vim_item_update_ok["vim_details"] = "already deleted"
312 except vimconn.VimConnException as e:
313 self.logger.error(
314 "ro_task={} vim={} del-net={}: {}".format(
315 ro_task["_id"], ro_task["target_id"], net_vim_id, e
316 )
317 )
318 ro_vim_item_update = {
319 "vim_status": "VIM_ERROR",
320 "vim_details": "Error while deleting: {}".format(e),
321 }
322
323 return "FAILED", ro_vim_item_update
324
325 self.logger.debug(
326 "task={} {} del-net={} {}".format(
327 task_id,
328 ro_task["target_id"],
329 net_vim_id,
330 ro_vim_item_update_ok.get("vim_details", ""),
331 )
332 )
333
334 return "DONE", ro_vim_item_update_ok
335
336
337 class VimInteractionVdu(VimInteractionBase):
338 max_retries_inject_ssh_key = 20 # 20 times
339 time_retries_inject_ssh_key = 30 # wevery 30 seconds
340
341 def new(self, ro_task, task_index, task_depends):
342 task = ro_task["tasks"][task_index]
343 task_id = task["task_id"]
344 created = False
345 created_items = {}
346 target_vim = self.my_vims[ro_task["target_id"]]
347
348 try:
349 created = True
350 params = task["params"]
351 params_copy = deepcopy(params)
352 net_list = params_copy["net_list"]
353
354 for net in net_list:
355 # change task_id into network_id
356 if "net_id" in net and net["net_id"].startswith("TASK-"):
357 network_id = task_depends[net["net_id"]]
358
359 if not network_id:
360 raise NsWorkerException(
361 "Cannot create VM because depends on a network not created or found "
362 "for {}".format(net["net_id"])
363 )
364
365 net["net_id"] = network_id
366
367 if params_copy["image_id"].startswith("TASK-"):
368 params_copy["image_id"] = task_depends[params_copy["image_id"]]
369
370 if params_copy["flavor_id"].startswith("TASK-"):
371 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
372
373 affinity_group_list = params_copy["affinity_group_list"]
374 for affinity_group in affinity_group_list:
375 # change task_id into affinity_group_id
376 if "affinity_group_id" in affinity_group and affinity_group[
377 "affinity_group_id"
378 ].startswith("TASK-"):
379 affinity_group_id = task_depends[
380 affinity_group["affinity_group_id"]
381 ]
382
383 if not affinity_group_id:
384 raise NsWorkerException(
385 "found for {}".format(affinity_group["affinity_group_id"])
386 )
387
388 affinity_group["affinity_group_id"] = affinity_group_id
389
390 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
391 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
392
393 ro_vim_item_update = {
394 "vim_id": vim_vm_id,
395 "vim_status": "BUILD",
396 "created": created,
397 "created_items": created_items,
398 "vim_details": None,
399 "interfaces_vim_ids": interfaces,
400 "interfaces": [],
401 }
402 self.logger.debug(
403 "task={} {} new-vm={} created={}".format(
404 task_id, ro_task["target_id"], vim_vm_id, created
405 )
406 )
407
408 return "BUILD", ro_vim_item_update
409 except (vimconn.VimConnException, NsWorkerException) as e:
410 self.logger.error(
411 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
412 )
413 ro_vim_item_update = {
414 "vim_status": "VIM_ERROR",
415 "created": created,
416 "vim_details": str(e),
417 }
418
419 return "FAILED", ro_vim_item_update
420
421 def delete(self, ro_task, task_index):
422 task = ro_task["tasks"][task_index]
423 task_id = task["task_id"]
424 vm_vim_id = ro_task["vim_info"]["vim_id"]
425 ro_vim_item_update_ok = {
426 "vim_status": "DELETED",
427 "created": False,
428 "vim_details": "DELETED",
429 "vim_id": None,
430 }
431
432 try:
433 if vm_vim_id or ro_task["vim_info"]["created_items"]:
434 target_vim = self.my_vims[ro_task["target_id"]]
435 target_vim.delete_vminstance(
436 vm_vim_id, ro_task["vim_info"]["created_items"]
437 )
438 except vimconn.VimConnNotFoundException:
439 ro_vim_item_update_ok["vim_details"] = "already deleted"
440 except vimconn.VimConnException as e:
441 self.logger.error(
442 "ro_task={} vim={} del-vm={}: {}".format(
443 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
444 )
445 )
446 ro_vim_item_update = {
447 "vim_status": "VIM_ERROR",
448 "vim_details": "Error while deleting: {}".format(e),
449 }
450
451 return "FAILED", ro_vim_item_update
452
453 self.logger.debug(
454 "task={} {} del-vm={} {}".format(
455 task_id,
456 ro_task["target_id"],
457 vm_vim_id,
458 ro_vim_item_update_ok.get("vim_details", ""),
459 )
460 )
461
462 return "DONE", ro_vim_item_update_ok
463
464 def refresh(self, ro_task):
465 """Call VIM to get vm status"""
466 ro_task_id = ro_task["_id"]
467 target_vim = self.my_vims[ro_task["target_id"]]
468 vim_id = ro_task["vim_info"]["vim_id"]
469
470 if not vim_id:
471 return None, None
472
473 vm_to_refresh_list = [vim_id]
474 try:
475 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
476 vim_info = vim_dict[vim_id]
477
478 if vim_info["status"] == "ACTIVE":
479 task_status = "DONE"
480 elif vim_info["status"] == "BUILD":
481 task_status = "BUILD"
482 else:
483 task_status = "FAILED"
484
485 # try to load and parse vim_information
486 try:
487 vim_info_info = yaml.safe_load(vim_info["vim_info"])
488 if vim_info_info.get("name"):
489 vim_info["name"] = vim_info_info["name"]
490 except Exception:
491 pass
492 except vimconn.VimConnException as e:
493 # Mark all tasks at VIM_ERROR status
494 self.logger.error(
495 "ro_task={} vim={} get-vm={}: {}".format(
496 ro_task_id, ro_task["target_id"], vim_id, e
497 )
498 )
499 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
500 task_status = "FAILED"
501
502 ro_vim_item_update = {}
503
504 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
505 vim_interfaces = []
506 if vim_info.get("interfaces"):
507 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
508 iface = next(
509 (
510 iface
511 for iface in vim_info["interfaces"]
512 if vim_iface_id == iface["vim_interface_id"]
513 ),
514 None,
515 )
516 # if iface:
517 # iface.pop("vim_info", None)
518 vim_interfaces.append(iface)
519
520 task_create = next(
521 t
522 for t in ro_task["tasks"]
523 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
524 )
525 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
526 vim_interfaces[task_create["mgmt_vnf_interface"]][
527 "mgmt_vnf_interface"
528 ] = True
529
530 mgmt_vdu_iface = task_create.get(
531 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
532 )
533 if vim_interfaces:
534 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
535
536 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
537 ro_vim_item_update["interfaces"] = vim_interfaces
538
539 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
540 ro_vim_item_update["vim_status"] = vim_info["status"]
541
542 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
543 ro_vim_item_update["vim_name"] = vim_info.get("name")
544
545 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
546 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
547 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
548 elif vim_info["status"] == "DELETED":
549 ro_vim_item_update["vim_id"] = None
550 ro_vim_item_update["vim_details"] = "Deleted externally"
551 else:
552 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
553 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
554
555 if ro_vim_item_update:
556 self.logger.debug(
557 "ro_task={} {} get-vm={}: status={} {}".format(
558 ro_task_id,
559 ro_task["target_id"],
560 vim_id,
561 ro_vim_item_update.get("vim_status"),
562 ro_vim_item_update.get("vim_details")
563 if ro_vim_item_update.get("vim_status") != "ACTIVE"
564 else "",
565 )
566 )
567
568 return task_status, ro_vim_item_update
569
570 def exec(self, ro_task, task_index, task_depends):
571 task = ro_task["tasks"][task_index]
572 task_id = task["task_id"]
573 target_vim = self.my_vims[ro_task["target_id"]]
574 db_task_update = {"retries": 0}
575 retries = task.get("retries", 0)
576
577 try:
578 params = task["params"]
579 params_copy = deepcopy(params)
580 params_copy["ro_key"] = self.db.decrypt(
581 params_copy.pop("private_key"),
582 params_copy.pop("schema_version"),
583 params_copy.pop("salt"),
584 )
585 params_copy["ip_addr"] = params_copy.pop("ip_address")
586 target_vim.inject_user_key(**params_copy)
587 self.logger.debug(
588 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
589 )
590
591 return (
592 "DONE",
593 None,
594 db_task_update,
595 ) # params_copy["key"]
596 except (vimconn.VimConnException, NsWorkerException) as e:
597 retries += 1
598
599 if retries < self.max_retries_inject_ssh_key:
600 return (
601 "BUILD",
602 None,
603 {
604 "retries": retries,
605 "next_retry": self.time_retries_inject_ssh_key,
606 },
607 )
608
609 self.logger.error(
610 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
611 )
612 ro_vim_item_update = {"vim_details": str(e)}
613
614 return "FAILED", ro_vim_item_update, db_task_update
615
616
617 class VimInteractionImage(VimInteractionBase):
618 def new(self, ro_task, task_index, task_depends):
619 task = ro_task["tasks"][task_index]
620 task_id = task["task_id"]
621 created = False
622 created_items = {}
623 target_vim = self.my_vims[ro_task["target_id"]]
624
625 try:
626 # FIND
627 if task.get("find_params"):
628 vim_images = target_vim.get_image_list(**task["find_params"])
629
630 if not vim_images:
631 raise NsWorkerExceptionNotFound(
632 "Image not found with this criteria: '{}'".format(
633 task["find_params"]
634 )
635 )
636 elif len(vim_images) > 1:
637 raise NsWorkerException(
638 "More than one image found with this criteria: '{}'".format(
639 task["find_params"]
640 )
641 )
642 else:
643 vim_image_id = vim_images[0]["id"]
644
645 ro_vim_item_update = {
646 "vim_id": vim_image_id,
647 "vim_status": "DONE",
648 "created": created,
649 "created_items": created_items,
650 "vim_details": None,
651 }
652 self.logger.debug(
653 "task={} {} new-image={} created={}".format(
654 task_id, ro_task["target_id"], vim_image_id, created
655 )
656 )
657
658 return "DONE", ro_vim_item_update
659 except (NsWorkerException, vimconn.VimConnException) as e:
660 self.logger.error(
661 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
662 )
663 ro_vim_item_update = {
664 "vim_status": "VIM_ERROR",
665 "created": created,
666 "vim_details": str(e),
667 }
668
669 return "FAILED", ro_vim_item_update
670
671
672 class VimInteractionFlavor(VimInteractionBase):
673 def delete(self, ro_task, task_index):
674 task = ro_task["tasks"][task_index]
675 task_id = task["task_id"]
676 flavor_vim_id = ro_task["vim_info"]["vim_id"]
677 ro_vim_item_update_ok = {
678 "vim_status": "DELETED",
679 "created": False,
680 "vim_details": "DELETED",
681 "vim_id": None,
682 }
683
684 try:
685 if flavor_vim_id:
686 target_vim = self.my_vims[ro_task["target_id"]]
687 target_vim.delete_flavor(flavor_vim_id)
688 except vimconn.VimConnNotFoundException:
689 ro_vim_item_update_ok["vim_details"] = "already deleted"
690 except vimconn.VimConnException as e:
691 self.logger.error(
692 "ro_task={} vim={} del-flavor={}: {}".format(
693 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
694 )
695 )
696 ro_vim_item_update = {
697 "vim_status": "VIM_ERROR",
698 "vim_details": "Error while deleting: {}".format(e),
699 }
700
701 return "FAILED", ro_vim_item_update
702
703 self.logger.debug(
704 "task={} {} del-flavor={} {}".format(
705 task_id,
706 ro_task["target_id"],
707 flavor_vim_id,
708 ro_vim_item_update_ok.get("vim_details", ""),
709 )
710 )
711
712 return "DONE", ro_vim_item_update_ok
713
714 def new(self, ro_task, task_index, task_depends):
715 task = ro_task["tasks"][task_index]
716 task_id = task["task_id"]
717 created = False
718 created_items = {}
719 target_vim = self.my_vims[ro_task["target_id"]]
720
721 try:
722 # FIND
723 vim_flavor_id = None
724
725 if task.get("find_params"):
726 try:
727 flavor_data = task["find_params"]["flavor_data"]
728 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
729 except vimconn.VimConnNotFoundException:
730 pass
731
732 if not vim_flavor_id and task.get("params"):
733 # CREATE
734 flavor_data = task["params"]["flavor_data"]
735 vim_flavor_id = target_vim.new_flavor(flavor_data)
736 created = True
737
738 ro_vim_item_update = {
739 "vim_id": vim_flavor_id,
740 "vim_status": "DONE",
741 "created": created,
742 "created_items": created_items,
743 "vim_details": None,
744 }
745 self.logger.debug(
746 "task={} {} new-flavor={} created={}".format(
747 task_id, ro_task["target_id"], vim_flavor_id, created
748 )
749 )
750
751 return "DONE", ro_vim_item_update
752 except (vimconn.VimConnException, NsWorkerException) as e:
753 self.logger.error(
754 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
755 )
756 ro_vim_item_update = {
757 "vim_status": "VIM_ERROR",
758 "created": created,
759 "vim_details": str(e),
760 }
761
762 return "FAILED", ro_vim_item_update
763
764
765 class VimInteractionAffinityGroup(VimInteractionBase):
766 def delete(self, ro_task, task_index):
767 task = ro_task["tasks"][task_index]
768 task_id = task["task_id"]
769 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
770 ro_vim_item_update_ok = {
771 "vim_status": "DELETED",
772 "created": False,
773 "vim_details": "DELETED",
774 "vim_id": None,
775 }
776
777 try:
778 if affinity_group_vim_id:
779 target_vim = self.my_vims[ro_task["target_id"]]
780 target_vim.delete_affinity_group(affinity_group_vim_id)
781 except vimconn.VimConnNotFoundException:
782 ro_vim_item_update_ok["vim_details"] = "already deleted"
783 except vimconn.VimConnException as e:
784 self.logger.error(
785 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
786 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
787 )
788 )
789 ro_vim_item_update = {
790 "vim_status": "VIM_ERROR",
791 "vim_details": "Error while deleting: {}".format(e),
792 }
793
794 return "FAILED", ro_vim_item_update
795
796 self.logger.debug(
797 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
798 task_id,
799 ro_task["target_id"],
800 affinity_group_vim_id,
801 ro_vim_item_update_ok.get("vim_details", ""),
802 )
803 )
804
805 return "DONE", ro_vim_item_update_ok
806
807 def new(self, ro_task, task_index, task_depends):
808 task = ro_task["tasks"][task_index]
809 task_id = task["task_id"]
810 created = False
811 created_items = {}
812 target_vim = self.my_vims[ro_task["target_id"]]
813
814 try:
815 affinity_group_vim_id = None
816 affinity_group_data = None
817
818 if task.get("params"):
819 affinity_group_data = task["params"].get("affinity_group_data")
820
821 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
822 try:
823 param_affinity_group_id = task["params"]["affinity_group_data"].get(
824 "vim-affinity-group-id"
825 )
826 affinity_group_vim_id = target_vim.get_affinity_group(
827 param_affinity_group_id
828 ).get("id")
829 except vimconn.VimConnNotFoundException:
830 self.logger.error(
831 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
832 "could not be found at VIM. Creating a new one.".format(
833 task_id, ro_task["target_id"], param_affinity_group_id
834 )
835 )
836
837 if not affinity_group_vim_id and affinity_group_data:
838 affinity_group_vim_id = target_vim.new_affinity_group(
839 affinity_group_data
840 )
841 created = True
842
843 ro_vim_item_update = {
844 "vim_id": affinity_group_vim_id,
845 "vim_status": "DONE",
846 "created": created,
847 "created_items": created_items,
848 "vim_details": None,
849 }
850 self.logger.debug(
851 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
852 task_id, ro_task["target_id"], affinity_group_vim_id, created
853 )
854 )
855
856 return "DONE", ro_vim_item_update
857 except (vimconn.VimConnException, NsWorkerException) as e:
858 self.logger.error(
859 "task={} vim={} new-affinity-or-anti-affinity-group:"
860 " {}".format(task_id, ro_task["target_id"], e)
861 )
862 ro_vim_item_update = {
863 "vim_status": "VIM_ERROR",
864 "created": created,
865 "vim_details": str(e),
866 }
867
868 return "FAILED", ro_vim_item_update
869
870
871 class VimInteractionSdnNet(VimInteractionBase):
872 @staticmethod
873 def _match_pci(port_pci, mapping):
874 """
875 Check if port_pci matches with mapping
876 mapping can have brackets to indicate that several chars are accepted. e.g
877 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
878 :param port_pci: text
879 :param mapping: text, can contain brackets to indicate several chars are available
880 :return: True if matches, False otherwise
881 """
882 if not port_pci or not mapping:
883 return False
884 if port_pci == mapping:
885 return True
886
887 mapping_index = 0
888 pci_index = 0
889 while True:
890 bracket_start = mapping.find("[", mapping_index)
891
892 if bracket_start == -1:
893 break
894
895 bracket_end = mapping.find("]", bracket_start)
896 if bracket_end == -1:
897 break
898
899 length = bracket_start - mapping_index
900 if (
901 length
902 and port_pci[pci_index : pci_index + length]
903 != mapping[mapping_index:bracket_start]
904 ):
905 return False
906
907 if (
908 port_pci[pci_index + length]
909 not in mapping[bracket_start + 1 : bracket_end]
910 ):
911 return False
912
913 pci_index += length + 1
914 mapping_index = bracket_end + 1
915
916 if port_pci[pci_index:] != mapping[mapping_index:]:
917 return False
918
919 return True
920
921 def _get_interfaces(self, vlds_to_connect, vim_account_id):
922 """
923 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
924 :param vim_account_id:
925 :return:
926 """
927 interfaces = []
928
929 for vld in vlds_to_connect:
930 table, _, db_id = vld.partition(":")
931 db_id, _, vld = db_id.partition(":")
932 _, _, vld_id = vld.partition(".")
933
934 if table == "vnfrs":
935 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
936 iface_key = "vnf-vld-id"
937 else: # table == "nsrs"
938 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
939 iface_key = "ns-vld-id"
940
941 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
942
943 for db_vnfr in db_vnfrs:
944 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
945 for iface_index, interface in enumerate(vdur["interfaces"]):
946 if interface.get(iface_key) == vld_id and interface.get(
947 "type"
948 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
949 # only SR-IOV o PT
950 interface_ = interface.copy()
951 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
952 db_vnfr["_id"], vdu_index, iface_index
953 )
954
955 if vdur.get("status") == "ERROR":
956 interface_["status"] = "ERROR"
957
958 interfaces.append(interface_)
959
960 return interfaces
961
962 def refresh(self, ro_task):
963 # look for task create
964 task_create_index, _ = next(
965 i_t
966 for i_t in enumerate(ro_task["tasks"])
967 if i_t[1]
968 and i_t[1]["action"] == "CREATE"
969 and i_t[1]["status"] != "FINISHED"
970 )
971
972 return self.new(ro_task, task_create_index, None)
973
974 def new(self, ro_task, task_index, task_depends):
975
976 task = ro_task["tasks"][task_index]
977 task_id = task["task_id"]
978 target_vim = self.my_vims[ro_task["target_id"]]
979
980 sdn_net_id = ro_task["vim_info"]["vim_id"]
981
982 created_items = ro_task["vim_info"].get("created_items")
983 connected_ports = ro_task["vim_info"].get("connected_ports", [])
984 new_connected_ports = []
985 last_update = ro_task["vim_info"].get("last_update", 0)
986 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
987 error_list = []
988 created = ro_task["vim_info"].get("created", False)
989
990 try:
991 # CREATE
992 params = task["params"]
993 vlds_to_connect = params.get("vlds", [])
994 associated_vim = params.get("target_vim")
995 # external additional ports
996 additional_ports = params.get("sdn-ports") or ()
997 _, _, vim_account_id = (
998 (None, None, None)
999 if associated_vim is None
1000 else associated_vim.partition(":")
1001 )
1002
1003 if associated_vim:
1004 # get associated VIM
1005 if associated_vim not in self.db_vims:
1006 self.db_vims[associated_vim] = self.db.get_one(
1007 "vim_accounts", {"_id": vim_account_id}
1008 )
1009
1010 db_vim = self.db_vims[associated_vim]
1011
1012 # look for ports to connect
1013 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1014 # print(ports)
1015
1016 sdn_ports = []
1017 pending_ports = error_ports = 0
1018 vlan_used = None
1019 sdn_need_update = False
1020
1021 for port in ports:
1022 vlan_used = port.get("vlan") or vlan_used
1023
1024 # TODO. Do not connect if already done
1025 if not port.get("compute_node") or not port.get("pci"):
1026 if port.get("status") == "ERROR":
1027 error_ports += 1
1028 else:
1029 pending_ports += 1
1030 continue
1031
1032 pmap = None
1033 compute_node_mappings = next(
1034 (
1035 c
1036 for c in db_vim["config"].get("sdn-port-mapping", ())
1037 if c and c["compute_node"] == port["compute_node"]
1038 ),
1039 None,
1040 )
1041
1042 if compute_node_mappings:
1043 # process port_mapping pci of type 0000:af:1[01].[1357]
1044 pmap = next(
1045 (
1046 p
1047 for p in compute_node_mappings["ports"]
1048 if self._match_pci(port["pci"], p.get("pci"))
1049 ),
1050 None,
1051 )
1052
1053 if not pmap:
1054 if not db_vim["config"].get("mapping_not_needed"):
1055 error_list.append(
1056 "Port mapping not found for compute_node={} pci={}".format(
1057 port["compute_node"], port["pci"]
1058 )
1059 )
1060 continue
1061
1062 pmap = {}
1063
1064 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1065 new_port = {
1066 "service_endpoint_id": pmap.get("service_endpoint_id")
1067 or service_endpoint_id,
1068 "service_endpoint_encapsulation_type": "dot1q"
1069 if port["type"] == "SR-IOV"
1070 else None,
1071 "service_endpoint_encapsulation_info": {
1072 "vlan": port.get("vlan"),
1073 "mac": port.get("mac-address"),
1074 "device_id": pmap.get("device_id") or port["compute_node"],
1075 "device_interface_id": pmap.get("device_interface_id")
1076 or port["pci"],
1077 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1078 "switch_port": pmap.get("switch_port"),
1079 "service_mapping_info": pmap.get("service_mapping_info"),
1080 },
1081 }
1082
1083 # TODO
1084 # if port["modified_at"] > last_update:
1085 # sdn_need_update = True
1086 new_connected_ports.append(port["id"]) # TODO
1087 sdn_ports.append(new_port)
1088
1089 if error_ports:
1090 error_list.append(
1091 "{} interfaces have not been created as VDU is on ERROR status".format(
1092 error_ports
1093 )
1094 )
1095
1096 # connect external ports
1097 for index, additional_port in enumerate(additional_ports):
1098 additional_port_id = additional_port.get(
1099 "service_endpoint_id"
1100 ) or "external-{}".format(index)
1101 sdn_ports.append(
1102 {
1103 "service_endpoint_id": additional_port_id,
1104 "service_endpoint_encapsulation_type": additional_port.get(
1105 "service_endpoint_encapsulation_type", "dot1q"
1106 ),
1107 "service_endpoint_encapsulation_info": {
1108 "vlan": additional_port.get("vlan") or vlan_used,
1109 "mac": additional_port.get("mac_address"),
1110 "device_id": additional_port.get("device_id"),
1111 "device_interface_id": additional_port.get(
1112 "device_interface_id"
1113 ),
1114 "switch_dpid": additional_port.get("switch_dpid")
1115 or additional_port.get("switch_id"),
1116 "switch_port": additional_port.get("switch_port"),
1117 "service_mapping_info": additional_port.get(
1118 "service_mapping_info"
1119 ),
1120 },
1121 }
1122 )
1123 new_connected_ports.append(additional_port_id)
1124 sdn_info = ""
1125
1126 # if there are more ports to connect or they have been modified, call create/update
1127 if error_list:
1128 sdn_status = "ERROR"
1129 sdn_info = "; ".join(error_list)
1130 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1131 last_update = time.time()
1132
1133 if not sdn_net_id:
1134 if len(sdn_ports) < 2:
1135 sdn_status = "ACTIVE"
1136
1137 if not pending_ports:
1138 self.logger.debug(
1139 "task={} {} new-sdn-net done, less than 2 ports".format(
1140 task_id, ro_task["target_id"]
1141 )
1142 )
1143 else:
1144 net_type = params.get("type") or "ELAN"
1145 (
1146 sdn_net_id,
1147 created_items,
1148 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1149 created = True
1150 self.logger.debug(
1151 "task={} {} new-sdn-net={} created={}".format(
1152 task_id, ro_task["target_id"], sdn_net_id, created
1153 )
1154 )
1155 else:
1156 created_items = target_vim.edit_connectivity_service(
1157 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1158 )
1159 created = True
1160 self.logger.debug(
1161 "task={} {} update-sdn-net={} created={}".format(
1162 task_id, ro_task["target_id"], sdn_net_id, created
1163 )
1164 )
1165
1166 connected_ports = new_connected_ports
1167 elif sdn_net_id:
1168 wim_status_dict = target_vim.get_connectivity_service_status(
1169 sdn_net_id, conn_info=created_items
1170 )
1171 sdn_status = wim_status_dict["sdn_status"]
1172
1173 if wim_status_dict.get("sdn_info"):
1174 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1175
1176 if wim_status_dict.get("error_msg"):
1177 sdn_info = wim_status_dict.get("error_msg") or ""
1178
1179 if pending_ports:
1180 if sdn_status != "ERROR":
1181 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1182 len(ports) - pending_ports, len(ports)
1183 )
1184
1185 if sdn_status == "ACTIVE":
1186 sdn_status = "BUILD"
1187
1188 ro_vim_item_update = {
1189 "vim_id": sdn_net_id,
1190 "vim_status": sdn_status,
1191 "created": created,
1192 "created_items": created_items,
1193 "connected_ports": connected_ports,
1194 "vim_details": sdn_info,
1195 "last_update": last_update,
1196 }
1197
1198 return sdn_status, ro_vim_item_update
1199 except Exception as e:
1200 self.logger.error(
1201 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1202 exc_info=not isinstance(
1203 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1204 ),
1205 )
1206 ro_vim_item_update = {
1207 "vim_status": "VIM_ERROR",
1208 "created": created,
1209 "vim_details": str(e),
1210 }
1211
1212 return "FAILED", ro_vim_item_update
1213
1214 def delete(self, ro_task, task_index):
1215 task = ro_task["tasks"][task_index]
1216 task_id = task["task_id"]
1217 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1218 ro_vim_item_update_ok = {
1219 "vim_status": "DELETED",
1220 "created": False,
1221 "vim_details": "DELETED",
1222 "vim_id": None,
1223 }
1224
1225 try:
1226 if sdn_vim_id:
1227 target_vim = self.my_vims[ro_task["target_id"]]
1228 target_vim.delete_connectivity_service(
1229 sdn_vim_id, ro_task["vim_info"].get("created_items")
1230 )
1231
1232 except Exception as e:
1233 if (
1234 isinstance(e, sdnconn.SdnConnectorError)
1235 and e.http_code == HTTPStatus.NOT_FOUND.value
1236 ):
1237 ro_vim_item_update_ok["vim_details"] = "already deleted"
1238 else:
1239 self.logger.error(
1240 "ro_task={} vim={} del-sdn-net={}: {}".format(
1241 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1242 ),
1243 exc_info=not isinstance(
1244 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1245 ),
1246 )
1247 ro_vim_item_update = {
1248 "vim_status": "VIM_ERROR",
1249 "vim_details": "Error while deleting: {}".format(e),
1250 }
1251
1252 return "FAILED", ro_vim_item_update
1253
1254 self.logger.debug(
1255 "task={} {} del-sdn-net={} {}".format(
1256 task_id,
1257 ro_task["target_id"],
1258 sdn_vim_id,
1259 ro_vim_item_update_ok.get("vim_details", ""),
1260 )
1261 )
1262
1263 return "DONE", ro_vim_item_update_ok
1264
1265
1266 class NsWorker(threading.Thread):
1267 REFRESH_BUILD = 5 # 5 seconds
1268 REFRESH_ACTIVE = 60 # 1 minute
1269 REFRESH_ERROR = 600
1270 REFRESH_IMAGE = 3600 * 10
1271 REFRESH_DELETE = 3600 * 10
1272 QUEUE_SIZE = 100
1273 terminate = False
1274
1275 def __init__(self, worker_index, config, plugins, db):
1276 """
1277
1278 :param worker_index: thread index
1279 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1280 :param plugins: global shared dict with the loaded plugins
1281 :param db: database class instance to use
1282 """
1283 threading.Thread.__init__(self)
1284 self.config = config
1285 self.plugins = plugins
1286 self.plugin_name = "unknown"
1287 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1288 self.worker_index = worker_index
1289 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1290 # targetvim: vimplugin class
1291 self.my_vims = {}
1292 # targetvim: vim information from database
1293 self.db_vims = {}
1294 # targetvim list
1295 self.vim_targets = []
1296 self.my_id = config["process_id"] + ":" + str(worker_index)
1297 self.db = db
1298 self.item2class = {
1299 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1300 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1301 "image": VimInteractionImage(
1302 self.db, self.my_vims, self.db_vims, self.logger
1303 ),
1304 "flavor": VimInteractionFlavor(
1305 self.db, self.my_vims, self.db_vims, self.logger
1306 ),
1307 "sdn_net": VimInteractionSdnNet(
1308 self.db, self.my_vims, self.db_vims, self.logger
1309 ),
1310 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1311 self.db, self.my_vims, self.db_vims, self.logger
1312 ),
1313 }
1314 self.time_last_task_processed = None
1315 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1316 self.tasks_to_delete = []
1317 # it is idle when there are not vim_targets associated
1318 self.idle = True
1319 self.task_locked_time = config["global"]["task_locked_time"]
1320
1321 def insert_task(self, task):
1322 try:
1323 self.task_queue.put(task, False)
1324 return None
1325 except queue.Full:
1326 raise NsWorkerException("timeout inserting a task")
1327
1328 def terminate(self):
1329 self.insert_task("exit")
1330
1331 def del_task(self, task):
1332 with self.task_lock:
1333 if task["status"] == "SCHEDULED":
1334 task["status"] = "SUPERSEDED"
1335 return True
1336 else: # task["status"] == "processing"
1337 self.task_lock.release()
1338 return False
1339
1340 def _process_vim_config(self, target_id, db_vim):
1341 """
1342 Process vim config, creating vim configuration files as ca_cert
1343 :param target_id: vim/sdn/wim + id
1344 :param db_vim: Vim dictionary obtained from database
1345 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1346 """
1347 if not db_vim.get("config"):
1348 return
1349
1350 file_name = ""
1351
1352 try:
1353 if db_vim["config"].get("ca_cert_content"):
1354 file_name = "{}:{}".format(target_id, self.worker_index)
1355
1356 try:
1357 mkdir(file_name)
1358 except FileExistsError:
1359 pass
1360
1361 file_name = file_name + "/ca_cert"
1362
1363 with open(file_name, "w") as f:
1364 f.write(db_vim["config"]["ca_cert_content"])
1365 del db_vim["config"]["ca_cert_content"]
1366 db_vim["config"]["ca_cert"] = file_name
1367 except Exception as e:
1368 raise NsWorkerException(
1369 "Error writing to file '{}': {}".format(file_name, e)
1370 )
1371
1372 def _load_plugin(self, name, type="vim"):
1373 # type can be vim or sdn
1374 if "rovim_dummy" not in self.plugins:
1375 self.plugins["rovim_dummy"] = VimDummyConnector
1376
1377 if "rosdn_dummy" not in self.plugins:
1378 self.plugins["rosdn_dummy"] = SdnDummyConnector
1379
1380 if name in self.plugins:
1381 return self.plugins[name]
1382
1383 try:
1384 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1385 self.plugins[name] = ep.load()
1386 except Exception as e:
1387 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1388
1389 if name and name not in self.plugins:
1390 raise NsWorkerException(
1391 "Plugin 'osm_{n}' has not been installed".format(n=name)
1392 )
1393
1394 return self.plugins[name]
1395
1396 def _unload_vim(self, target_id):
1397 """
1398 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1399 :param target_id: Contains type:_id; where type can be 'vim', ...
1400 :return: None.
1401 """
1402 try:
1403 self.db_vims.pop(target_id, None)
1404 self.my_vims.pop(target_id, None)
1405
1406 if target_id in self.vim_targets:
1407 self.vim_targets.remove(target_id)
1408
1409 self.logger.info("Unloaded {}".format(target_id))
1410 rmtree("{}:{}".format(target_id, self.worker_index))
1411 except FileNotFoundError:
1412 pass # this is raised by rmtree if folder does not exist
1413 except Exception as e:
1414 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1415
1416 def _check_vim(self, target_id):
1417 """
1418 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1419 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1420 :return: None.
1421 """
1422 target, _, _id = target_id.partition(":")
1423 now = time.time()
1424 update_dict = {}
1425 unset_dict = {}
1426 op_text = ""
1427 step = ""
1428 loaded = target_id in self.vim_targets
1429 target_database = (
1430 "vim_accounts"
1431 if target == "vim"
1432 else "wim_accounts"
1433 if target == "wim"
1434 else "sdns"
1435 )
1436
1437 try:
1438 step = "Getting {} from db".format(target_id)
1439 db_vim = self.db.get_one(target_database, {"_id": _id})
1440
1441 for op_index, operation in enumerate(
1442 db_vim["_admin"].get("operations", ())
1443 ):
1444 if operation["operationState"] != "PROCESSING":
1445 continue
1446
1447 locked_at = operation.get("locked_at")
1448
1449 if locked_at is not None and locked_at >= now - self.task_locked_time:
1450 # some other thread is doing this operation
1451 return
1452
1453 # lock
1454 op_text = "_admin.operations.{}.".format(op_index)
1455
1456 if not self.db.set_one(
1457 target_database,
1458 q_filter={
1459 "_id": _id,
1460 op_text + "operationState": "PROCESSING",
1461 op_text + "locked_at": locked_at,
1462 },
1463 update_dict={
1464 op_text + "locked_at": now,
1465 "admin.current_operation": op_index,
1466 },
1467 fail_on_empty=False,
1468 ):
1469 return
1470
1471 unset_dict[op_text + "locked_at"] = None
1472 unset_dict["current_operation"] = None
1473 step = "Loading " + target_id
1474 error_text = self._load_vim(target_id)
1475
1476 if not error_text:
1477 step = "Checking connectivity"
1478
1479 if target == "vim":
1480 self.my_vims[target_id].check_vim_connectivity()
1481 else:
1482 self.my_vims[target_id].check_credentials()
1483
1484 update_dict["_admin.operationalState"] = "ENABLED"
1485 update_dict["_admin.detailed-status"] = ""
1486 unset_dict[op_text + "detailed-status"] = None
1487 update_dict[op_text + "operationState"] = "COMPLETED"
1488
1489 return
1490
1491 except Exception as e:
1492 error_text = "{}: {}".format(step, e)
1493 self.logger.error("{} for {}: {}".format(step, target_id, e))
1494
1495 finally:
1496 if update_dict or unset_dict:
1497 if error_text:
1498 update_dict[op_text + "operationState"] = "FAILED"
1499 update_dict[op_text + "detailed-status"] = error_text
1500 unset_dict.pop(op_text + "detailed-status", None)
1501 update_dict["_admin.operationalState"] = "ERROR"
1502 update_dict["_admin.detailed-status"] = error_text
1503
1504 if op_text:
1505 update_dict[op_text + "statusEnteredTime"] = now
1506
1507 self.db.set_one(
1508 target_database,
1509 q_filter={"_id": _id},
1510 update_dict=update_dict,
1511 unset=unset_dict,
1512 fail_on_empty=False,
1513 )
1514
1515 if not loaded:
1516 self._unload_vim(target_id)
1517
1518 def _reload_vim(self, target_id):
1519 if target_id in self.vim_targets:
1520 self._load_vim(target_id)
1521 else:
1522 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1523 # just remove it to force load again next time it is needed
1524 self.db_vims.pop(target_id, None)
1525
1526 def _load_vim(self, target_id):
1527 """
1528 Load or reload a vim_account, sdn_controller or wim_account.
1529 Read content from database, load the plugin if not loaded.
1530 In case of error loading the plugin, it load a failing VIM_connector
1531 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1532 :param target_id: Contains type:_id; where type can be 'vim', ...
1533 :return: None if ok, descriptive text if error
1534 """
1535 target, _, _id = target_id.partition(":")
1536 target_database = (
1537 "vim_accounts"
1538 if target == "vim"
1539 else "wim_accounts"
1540 if target == "wim"
1541 else "sdns"
1542 )
1543 plugin_name = ""
1544 vim = None
1545
1546 try:
1547 step = "Getting {}={} from db".format(target, _id)
1548 # TODO process for wim, sdnc, ...
1549 vim = self.db.get_one(target_database, {"_id": _id})
1550
1551 # if deep_get(vim, "config", "sdn-controller"):
1552 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1553 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1554
1555 step = "Decrypting password"
1556 schema_version = vim.get("schema_version")
1557 self.db.encrypt_decrypt_fields(
1558 vim,
1559 "decrypt",
1560 fields=("password", "secret"),
1561 schema_version=schema_version,
1562 salt=_id,
1563 )
1564 self._process_vim_config(target_id, vim)
1565
1566 if target == "vim":
1567 plugin_name = "rovim_" + vim["vim_type"]
1568 step = "Loading plugin '{}'".format(plugin_name)
1569 vim_module_conn = self._load_plugin(plugin_name)
1570 step = "Loading {}'".format(target_id)
1571 self.my_vims[target_id] = vim_module_conn(
1572 uuid=vim["_id"],
1573 name=vim["name"],
1574 tenant_id=vim.get("vim_tenant_id"),
1575 tenant_name=vim.get("vim_tenant_name"),
1576 url=vim["vim_url"],
1577 url_admin=None,
1578 user=vim["vim_user"],
1579 passwd=vim["vim_password"],
1580 config=vim.get("config") or {},
1581 persistent_info={},
1582 )
1583 else: # sdn
1584 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1585 step = "Loading plugin '{}'".format(plugin_name)
1586 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1587 step = "Loading {}'".format(target_id)
1588 wim = deepcopy(vim)
1589 wim_config = wim.pop("config", {}) or {}
1590 wim["uuid"] = wim["_id"]
1591 if "url" in wim and "wim_url" not in wim:
1592 wim["wim_url"] = wim["url"]
1593 elif "url" not in wim and "wim_url" in wim:
1594 wim["url"] = wim["wim_url"]
1595
1596 if wim.get("dpid"):
1597 wim_config["dpid"] = wim.pop("dpid")
1598
1599 if wim.get("switch_id"):
1600 wim_config["switch_id"] = wim.pop("switch_id")
1601
1602 # wim, wim_account, config
1603 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1604 self.db_vims[target_id] = vim
1605 self.error_status = None
1606
1607 self.logger.info(
1608 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1609 )
1610 except Exception as e:
1611 self.logger.error(
1612 "Cannot load {} plugin={}: {} {}".format(
1613 target_id, plugin_name, step, e
1614 )
1615 )
1616
1617 self.db_vims[target_id] = vim or {}
1618 self.db_vims[target_id] = FailingConnector(str(e))
1619 error_status = "{} Error: {}".format(step, e)
1620
1621 return error_status
1622 finally:
1623 if target_id not in self.vim_targets:
1624 self.vim_targets.append(target_id)
1625
1626 def _get_db_task(self):
1627 """
1628 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1629 :return: None
1630 """
1631 now = time.time()
1632
1633 if not self.time_last_task_processed:
1634 self.time_last_task_processed = now
1635
1636 try:
1637 while True:
1638 """
1639 # Log RO tasks only when loglevel is DEBUG
1640 if self.logger.getEffectiveLevel() == logging.DEBUG:
1641 self._log_ro_task(
1642 None,
1643 None,
1644 None,
1645 "TASK_WF",
1646 "task_locked_time="
1647 + str(self.task_locked_time)
1648 + " "
1649 + "time_last_task_processed="
1650 + str(self.time_last_task_processed)
1651 + " "
1652 + "now="
1653 + str(now),
1654 )
1655 """
1656 locked = self.db.set_one(
1657 "ro_tasks",
1658 q_filter={
1659 "target_id": self.vim_targets,
1660 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1661 "locked_at.lt": now - self.task_locked_time,
1662 "to_check_at.lt": self.time_last_task_processed,
1663 },
1664 update_dict={"locked_by": self.my_id, "locked_at": now},
1665 fail_on_empty=False,
1666 )
1667
1668 if locked:
1669 # read and return
1670 ro_task = self.db.get_one(
1671 "ro_tasks",
1672 q_filter={
1673 "target_id": self.vim_targets,
1674 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1675 "locked_at": now,
1676 },
1677 )
1678 return ro_task
1679
1680 if self.time_last_task_processed == now:
1681 self.time_last_task_processed = None
1682 return None
1683 else:
1684 self.time_last_task_processed = now
1685 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1686
1687 except DbException as e:
1688 self.logger.error("Database exception at _get_db_task: {}".format(e))
1689 except Exception as e:
1690 self.logger.critical(
1691 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1692 )
1693
1694 return None
1695
1696 def _get_db_all_tasks(self):
1697 """
1698 Read all content of table ro_tasks to log it
1699 :return: None
1700 """
1701 try:
1702 # Checking the content of the BD:
1703
1704 # read and return
1705 ro_task = self.db.get_list("ro_tasks")
1706 for rt in ro_task:
1707 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1708 return ro_task
1709
1710 except DbException as e:
1711 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1712 except Exception as e:
1713 self.logger.critical(
1714 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1715 )
1716
1717 return None
1718
1719 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1720 """
1721 Generate a log with the following format:
1722
1723 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1724 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1725 task_array_index;task_id;task_action;task_item;task_args
1726
1727 Example:
1728
1729 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1730 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1731 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1732 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1733 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1734 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1735 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1736 """
1737 try:
1738 line = []
1739 i = 0
1740 if ro_task is not None and isinstance(ro_task, dict):
1741 for t in ro_task["tasks"]:
1742 line.clear()
1743 line.append(mark)
1744 line.append(event)
1745 line.append(ro_task.get("_id", ""))
1746 line.append(str(ro_task.get("locked_at", "")))
1747 line.append(str(ro_task.get("modified_at", "")))
1748 line.append(str(ro_task.get("created_at", "")))
1749 line.append(str(ro_task.get("to_check_at", "")))
1750 line.append(str(ro_task.get("locked_by", "")))
1751 line.append(str(ro_task.get("target_id", "")))
1752 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1753 line.append(str(ro_task.get("vim_info", "")))
1754 line.append(str(ro_task.get("tasks", "")))
1755 if isinstance(t, dict):
1756 line.append(str(t.get("status", "")))
1757 line.append(str(t.get("action_id", "")))
1758 line.append(str(i))
1759 line.append(str(t.get("task_id", "")))
1760 line.append(str(t.get("action", "")))
1761 line.append(str(t.get("item", "")))
1762 line.append(str(t.get("find_params", "")))
1763 line.append(str(t.get("params", "")))
1764 else:
1765 line.extend([""] * 2)
1766 line.append(str(i))
1767 line.extend([""] * 5)
1768
1769 i += 1
1770 self.logger.debug(";".join(line))
1771 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1772 i = 0
1773 while True:
1774 st = "tasks.{}.status".format(i)
1775 if st not in db_ro_task_update:
1776 break
1777 line.clear()
1778 line.append(mark)
1779 line.append(event)
1780 line.append(db_ro_task_update.get("_id", ""))
1781 line.append(str(db_ro_task_update.get("locked_at", "")))
1782 line.append(str(db_ro_task_update.get("modified_at", "")))
1783 line.append("")
1784 line.append(str(db_ro_task_update.get("to_check_at", "")))
1785 line.append(str(db_ro_task_update.get("locked_by", "")))
1786 line.append("")
1787 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1788 line.append("")
1789 line.append(str(db_ro_task_update.get("vim_info", "")))
1790 line.append(str(str(db_ro_task_update).count(".status")))
1791 line.append(db_ro_task_update.get(st, ""))
1792 line.append("")
1793 line.append(str(i))
1794 line.extend([""] * 3)
1795 i += 1
1796 self.logger.debug(";".join(line))
1797
1798 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1799 line.clear()
1800 line.append(mark)
1801 line.append(event)
1802 line.append(db_ro_task_delete.get("_id", ""))
1803 line.append("")
1804 line.append(db_ro_task_delete.get("modified_at", ""))
1805 line.extend([""] * 13)
1806 self.logger.debug(";".join(line))
1807
1808 else:
1809 line.clear()
1810 line.append(mark)
1811 line.append(event)
1812 line.extend([""] * 16)
1813 self.logger.debug(";".join(line))
1814
1815 except Exception as e:
1816 self.logger.error("Error logging ro_task: {}".format(e))
1817
1818 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1819 """
1820 Determine if this task need to be done or superseded
1821 :return: None
1822 """
1823 my_task = ro_task["tasks"][task_index]
1824 task_id = my_task["task_id"]
1825 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1826 "created_items", False
1827 )
1828
1829 if my_task["status"] == "FAILED":
1830 return None, None # TODO need to be retry??
1831
1832 try:
1833 for index, task in enumerate(ro_task["tasks"]):
1834 if index == task_index or not task:
1835 continue # own task
1836
1837 if (
1838 my_task["target_record"] == task["target_record"]
1839 and task["action"] == "CREATE"
1840 ):
1841 # set to finished
1842 db_update["tasks.{}.status".format(index)] = task[
1843 "status"
1844 ] = "FINISHED"
1845 elif task["action"] == "CREATE" and task["status"] not in (
1846 "FINISHED",
1847 "SUPERSEDED",
1848 ):
1849 needed_delete = False
1850
1851 if needed_delete:
1852 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1853 else:
1854 return "SUPERSEDED", None
1855 except Exception as e:
1856 if not isinstance(e, NsWorkerException):
1857 self.logger.critical(
1858 "Unexpected exception at _delete_task task={}: {}".format(
1859 task_id, e
1860 ),
1861 exc_info=True,
1862 )
1863
1864 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1865
1866 def _create_task(self, ro_task, task_index, task_depends, db_update):
1867 """
1868 Determine if this task need to create something at VIM
1869 :return: None
1870 """
1871 my_task = ro_task["tasks"][task_index]
1872 task_id = my_task["task_id"]
1873 task_status = None
1874
1875 if my_task["status"] == "FAILED":
1876 return None, None # TODO need to be retry??
1877 elif my_task["status"] == "SCHEDULED":
1878 # check if already created by another task
1879 for index, task in enumerate(ro_task["tasks"]):
1880 if index == task_index or not task:
1881 continue # own task
1882
1883 if task["action"] == "CREATE" and task["status"] not in (
1884 "SCHEDULED",
1885 "FINISHED",
1886 "SUPERSEDED",
1887 ):
1888 return task["status"], "COPY_VIM_INFO"
1889
1890 try:
1891 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1892 ro_task, task_index, task_depends
1893 )
1894 # TODO update other CREATE tasks
1895 except Exception as e:
1896 if not isinstance(e, NsWorkerException):
1897 self.logger.error(
1898 "Error executing task={}: {}".format(task_id, e), exc_info=True
1899 )
1900
1901 task_status = "FAILED"
1902 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1903 # TODO update ro_vim_item_update
1904
1905 return task_status, ro_vim_item_update
1906 else:
1907 return None, None
1908
1909 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1910 """
1911 Look for dependency task
1912 :param task_id: Can be one of
1913 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1914 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1915 3. task.task_id: "<action_id>:number"
1916 :param ro_task:
1917 :param target_id:
1918 :return: database ro_task plus index of task
1919 """
1920 if (
1921 task_id.startswith("vim:")
1922 or task_id.startswith("sdn:")
1923 or task_id.startswith("wim:")
1924 ):
1925 target_id, _, task_id = task_id.partition(" ")
1926
1927 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1928 ro_task_dependency = self.db.get_one(
1929 "ro_tasks",
1930 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1931 fail_on_empty=False,
1932 )
1933
1934 if ro_task_dependency:
1935 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1936 if task["target_record_id"] == task_id:
1937 return ro_task_dependency, task_index
1938
1939 else:
1940 if ro_task:
1941 for task_index, task in enumerate(ro_task["tasks"]):
1942 if task and task["task_id"] == task_id:
1943 return ro_task, task_index
1944
1945 ro_task_dependency = self.db.get_one(
1946 "ro_tasks",
1947 q_filter={
1948 "tasks.ANYINDEX.task_id": task_id,
1949 "tasks.ANYINDEX.target_record.ne": None,
1950 },
1951 fail_on_empty=False,
1952 )
1953
1954 if ro_task_dependency:
1955 for task_index, task in ro_task_dependency["tasks"]:
1956 if task["task_id"] == task_id:
1957 return ro_task_dependency, task_index
1958 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1959
1960 def _process_pending_tasks(self, ro_task):
1961 ro_task_id = ro_task["_id"]
1962 now = time.time()
1963 # one day
1964 next_check_at = now + (24 * 60 * 60)
1965 db_ro_task_update = {}
1966
1967 def _update_refresh(new_status):
1968 # compute next_refresh
1969 nonlocal task
1970 nonlocal next_check_at
1971 nonlocal db_ro_task_update
1972 nonlocal ro_task
1973
1974 next_refresh = time.time()
1975
1976 if task["item"] in ("image", "flavor"):
1977 next_refresh += self.REFRESH_IMAGE
1978 elif new_status == "BUILD":
1979 next_refresh += self.REFRESH_BUILD
1980 elif new_status == "DONE":
1981 next_refresh += self.REFRESH_ACTIVE
1982 else:
1983 next_refresh += self.REFRESH_ERROR
1984
1985 next_check_at = min(next_check_at, next_refresh)
1986 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1987 ro_task["vim_info"]["refresh_at"] = next_refresh
1988
1989 try:
1990 """
1991 # Log RO tasks only when loglevel is DEBUG
1992 if self.logger.getEffectiveLevel() == logging.DEBUG:
1993 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1994 """
1995 # 0: get task_status_create
1996 lock_object = None
1997 task_status_create = None
1998 task_create = next(
1999 (
2000 t
2001 for t in ro_task["tasks"]
2002 if t
2003 and t["action"] == "CREATE"
2004 and t["status"] in ("BUILD", "DONE")
2005 ),
2006 None,
2007 )
2008
2009 if task_create:
2010 task_status_create = task_create["status"]
2011
2012 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2013 for task_action in ("DELETE", "CREATE", "EXEC"):
2014 db_vim_update = None
2015 new_status = None
2016
2017 for task_index, task in enumerate(ro_task["tasks"]):
2018 if not task:
2019 continue # task deleted
2020
2021 task_depends = {}
2022 target_update = None
2023
2024 if (
2025 (
2026 task_action in ("DELETE", "EXEC")
2027 and task["status"] not in ("SCHEDULED", "BUILD")
2028 )
2029 or task["action"] != task_action
2030 or (
2031 task_action == "CREATE"
2032 and task["status"] in ("FINISHED", "SUPERSEDED")
2033 )
2034 ):
2035 continue
2036
2037 task_path = "tasks.{}.status".format(task_index)
2038 try:
2039 db_vim_info_update = None
2040
2041 if task["status"] == "SCHEDULED":
2042 # check if tasks that this depends on have been completed
2043 dependency_not_completed = False
2044
2045 for dependency_task_id in task.get("depends_on") or ():
2046 (
2047 dependency_ro_task,
2048 dependency_task_index,
2049 ) = self._get_dependency(
2050 dependency_task_id, target_id=ro_task["target_id"]
2051 )
2052 dependency_task = dependency_ro_task["tasks"][
2053 dependency_task_index
2054 ]
2055
2056 if dependency_task["status"] == "SCHEDULED":
2057 dependency_not_completed = True
2058 next_check_at = min(
2059 next_check_at, dependency_ro_task["to_check_at"]
2060 )
2061 # must allow dependent task to be processed first
2062 # to do this set time after last_task_processed
2063 next_check_at = max(
2064 self.time_last_task_processed, next_check_at
2065 )
2066 break
2067 elif dependency_task["status"] == "FAILED":
2068 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2069 task["action"],
2070 task["item"],
2071 dependency_task["action"],
2072 dependency_task["item"],
2073 dependency_task_id,
2074 dependency_ro_task["vim_info"].get(
2075 "vim_details"
2076 ),
2077 )
2078 self.logger.error(
2079 "task={} {}".format(task["task_id"], error_text)
2080 )
2081 raise NsWorkerException(error_text)
2082
2083 task_depends[dependency_task_id] = dependency_ro_task[
2084 "vim_info"
2085 ]["vim_id"]
2086 task_depends[
2087 "TASK-{}".format(dependency_task_id)
2088 ] = dependency_ro_task["vim_info"]["vim_id"]
2089
2090 if dependency_not_completed:
2091 # TODO set at vim_info.vim_details that it is waiting
2092 continue
2093
2094 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2095 # the task of renew this locking. It will update database locket_at periodically
2096 if not lock_object:
2097 lock_object = LockRenew.add_lock_object(
2098 "ro_tasks", ro_task, self
2099 )
2100
2101 if task["action"] == "DELETE":
2102 (new_status, db_vim_info_update,) = self._delete_task(
2103 ro_task, task_index, task_depends, db_ro_task_update
2104 )
2105 new_status = (
2106 "FINISHED" if new_status == "DONE" else new_status
2107 )
2108 # ^with FINISHED instead of DONE it will not be refreshing
2109
2110 if new_status in ("FINISHED", "SUPERSEDED"):
2111 target_update = "DELETE"
2112 elif task["action"] == "EXEC":
2113 (
2114 new_status,
2115 db_vim_info_update,
2116 db_task_update,
2117 ) = self.item2class[task["item"]].exec(
2118 ro_task, task_index, task_depends
2119 )
2120 new_status = (
2121 "FINISHED" if new_status == "DONE" else new_status
2122 )
2123 # ^with FINISHED instead of DONE it will not be refreshing
2124
2125 if db_task_update:
2126 # load into database the modified db_task_update "retries" and "next_retry"
2127 if db_task_update.get("retries"):
2128 db_ro_task_update[
2129 "tasks.{}.retries".format(task_index)
2130 ] = db_task_update["retries"]
2131
2132 next_check_at = time.time() + db_task_update.get(
2133 "next_retry", 60
2134 )
2135 target_update = None
2136 elif task["action"] == "CREATE":
2137 if task["status"] == "SCHEDULED":
2138 if task_status_create:
2139 new_status = task_status_create
2140 target_update = "COPY_VIM_INFO"
2141 else:
2142 new_status, db_vim_info_update = self.item2class[
2143 task["item"]
2144 ].new(ro_task, task_index, task_depends)
2145 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2146 _update_refresh(new_status)
2147 else:
2148 if (
2149 ro_task["vim_info"]["refresh_at"]
2150 and now > ro_task["vim_info"]["refresh_at"]
2151 ):
2152 new_status, db_vim_info_update = self.item2class[
2153 task["item"]
2154 ].refresh(ro_task)
2155 _update_refresh(new_status)
2156 else:
2157 # The refresh is updated to avoid set the value of "refresh_at" to
2158 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2159 # because it can happen that in this case the task is never processed
2160 _update_refresh(task["status"])
2161
2162 except Exception as e:
2163 new_status = "FAILED"
2164 db_vim_info_update = {
2165 "vim_status": "VIM_ERROR",
2166 "vim_details": str(e),
2167 }
2168
2169 if not isinstance(
2170 e, (NsWorkerException, vimconn.VimConnException)
2171 ):
2172 self.logger.error(
2173 "Unexpected exception at _delete_task task={}: {}".format(
2174 task["task_id"], e
2175 ),
2176 exc_info=True,
2177 )
2178
2179 try:
2180 if db_vim_info_update:
2181 db_vim_update = db_vim_info_update.copy()
2182 db_ro_task_update.update(
2183 {
2184 "vim_info." + k: v
2185 for k, v in db_vim_info_update.items()
2186 }
2187 )
2188 ro_task["vim_info"].update(db_vim_info_update)
2189
2190 if new_status:
2191 if task_action == "CREATE":
2192 task_status_create = new_status
2193 db_ro_task_update[task_path] = new_status
2194
2195 if target_update or db_vim_update:
2196 if target_update == "DELETE":
2197 self._update_target(task, None)
2198 elif target_update == "COPY_VIM_INFO":
2199 self._update_target(task, ro_task["vim_info"])
2200 else:
2201 self._update_target(task, db_vim_update)
2202
2203 except Exception as e:
2204 if (
2205 isinstance(e, DbException)
2206 and e.http_code == HTTPStatus.NOT_FOUND
2207 ):
2208 # if the vnfrs or nsrs has been removed from database, this task must be removed
2209 self.logger.debug(
2210 "marking to delete task={}".format(task["task_id"])
2211 )
2212 self.tasks_to_delete.append(task)
2213 else:
2214 self.logger.error(
2215 "Unexpected exception at _update_target task={}: {}".format(
2216 task["task_id"], e
2217 ),
2218 exc_info=True,
2219 )
2220
2221 locked_at = ro_task["locked_at"]
2222
2223 if lock_object:
2224 locked_at = [
2225 lock_object["locked_at"],
2226 lock_object["locked_at"] + self.task_locked_time,
2227 ]
2228 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2229 # contain exactly locked_at + self.task_locked_time
2230 LockRenew.remove_lock_object(lock_object)
2231
2232 q_filter = {
2233 "_id": ro_task["_id"],
2234 "to_check_at": ro_task["to_check_at"],
2235 "locked_at": locked_at,
2236 }
2237 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2238 # outside this task (by ro_nbi) do not update it
2239 db_ro_task_update["locked_by"] = None
2240 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2241 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2242 db_ro_task_update["modified_at"] = now
2243 db_ro_task_update["to_check_at"] = next_check_at
2244
2245 """
2246 # Log RO tasks only when loglevel is DEBUG
2247 if self.logger.getEffectiveLevel() == logging.DEBUG:
2248 db_ro_task_update_log = db_ro_task_update.copy()
2249 db_ro_task_update_log["_id"] = q_filter["_id"]
2250 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2251 """
2252
2253 if not self.db.set_one(
2254 "ro_tasks",
2255 update_dict=db_ro_task_update,
2256 q_filter=q_filter,
2257 fail_on_empty=False,
2258 ):
2259 del db_ro_task_update["to_check_at"]
2260 del q_filter["to_check_at"]
2261 """
2262 # Log RO tasks only when loglevel is DEBUG
2263 if self.logger.getEffectiveLevel() == logging.DEBUG:
2264 self._log_ro_task(
2265 None,
2266 db_ro_task_update_log,
2267 None,
2268 "TASK_WF",
2269 "SET_TASK " + str(q_filter),
2270 )
2271 """
2272 self.db.set_one(
2273 "ro_tasks",
2274 q_filter=q_filter,
2275 update_dict=db_ro_task_update,
2276 fail_on_empty=True,
2277 )
2278 except DbException as e:
2279 self.logger.error(
2280 "ro_task={} Error updating database {}".format(ro_task_id, e)
2281 )
2282 except Exception as e:
2283 self.logger.error(
2284 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2285 )
2286
2287 def _update_target(self, task, ro_vim_item_update):
2288 table, _, temp = task["target_record"].partition(":")
2289 _id, _, path_vim_status = temp.partition(":")
2290 path_item = path_vim_status[: path_vim_status.rfind(".")]
2291 path_item = path_item[: path_item.rfind(".")]
2292 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2293 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2294
2295 if ro_vim_item_update:
2296 update_dict = {
2297 path_vim_status + "." + k: v
2298 for k, v in ro_vim_item_update.items()
2299 if k
2300 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2301 }
2302
2303 if path_vim_status.startswith("vdur."):
2304 # for backward compatibility, add vdur.name apart from vdur.vim_name
2305 if ro_vim_item_update.get("vim_name"):
2306 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2307
2308 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2309 if ro_vim_item_update.get("vim_id"):
2310 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2311
2312 # update general status
2313 if ro_vim_item_update.get("vim_status"):
2314 update_dict[path_item + ".status"] = ro_vim_item_update[
2315 "vim_status"
2316 ]
2317
2318 if ro_vim_item_update.get("interfaces"):
2319 path_interfaces = path_item + ".interfaces"
2320
2321 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2322 if iface:
2323 update_dict.update(
2324 {
2325 path_interfaces + ".{}.".format(i) + k: v
2326 for k, v in iface.items()
2327 if k in ("vlan", "compute_node", "pci")
2328 }
2329 )
2330
2331 # put ip_address and mac_address with ip-address and mac-address
2332 if iface.get("ip_address"):
2333 update_dict[
2334 path_interfaces + ".{}.".format(i) + "ip-address"
2335 ] = iface["ip_address"]
2336
2337 if iface.get("mac_address"):
2338 update_dict[
2339 path_interfaces + ".{}.".format(i) + "mac-address"
2340 ] = iface["mac_address"]
2341
2342 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2343 update_dict["ip-address"] = iface.get("ip_address").split(
2344 ";"
2345 )[0]
2346
2347 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2348 update_dict[path_item + ".ip-address"] = iface.get(
2349 "ip_address"
2350 ).split(";")[0]
2351
2352 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2353 else:
2354 update_dict = {path_item + ".status": "DELETED"}
2355 self.db.set_one(
2356 table,
2357 q_filter={"_id": _id},
2358 update_dict=update_dict,
2359 unset={path_vim_status: None},
2360 )
2361
2362 def _process_delete_db_tasks(self):
2363 """
2364 Delete task from database because vnfrs or nsrs or both have been deleted
2365 :return: None. Uses and modify self.tasks_to_delete
2366 """
2367 while self.tasks_to_delete:
2368 task = self.tasks_to_delete[0]
2369 vnfrs_deleted = None
2370 nsr_id = task["nsr_id"]
2371
2372 if task["target_record"].startswith("vnfrs:"):
2373 # check if nsrs is present
2374 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2375 vnfrs_deleted = task["target_record"].split(":")[1]
2376
2377 try:
2378 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2379 except Exception as e:
2380 self.logger.error(
2381 "Error deleting task={}: {}".format(task["task_id"], e)
2382 )
2383 self.tasks_to_delete.pop(0)
2384
2385 @staticmethod
2386 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2387 """
2388 Static method because it is called from osm_ng_ro.ns
2389 :param db: instance of database to use
2390 :param nsr_id: affected nsrs id
2391 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2392 :return: None, exception is fails
2393 """
2394 retries = 5
2395 for retry in range(retries):
2396 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2397 now = time.time()
2398 conflict = False
2399
2400 for ro_task in ro_tasks:
2401 db_update = {}
2402 to_delete_ro_task = True
2403
2404 for index, task in enumerate(ro_task["tasks"]):
2405 if not task:
2406 pass
2407 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2408 vnfrs_deleted
2409 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2410 ):
2411 db_update["tasks.{}".format(index)] = None
2412 else:
2413 # used by other nsr, ro_task cannot be deleted
2414 to_delete_ro_task = False
2415
2416 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2417 if to_delete_ro_task:
2418 if not db.del_one(
2419 "ro_tasks",
2420 q_filter={
2421 "_id": ro_task["_id"],
2422 "modified_at": ro_task["modified_at"],
2423 },
2424 fail_on_empty=False,
2425 ):
2426 conflict = True
2427 elif db_update:
2428 db_update["modified_at"] = now
2429 if not db.set_one(
2430 "ro_tasks",
2431 q_filter={
2432 "_id": ro_task["_id"],
2433 "modified_at": ro_task["modified_at"],
2434 },
2435 update_dict=db_update,
2436 fail_on_empty=False,
2437 ):
2438 conflict = True
2439 if not conflict:
2440 return
2441 else:
2442 raise NsWorkerException("Exceeded {} retries".format(retries))
2443
2444 def run(self):
2445 # load database
2446 self.logger.info("Starting")
2447 while True:
2448 # step 1: get commands from queue
2449 try:
2450 if self.vim_targets:
2451 task = self.task_queue.get(block=False)
2452 else:
2453 if not self.idle:
2454 self.logger.debug("enters in idle state")
2455 self.idle = True
2456 task = self.task_queue.get(block=True)
2457 self.idle = False
2458
2459 if task[0] == "terminate":
2460 break
2461 elif task[0] == "load_vim":
2462 self.logger.info("order to load vim {}".format(task[1]))
2463 self._load_vim(task[1])
2464 elif task[0] == "unload_vim":
2465 self.logger.info("order to unload vim {}".format(task[1]))
2466 self._unload_vim(task[1])
2467 elif task[0] == "reload_vim":
2468 self._reload_vim(task[1])
2469 elif task[0] == "check_vim":
2470 self.logger.info("order to check vim {}".format(task[1]))
2471 self._check_vim(task[1])
2472 continue
2473 except Exception as e:
2474 if isinstance(e, queue.Empty):
2475 pass
2476 else:
2477 self.logger.critical(
2478 "Error processing task: {}".format(e), exc_info=True
2479 )
2480
2481 # step 2: process pending_tasks, delete not needed tasks
2482 try:
2483 if self.tasks_to_delete:
2484 self._process_delete_db_tasks()
2485 busy = False
2486 """
2487 # Log RO tasks only when loglevel is DEBUG
2488 if self.logger.getEffectiveLevel() == logging.DEBUG:
2489 _ = self._get_db_all_tasks()
2490 """
2491 ro_task = self._get_db_task()
2492 if ro_task:
2493 self._process_pending_tasks(ro_task)
2494 busy = True
2495 if not busy:
2496 time.sleep(5)
2497 except Exception as e:
2498 self.logger.critical(
2499 "Unexpected exception at run: " + str(e), exc_info=True
2500 )
2501
2502 self.logger.info("Finishing")