2950aa9b287156c86657b0ab2b0ef7c5419c8189
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target where the results are stored.
"""
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
# Module metadata: author name and a date stamp string.
__author__ = "Alfonso Tierno"
__date__ = "$28-Sep-2017 12:07:15$"
47
48
def deep_get(target_dict, *args, **kwargs):
    """
    Follow a chain of keys through nested dictionaries and return what is found at the end.
    Example target_dict={a: {b: 5}}; keys (a, b) give 5; keys (a, b, c) or (f, h) give the default
    :param target_dict: dictionary where the search starts
    :param args: keys to follow, one nesting level each. With no keys, target_dict itself is returned
    :param kwargs: only 'default' is read: the value returned when the chain cannot be followed
    :return: the value reached; otherwise kwargs["default"], or None when no default was given
    """
    current = target_dict

    for step in args:
        # the chain is broken as soon as the current level is not a dict or lacks the key
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            return kwargs.get("default")

    return current
63
64
class NsWorkerException(Exception):
    """Base error raised by the NS worker code of this module."""
67
68
class FailingConnector:
    """Connector stand-in where every public VIM/SDN connector attribute raises an error.

    For each public attribute name of vimconn.VimConnector and sdnconn.SdnConnectorBase,
    the instance gets a Mock that raises when called, carrying the given error message.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text stored in self.error_msg and used to build every raised exception
        """
        self.error_msg = error_msg

        # The SDN pass runs last on purpose: a public name present in both base classes
        # ends up raising SdnConnectorError, exactly as if two consecutive loops were used.
        failing_sources = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in failing_sources:
            for attr_name in dir(connector_class):
                if attr_name.startswith("_"):
                    # private and dunder attributes are left untouched
                    continue

                setattr(self, attr_name, Mock(side_effect=error_class(error_msg)))
84
85
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException variant raised in this module when a searched VIM element is not found."""
88
89
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    Its methods contact nothing: they just report a fixed result."""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        Keep the four collaborators as attributes of the same name.
        :param db: stored as self.db
        :param my_vims: stored as self.my_vims
        :param db_vims: stored as self.db_vims
        :param logger: stored as self.logger
        """
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing. :return: ("BUILD", {})"""
        task_status, ro_vim_item_update = "BUILD", {}
        return task_status, ro_vim_item_update

    def refresh(self, ro_task):
        """Skip calling VIM to get image, flavor status.
        :return: ("FAILED", {}) when the stored vim_status is "VIM_ERROR", ("DONE", {}) otherwise"""
        in_vim_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if in_vim_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete the element. :return: ("DONE", {})"""
        task_status, ro_vim_item_update = "DONE", {}
        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing. :return: ("DONE", None, None)"""
        task_status, ro_vim_item_update, db_task_update = "DONE", None, None
        return task_status, ro_vim_item_update, db_task_update
116
117
class VimInteractionNet(VimInteractionBase):
    """Find, create, refresh and delete networks at the target VIM."""

    def _get_find_filter(self, ro_task, find_params):
        """
        Translate the 'find_params' of a task into the filter used to look for the network.
        :param ro_task: ro_task being processed; its "target_id" selects the VIM database entry
        :param find_params: non empty 'find_params' dictionary of the task
        :return: tuple (vim_filter, mgmtnet, mgmtnet_defined_in_vim)
        :raises NsWorkerExceptionNotFound: if find_params contains neither "filter_dict" nor "mgmt"
        """
        if find_params.get("filter_dict"):
            return find_params["filter_dict"], False, False

        if not find_params.get("mgmt"):
            raise NsWorkerExceptionNotFound(
                "Invalid find_params for new_net {}".format(find_params)
            )

        # management network: the VIM configuration takes precedence, id before name
        db_vim = self.db_vims[ro_task["target_id"]]

        for config_key, filter_key in (
            ("management_network_id", "id"),
            ("management_network_name", "name"),
        ):
            if deep_get(db_vim, "config", config_key):
                return {filter_key: db_vim["config"][config_key]}, True, True

        return {"name": find_params["name"]}, True, False

    def new(self, ro_task, task_index, task_depends):
        """
        Find the network at the VIM and/or create it.
        With 'find_params' the network is searched. If nothing is found it is created from
        'params' when the task provides them; a management network with no mapping at the VIM
        configuration is created by name; any other case is an error.
        Without 'find_params' the network is created from 'params'.
        :param ro_task: ro_task containing the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("BUILD", ro_vim_item_update) on success, ("FAILED", ro_vim_item_update) on
            vimconn.VimConnException or NsWorkerException
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            find_params = task.get("find_params")
            # without find_params the only possible action is to create from 'params'
            create_from_params = not find_params

            # FIND
            if find_params:
                vim_filter, mgmtnet, mgmtnet_defined_in_vim = self._get_find_filter(
                    ro_task, find_params
                )
                vim_nets = target_vim.get_network_list(vim_filter)

                if vim_nets:
                    if len(vim_nets) > 1:
                        raise NsWorkerException(
                            "More than one network found with this criteria: '{}'".format(
                                find_params
                            )
                        )

                    vim_net_id = vim_nets[0]["id"]
                elif task.get("params"):
                    # Not found, but the task says how to create it. Before this fix the
                    # method returned "BUILD" with vim_id=None in this case.
                    create_from_params = True
                elif mgmtnet and not mgmtnet_defined_in_vim:
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                else:
                    raise NsWorkerExceptionNotFound(
                        "Network not found with this criteria: '{}'".format(find_params)
                    )

            # CREATE
            if create_from_params:
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """
        Call VIM to get network status.
        :param ro_task: ro_task whose "vim_info" holds the network vim_id and last known values
        :return: (task_status, ro_vim_item_update). task_status is "DONE" for VIM status "ACTIVE",
            "BUILD" for "BUILD" and "FAILED" otherwise (also on vimconn.VimConnException).
            ro_vim_item_update contains only the values that differ from the stored ones
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored = ro_task["vim_info"]
        vim_id = stored["vim_id"]

        try:
            vim_dict = target_vim.refresh_nets_status([vim_id])
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        if stored["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    new_status,
                    # details are only logged when the status is not ACTIVE
                    ro_vim_item_update.get("vim_details")
                    if new_status != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network at the VIM, if there is a vim_id or there are created items.
        :param ro_task: ro_task containing the task and the "vim_info" of the network
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", ro_vim_item_update) when deleted or not found at the VIM,
            ("FAILED", ro_vim_item_update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            # nothing left to delete: reported as success
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
334
335
class VimInteractionVdu(VimInteractionBase):
    """Create, refresh and delete VM instances at the target VIM, and inject ssh keys on them."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create the VM instance at the VIM.
        References of the form "TASK-..." found at the net_list, image_id, flavor_id and
        affinity_group_list of a copy of the task params are replaced by the corresponding
        value of task_depends before calling the VIM.
        :param ro_task: ro_task containing the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary that maps "TASK-..." references to VIM identifiers
        :return: ("BUILD", ro_vim_item_update) on success, ("FAILED", ro_vim_item_update) on
            vimconn.VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # set before contacting the VIM, so a failed attempt is also reported as created
            created = True
            params_copy = deepcopy(task["params"])

            for net in params_copy["net_list"]:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            for affinity_group in params_copy["affinity_group_list"]:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # the message used to be the truncated text "found for {}"
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not "
                            "created or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" is read from each net_list entry after the VIM call
            # (presumably filled in by new_vminstance; confirm against the connector)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM instance at the VIM, if there is a vim_id or there are created items.
        :param ro_task: ro_task containing the task and the "vim_info" of the VM
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", ro_vim_item_update) when deleted or not found at the VIM,
            ("FAILED", ro_vim_item_update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            # nothing left to delete: reported as success
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status.
        :param ro_task: ro_task whose "vim_info" holds the VM vim_id and last known values
        :return: (None, None) when there is no vim_id; otherwise (task_status, ro_vim_item_update).
            task_status is "DONE" for VIM status "ACTIVE", "BUILD" for "BUILD" and "FAILED"
            otherwise (also on vimconn.VimConnException). ro_vim_item_update contains only the
            values that differ from the stored ones
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                # best effort: the name is simply not taken when vim_info cannot be parsed
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # keeps the order of interfaces_vim_ids; None when the VIM does not report it
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # the pending CREATE task says which interfaces are the management ones
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    # details are only logged when the status is not ACTIVE
                    ro_vim_item_update.get("vim_details")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject a user key into the VM.
        The private key of the task params is decrypted with self.db.decrypt and passed as
        "ro_key"; "ip_address" is passed as "ip_addr".
        :param ro_task: ro_task containing the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: (task_status, ro_vim_item_update, db_task_update):
            ("DONE", None, {"retries": 0}) on success;
            ("BUILD", None, {"retries": n, "next_retry": seconds}) when it fails and the number
            of attempts is still below max_retries_inject_ssh_key;
            ("FAILED", {"vim_details": error}, {"retries": 0}) when retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params_copy = deepcopy(task["params"])
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return "DONE", None, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_details": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
621
622
class VimInteractionImage(VimInteractionBase):
    """Look for images at the target VIM."""

    def new(self, ro_task, task_index, task_depends):
        """
        Find the image at the VIM using the 'find_params' of the task.
        :param ro_task: ro_task containing the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", ro_vim_item_update) when exactly one image matches, or when the task
            has no 'find_params' (vim_id is None in that case);
            ("FAILED", ro_vim_item_update) when none or several images match, or on
            vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Defined up front: without 'find_params' the name used to be unbound, and the
            # resulting UnboundLocalError was not caught by the handler below.
            vim_image_id = None

            # FIND
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
676
677
class VimInteractionFlavor(VimInteractionBase):
    """Find, create and delete flavors at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at the VIM, if the ro_task has a vim_id for it.
        :param ro_task: ro_task containing the task and the "vim_info" of the flavor
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", ro_vim_item_update) when deleted or not found at the VIM,
            ("FAILED", ro_vim_item_update) on any other vimconn.VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            # the flavor is not at the VIM any more: still a successful deletion
            deleted_update["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                deleted_update.get("vim_details", ""),
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor matching the 'find_params' of the task; if none is obtained and the
        task has 'params', create it.
        :param ro_task: ro_task containing the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", ro_vim_item_update) on success, ("FAILED", ro_vim_item_update) on
            vimconn.VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_flavor_id = None

            # FIND
            find_params = task.get("find_params")
            if find_params:
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        find_params["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException:
                    # not found is not an error here: the flavor may be created below
                    pass

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
769
770
class VimInteractionAffinityGroup(VimInteractionBase):
    """Find, create and delete affinity or anti-affinity groups at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group at the VIM, if the ro_task has a vim_id for it.
        :param ro_task: ro_task containing the task and the "vim_info" of the affinity group
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", ro_vim_item_update) when deleted or not found at the VIM,
            ("FAILED", ro_vim_item_update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            # nothing left to delete: reported as success
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Get the affinity group indicated by "vim-affinity-group-id" of the task
        'affinity_group_data', or create a new one from that data when no identifier is
        provided or it is not found at the VIM.
        :param ro_task: ro_task containing the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", ro_vim_item_update) on success, ("FAILED", ro_vim_item_update) on
            vimconn.VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            # FIND
            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                param_affinity_group_id = affinity_group_data.get(
                    "vim-affinity-group-id"
                )

                try:
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # Not fatal: a new group is created below. The message used to lack the
                    # blank between the identifier and "could".
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            # CREATE
            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
875
876
877 class VimInteractionSdnNet(VimInteractionBase):
878 @staticmethod
879 def _match_pci(port_pci, mapping):
880 """
881 Check if port_pci matches with mapping
882 mapping can have brackets to indicate that several chars are accepted. e.g
883 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
884 :param port_pci: text
885 :param mapping: text, can contain brackets to indicate several chars are available
886 :return: True if matches, False otherwise
887 """
888 if not port_pci or not mapping:
889 return False
890 if port_pci == mapping:
891 return True
892
893 mapping_index = 0
894 pci_index = 0
895 while True:
896 bracket_start = mapping.find("[", mapping_index)
897
898 if bracket_start == -1:
899 break
900
901 bracket_end = mapping.find("]", bracket_start)
902 if bracket_end == -1:
903 break
904
905 length = bracket_start - mapping_index
906 if (
907 length
908 and port_pci[pci_index : pci_index + length]
909 != mapping[mapping_index:bracket_start]
910 ):
911 return False
912
913 if (
914 port_pci[pci_index + length]
915 not in mapping[bracket_start + 1 : bracket_end]
916 ):
917 return False
918
919 pci_index += length + 1
920 mapping_index = bracket_end + 1
921
922 if port_pci[pci_index:] != mapping[mapping_index:]:
923 return False
924
925 return True
926
927 def _get_interfaces(self, vlds_to_connect, vim_account_id):
928 """
929 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
930 :param vim_account_id:
931 :return:
932 """
933 interfaces = []
934
935 for vld in vlds_to_connect:
936 table, _, db_id = vld.partition(":")
937 db_id, _, vld = db_id.partition(":")
938 _, _, vld_id = vld.partition(".")
939
940 if table == "vnfrs":
941 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
942 iface_key = "vnf-vld-id"
943 else: # table == "nsrs"
944 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
945 iface_key = "ns-vld-id"
946
947 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
948
949 for db_vnfr in db_vnfrs:
950 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
951 for iface_index, interface in enumerate(vdur["interfaces"]):
952 if interface.get(iface_key) == vld_id and interface.get(
953 "type"
954 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
955 # only SR-IOV o PT
956 interface_ = interface.copy()
957 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
958 db_vnfr["_id"], vdu_index, iface_index
959 )
960
961 if vdur.get("status") == "ERROR":
962 interface_["status"] = "ERROR"
963
964 interfaces.append(interface_)
965
966 return interfaces
967
968 def refresh(self, ro_task):
969 # look for task create
970 task_create_index, _ = next(
971 i_t
972 for i_t in enumerate(ro_task["tasks"])
973 if i_t[1]
974 and i_t[1]["action"] == "CREATE"
975 and i_t[1]["status"] != "FINISHED"
976 )
977
978 return self.new(ro_task, task_create_index, None)
979
980 def new(self, ro_task, task_index, task_depends):
981
982 task = ro_task["tasks"][task_index]
983 task_id = task["task_id"]
984 target_vim = self.my_vims[ro_task["target_id"]]
985
986 sdn_net_id = ro_task["vim_info"]["vim_id"]
987
988 created_items = ro_task["vim_info"].get("created_items")
989 connected_ports = ro_task["vim_info"].get("connected_ports", [])
990 new_connected_ports = []
991 last_update = ro_task["vim_info"].get("last_update", 0)
992 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
993 error_list = []
994 created = ro_task["vim_info"].get("created", False)
995
996 try:
997 # CREATE
998 params = task["params"]
999 vlds_to_connect = params["vlds"]
1000 associated_vim = params["target_vim"]
1001 # external additional ports
1002 additional_ports = params.get("sdn-ports") or ()
1003 _, _, vim_account_id = associated_vim.partition(":")
1004
1005 if associated_vim:
1006 # get associated VIM
1007 if associated_vim not in self.db_vims:
1008 self.db_vims[associated_vim] = self.db.get_one(
1009 "vim_accounts", {"_id": vim_account_id}
1010 )
1011
1012 db_vim = self.db_vims[associated_vim]
1013
1014 # look for ports to connect
1015 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1016 # print(ports)
1017
1018 sdn_ports = []
1019 pending_ports = error_ports = 0
1020 vlan_used = None
1021 sdn_need_update = False
1022
1023 for port in ports:
1024 vlan_used = port.get("vlan") or vlan_used
1025
1026 # TODO. Do not connect if already done
1027 if not port.get("compute_node") or not port.get("pci"):
1028 if port.get("status") == "ERROR":
1029 error_ports += 1
1030 else:
1031 pending_ports += 1
1032 continue
1033
1034 pmap = None
1035 compute_node_mappings = next(
1036 (
1037 c
1038 for c in db_vim["config"].get("sdn-port-mapping", ())
1039 if c and c["compute_node"] == port["compute_node"]
1040 ),
1041 None,
1042 )
1043
1044 if compute_node_mappings:
1045 # process port_mapping pci of type 0000:af:1[01].[1357]
1046 pmap = next(
1047 (
1048 p
1049 for p in compute_node_mappings["ports"]
1050 if self._match_pci(port["pci"], p.get("pci"))
1051 ),
1052 None,
1053 )
1054
1055 if not pmap:
1056 if not db_vim["config"].get("mapping_not_needed"):
1057 error_list.append(
1058 "Port mapping not found for compute_node={} pci={}".format(
1059 port["compute_node"], port["pci"]
1060 )
1061 )
1062 continue
1063
1064 pmap = {}
1065
1066 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1067 new_port = {
1068 "service_endpoint_id": pmap.get("service_endpoint_id")
1069 or service_endpoint_id,
1070 "service_endpoint_encapsulation_type": "dot1q"
1071 if port["type"] == "SR-IOV"
1072 else None,
1073 "service_endpoint_encapsulation_info": {
1074 "vlan": port.get("vlan"),
1075 "mac": port.get("mac-address"),
1076 "device_id": pmap.get("device_id") or port["compute_node"],
1077 "device_interface_id": pmap.get("device_interface_id")
1078 or port["pci"],
1079 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1080 "switch_port": pmap.get("switch_port"),
1081 "service_mapping_info": pmap.get("service_mapping_info"),
1082 },
1083 }
1084
1085 # TODO
1086 # if port["modified_at"] > last_update:
1087 # sdn_need_update = True
1088 new_connected_ports.append(port["id"]) # TODO
1089 sdn_ports.append(new_port)
1090
1091 if error_ports:
1092 error_list.append(
1093 "{} interfaces have not been created as VDU is on ERROR status".format(
1094 error_ports
1095 )
1096 )
1097
1098 # connect external ports
1099 for index, additional_port in enumerate(additional_ports):
1100 additional_port_id = additional_port.get(
1101 "service_endpoint_id"
1102 ) or "external-{}".format(index)
1103 sdn_ports.append(
1104 {
1105 "service_endpoint_id": additional_port_id,
1106 "service_endpoint_encapsulation_type": additional_port.get(
1107 "service_endpoint_encapsulation_type", "dot1q"
1108 ),
1109 "service_endpoint_encapsulation_info": {
1110 "vlan": additional_port.get("vlan") or vlan_used,
1111 "mac": additional_port.get("mac_address"),
1112 "device_id": additional_port.get("device_id"),
1113 "device_interface_id": additional_port.get(
1114 "device_interface_id"
1115 ),
1116 "switch_dpid": additional_port.get("switch_dpid")
1117 or additional_port.get("switch_id"),
1118 "switch_port": additional_port.get("switch_port"),
1119 "service_mapping_info": additional_port.get(
1120 "service_mapping_info"
1121 ),
1122 },
1123 }
1124 )
1125 new_connected_ports.append(additional_port_id)
1126 sdn_info = ""
1127
1128 # if there are more ports to connect or they have been modified, call create/update
1129 if error_list:
1130 sdn_status = "ERROR"
1131 sdn_info = "; ".join(error_list)
1132 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1133 last_update = time.time()
1134
1135 if not sdn_net_id:
1136 if len(sdn_ports) < 2:
1137 sdn_status = "ACTIVE"
1138
1139 if not pending_ports:
1140 self.logger.debug(
1141 "task={} {} new-sdn-net done, less than 2 ports".format(
1142 task_id, ro_task["target_id"]
1143 )
1144 )
1145 else:
1146 net_type = params.get("type") or "ELAN"
1147 (
1148 sdn_net_id,
1149 created_items,
1150 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1151 created = True
1152 self.logger.debug(
1153 "task={} {} new-sdn-net={} created={}".format(
1154 task_id, ro_task["target_id"], sdn_net_id, created
1155 )
1156 )
1157 else:
1158 created_items = target_vim.edit_connectivity_service(
1159 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1160 )
1161 created = True
1162 self.logger.debug(
1163 "task={} {} update-sdn-net={} created={}".format(
1164 task_id, ro_task["target_id"], sdn_net_id, created
1165 )
1166 )
1167
1168 connected_ports = new_connected_ports
1169 elif sdn_net_id:
1170 wim_status_dict = target_vim.get_connectivity_service_status(
1171 sdn_net_id, conn_info=created_items
1172 )
1173 sdn_status = wim_status_dict["sdn_status"]
1174
1175 if wim_status_dict.get("sdn_info"):
1176 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1177
1178 if wim_status_dict.get("error_msg"):
1179 sdn_info = wim_status_dict.get("error_msg") or ""
1180
1181 if pending_ports:
1182 if sdn_status != "ERROR":
1183 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1184 len(ports) - pending_ports, len(ports)
1185 )
1186
1187 if sdn_status == "ACTIVE":
1188 sdn_status = "BUILD"
1189
1190 ro_vim_item_update = {
1191 "vim_id": sdn_net_id,
1192 "vim_status": sdn_status,
1193 "created": created,
1194 "created_items": created_items,
1195 "connected_ports": connected_ports,
1196 "vim_details": sdn_info,
1197 "last_update": last_update,
1198 }
1199
1200 return sdn_status, ro_vim_item_update
1201 except Exception as e:
1202 self.logger.error(
1203 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1204 exc_info=not isinstance(
1205 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1206 ),
1207 )
1208 ro_vim_item_update = {
1209 "vim_status": "VIM_ERROR",
1210 "created": created,
1211 "vim_details": str(e),
1212 }
1213
1214 return "FAILED", ro_vim_item_update
1215
def delete(self, ro_task, task_index):
    """
    Delete at the SDN controller the connectivity service stored at this ro_task
    :param ro_task: ro_task dictionary. "vim_info.vim_id" is the connectivity service id (the connector is
        not called when it is empty) and "vim_info.created_items" is handed over to the connector
    :param task_index: index at ro_task["tasks"] of the task being processed
    :return: tuple (status, vim_info update). ("DONE", ...) when deleted, already missing or nothing to
        delete; ("FAILED", ...) with vim_status "VIM_ERROR" on any other failure
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    sdn_vim_id = ro_task["vim_info"].get("vim_id")
    deletion_details = "DELETED"

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, ro_task["vim_info"].get("created_items")
            )
    except Exception as e:
        # a NOT_FOUND answer from the connector means that there is nothing left to delete
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                # traceback only for errors that are not raised by the connectors themselves
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        deletion_details = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id, ro_task["target_id"], sdn_vim_id, deletion_details
        )
    )

    return "DONE", {
        "vim_status": "DELETED",
        "created": False,
        "vim_details": deletion_details,
        "vim_id": None,
    }
1266
1267
1268 class NsWorker(threading.Thread):
# Refresh periods, in seconds, added to the current time by _process_pending_tasks to obtain
# "vim_info.refresh_at" of a ro_task
# applied when the new task status is "BUILD"
REFRESH_BUILD = 5  # 5 seconds
# applied when the new task status is "DONE"
REFRESH_ACTIVE = 60  # 1 minute
# applied for any other status
REFRESH_ERROR = 600
# applied to "image" and "flavor" items whatever the status is
REFRESH_IMAGE = 3600 * 10
# NOTE(review): not referenced in the visible part of this file - confirm where it is used
REFRESH_DELETE = 3600 * 10
# maximum number of entries of the task queue created at __init__
QUEUE_SIZE = 100
# NOTE(review): the method "terminate" defined later in this class rebinds this name, so this False
# value is replaced by the method at class creation - confirm if this attribute is still needed
terminate = False
1276
def __init__(self, worker_index, config, plugins, db):
    """
    Create a worker thread that processes the ro_tasks of the VIM/SDN/WIM targets assigned to it

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs.
        It must contain config["global"]["task_locked_time"]
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # lock used by del_task; it was referenced there but never created
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # every interaction class shares the database and the caches of this worker
    interaction_args = (self.db, self.my_vims, self.db_vims, self.logger)
    self.item2class = {
        "net": VimInteractionNet(*interaction_args),
        "vdu": VimInteractionVdu(*interaction_args),
        "image": VimInteractionImage(*interaction_args),
        "flavor": VimInteractionFlavor(*interaction_args),
        "sdn_net": VimInteractionSdnNet(*interaction_args),
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
            *interaction_args
        ),
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1322
def insert_task(self, task):
    """
    Put a task into the worker queue without blocking
    :param task: element to enqueue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1329
def terminate(self):
    """
    Queue the "exit" command by means of insert_task.
    Presumably the thread main loop stops when it reads it (that loop is not visible here - confirm)
    :return: None
    """
    exit_command = "exit"
    self.insert_task(exit_command)
1332
def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet
    :param task: task dictionary; its "status" is changed to "SUPERSEDED" when it was "SCHEDULED"
    :return: True if the task has been superseded, False if it was in any other status
    """
    # the context manager is the only owner of the lock. The explicit release() that was done
    # here made the lock to be released twice, raising RuntimeError when leaving the 'with'
    with self.task_lock:
        if task["status"] != "SCHEDULED":
            # e.g. the task is already being processed
            return False

        task["status"] = "SUPERSEDED"
        return True
1341
def _process_vim_config(self, target_id, db_vim):
    """
    Dump to disk the parts of the vim config that the connector expects as files (ca_cert)
    :param target_id: vim/sdn/wim + id
    :param db_vim: Vim dictionary obtained from database. When config contains "ca_cert_content" it is
        written to the file "<target_id>:<worker_index>/ca_cert", the key is removed and "ca_cert" is
        set with the file name
    :return: None. Modifies db_vim
    :raises NsWorkerException: on any error creating the folder or writing the file
    """
    if not db_vim.get("config"):
        return

    file_name = ""

    try:
        vim_config = db_vim["config"]

        if not vim_config.get("ca_cert_content"):
            return

        # file_name holds the folder first, so that an error at mkdir reports the folder
        file_name = "{}:{}".format(target_id, self.worker_index)

        try:
            mkdir(file_name)
        except FileExistsError:
            pass

        file_name += "/ca_cert"

        with open(file_name, "w") as cert_file:
            cert_file.write(vim_config["ca_cert_content"])
            del vim_config["ca_cert_content"]
            vim_config["ca_cert"] = file_name
    except Exception as e:
        raise NsWorkerException(
            "Error writing to file '{}': {}".format(file_name, e)
        )
1373
def _load_plugin(self, name, type="vim"):
    """
    Obtain the connector class of a plugin, loading it from the installed entry points the first time
    :param name: plugin name, used as key of self.plugins and as entry point name
    :param type: "vim" or "sdn"; selects the entry point group "osm_ro<type>.plugins"
    :return: the plugin class stored at self.plugins[name]
    :raises NsWorkerException: when the plugin cannot be loaded or it is not installed
    """
    known_plugins = self.plugins

    # the dummy connectors are always available
    if "rovim_dummy" not in known_plugins:
        known_plugins["rovim_dummy"] = VimDummyConnector

    if "rosdn_dummy" not in known_plugins:
        known_plugins["rosdn_dummy"] = SdnDummyConnector

    if name not in known_plugins:
        try:
            for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
                known_plugins[name] = ep.load()
        except Exception as e:
            raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

        if name and name not in known_plugins:
            raise NsWorkerException(
                "Plugin 'osm_{n}' has not been installed".format(n=name)
            )

    return known_plugins[name]
1397
def _unload_vim(self, target_id):
    """
    Forget a vim_account: it is taken out of the db_vims and my_vims dictionaries and of the vim_targets
    list, and the folder "<target_id>:<worker_index>" is removed from disk
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None. Errors are logged, never raised
    """
    try:
        self.db_vims.pop(target_id, None)
        self.my_vims.pop(target_id, None)

        try:
            self.vim_targets.remove(target_id)
        except ValueError:
            pass  # it was not at the list

        self.logger.info("Unloaded {}".format(target_id))
        config_folder = "{}:{}".format(target_id, self.worker_index)
        rmtree(config_folder)
    except FileNotFoundError:
        pass  # this is raised by rmtree if folder does not exist
    except Exception as e:
        self.logger.error("Cannot unload {}: {}".format(target_id, e))
1417
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR.
    Only the first operation found at "_admin.operations" with operationState "PROCESSING" is handled,
    and only if this worker is able to lock it at database.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    # content to write / remove at database in the 'finally' clause; both remain empty until
    # the operation has been locked
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # remember if it was already loaded, because in other case it is unloaded at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value read before, so the update is not applied
            # if the entry has been modified in the meantime
            # NOTE(review): this writes "admin.current_operation" but below "current_operation"
            # is the key that is unset, and the rest of keys use the "_admin." prefix - confirm
            # which key is the intended one
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            # _load_vim returns None if ok or a descriptive text in case of error
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; the 'finally' clause overwrites them when error_text is not empty
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # runs also for every 'return' above; database is written only if the operation was locked
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

            if not loaded:
                self._unload_vim(target_id)
1519
def _reload_vim(self, target_id):
    """
    Refresh the information of a target after it has changed
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None
    """
    if target_id not in self.vim_targets:
        # the vim is not loaded, but its database information may be cached at self.db_vims.
        # Drop it, so that it is read again the next time it is needed
        self.db_vims.pop(target_id, None)
        return

    self._load_vim(target_id)
1527
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a FailingConnector at my_vims
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None

    try:
        step = "Getting {}={} from db".format(target, _id)
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + vim["type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # work over a copy, as the content cached at db_vims must not be altered
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            wim["wim_url"] = wim["url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)

        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )

        return None
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        # keep whatever was read from database and register a connector that fails on use.
        # The connector was previously stored at db_vims too, overwriting the database content
        # and leaving my_vims without an entry for this target
        self.db_vims[target_id] = vim or {}
        self.my_vims[target_id] = FailingConnector(str(e))

        return "{} Error: {}".format(step, e)
    finally:
        # the target is registered even on failure
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1624
def _get_db_task(self):
    """
    Lock at database one ro_task of the targets of this worker that is pending to be checked, and read it.
    The search is done first for entries with "to_check_at" lower than self.time_last_task_processed;
    if nothing is locked it is repeated once with the current time, and self.time_last_task_processed
    is reset to None when that second search finds nothing either.
    :return: the locked ro_task, or None if there is nothing to process or in case of error (logged)
    """
    now = time.time()

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            # a debug log of task_locked_time, time_last_task_processed and now used to be here
            was_locked = self.db.set_one(
                "ro_tasks",
                q_filter={
                    "target_id": self.vim_targets,
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.lt": self.time_last_task_processed,
                },
                update_dict={"locked_by": self.my_id, "locked_at": now},
                fail_on_empty=False,
            )

            if was_locked:
                # read and return the entry just locked, identified by its locked_at value
                return self.db.get_one(
                    "ro_tasks",
                    q_filter={
                        "target_id": self.vim_targets,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at": now,
                    },
                )

            if self.time_last_task_processed != now:
                # retry with the current time as limit
                self.time_last_task_processed = now
                # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
                continue

            self.time_last_task_processed = None
            return None

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
1694
def _get_db_all_tasks(self):
    """
    Read the whole content of table ro_tasks and log every entry with _log_ro_task
    :return: the list of ro_tasks read, or None in case of error (which is logged)
    """
    try:
        all_ro_tasks = self.db.get_list("ro_tasks")

        for entry in all_ro_tasks:
            self._log_ro_task(entry, None, None, "TASK_WF", "GET_ALL_TASKS")
    except DbException as e:
        self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
        )
    else:
        return all_ro_tasks

    return None
1717
def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
    """
    Generate debug logs, one line of fields separated by ';', with the following format:

    Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
    target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
    task_array_index;task_id;task_action;task_item;task_args

    NOTE(review): the lines generated from 'ro_task' do not follow exactly this format: the
    'no_of_tasks' position carries the whole tasks list, and 'find_params' is written before
    the task params. It is kept as it was; confirm which one is the intended format.

    Example:

    TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
    1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
    ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
    'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
    'vim_details': None, 'refresh_at': None};1;SCHEDULED;
    888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
    CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}

    :param ro_task: ro_task dictionary; when provided, a line per task is logged
    :param db_ro_task_update: used if ro_task is not a dict. Database update dictionary; a line is
        logged per consecutive "tasks.<index>.status" key, starting at index 0
    :param db_ro_task_delete: used if none of the previous is a dict; a single line is logged
    :param mark: first field of every line
    :param event: second field of every line
    :return: None. Any error is logged, never raised
    """

    def _emit(fields):
        # every field is converted to text: a non string value (e.g. a float 'modified_at')
        # made the join to fail, and the entry was not logged
        self.logger.debug(";".join(str(field) for field in fields))

    try:
        if isinstance(ro_task, dict):
            for i, t in enumerate(ro_task["tasks"]):
                line = [
                    mark,
                    event,
                    ro_task.get("_id", ""),
                    ro_task.get("locked_at", ""),
                    ro_task.get("modified_at", ""),
                    ro_task.get("created_at", ""),
                    ro_task.get("to_check_at", ""),
                    ro_task.get("locked_by", ""),
                    ro_task.get("target_id", ""),
                    ro_task.get("vim_info", {}).get("refresh_at", ""),
                    ro_task.get("vim_info", ""),
                    ro_task.get("tasks", ""),
                ]

                if isinstance(t, dict):
                    line += [
                        t.get("status", ""),
                        t.get("action_id", ""),
                        i,
                        t.get("task_id", ""),
                        t.get("action", ""),
                        t.get("item", ""),
                        t.get("find_params", ""),
                        t.get("params", ""),
                    ]
                else:
                    # e.g. a deleted task: only its index is known
                    line += ["", "", i] + [""] * 5

                _emit(line)
        elif isinstance(db_ro_task_update, dict):
            i = 0

            while "tasks.{}.status".format(i) in db_ro_task_update:
                _emit(
                    [
                        mark,
                        event,
                        db_ro_task_update.get("_id", ""),
                        db_ro_task_update.get("locked_at", ""),
                        db_ro_task_update.get("modified_at", ""),
                        "",
                        db_ro_task_update.get("to_check_at", ""),
                        db_ro_task_update.get("locked_by", ""),
                        "",
                        db_ro_task_update.get("vim_info.refresh_at", ""),
                        "",
                        db_ro_task_update.get("vim_info", ""),
                        # number of ".status" occurrences at the text form of the update
                        str(db_ro_task_update).count(".status"),
                        db_ro_task_update.get("tasks.{}.status".format(i), ""),
                        "",
                        i,
                        "",
                        "",
                        "",
                    ]
                )
                i += 1
        elif isinstance(db_ro_task_delete, dict):
            _emit(
                [
                    mark,
                    event,
                    db_ro_task_delete.get("_id", ""),
                    "",
                    db_ro_task_delete.get("modified_at", ""),
                ]
                + [""] * 13
            )
        else:
            _emit([mark, event] + [""] * 16)

    except Exception as e:
        self.logger.error("Error logging ro_task: {}".format(e))
1816
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this DELETE task needs to be done at the VIM or it is superseded
    :param ro_task: ro_task dictionary that contains the task
    :param task_index: index of the task at ro_task["tasks"]
    :param task_depends: not used here
    :param db_update: dictionary where the database changes are annotated. The status of the CREATE
        tasks over the same target_record is set here (and at ro_task) to "FINISHED"
    :return: tuple (status, vim_info update): (None, None) if the task is FAILED; ("SUPERSEDED", None)
        if nothing must be deleted; what the 'delete' of the item class returns in other case;
        ("FAILED", {...}) on error
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]
    vim_info = ro_task["vim_info"]
    needed_delete = vim_info["created"] or vim_info.get("created_items", False)

    self.logger.warning("Needed delete: {}".format(needed_delete))
    if my_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for index, task in enumerate(ro_task["tasks"]):
            if not task or index == task_index:
                continue  # deleted task or own task

            same_record = my_task["target_record"] == task["target_record"]

            if task["action"] != "CREATE":
                continue

            if same_record:
                # set to finished
                db_update["tasks.{}.status".format(index)] = "FINISHED"
                task["status"] = "FINISHED"
            elif task["status"] not in ("FINISHED", "SUPERSEDED"):
                # a CREATE task of other record is still alive over this element
                needed_delete = False

        if not needed_delete:
            return "SUPERSEDED", None

        self.logger.warning(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        return self.item2class[my_task["item"]].delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(
                    task_id, e
                ),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1868
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to create something at VIM
    :param ro_task: ro_task dictionary that contains the task
    :param task_index: index of the task at ro_task["tasks"]
    :param task_depends: passed as it is to the 'new' method of the item class
    :param db_update: not used here
    :return: tuple (status, vim_info update): (None, None) if the task is not SCHEDULED;
        (status, "COPY_VIM_INFO") if another CREATE task of this ro_task is already in progress or done;
        what the 'new' method of the item class returns in other case; ("FAILED", {...}) if it raises
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]

    if my_task["status"] != "SCHEDULED":
        # this includes the FAILED tasks. TODO need to be retry??
        return None, None

    # check if already created by another task
    for index, task in enumerate(ro_task["tasks"]):
        if not task or index == task_index:
            continue  # deleted task or own task

        if task["action"] == "CREATE" and task["status"] not in (
            "SCHEDULED",
            "FINISHED",
            "SUPERSEDED",
        ):
            return task["status"], "COPY_VIM_INFO"

    try:
        creator = self.item2class[my_task["item"]]
        task_status, ro_vim_item_update = creator.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
1911
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: ro_task where to look for first when task_id is a task.task_id (case 3)
    :param target_id: target used to filter at database when task_id is a target_record_id (case 2).
        It is overwritten by the target included at task_id in case 1
    :return: database ro_task plus index of task
    :raises NsWorkerException: if the depending task is not found
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # the list can contain empty entries of deleted tasks; skip them
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # the list can contain empty entries of deleted tasks; skip them
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index

    raise NsWorkerException("Cannot get depending task {}".format(task_id))
1963
1964 def _process_pending_tasks(self, ro_task):
1965 ro_task_id = ro_task["_id"]
1966 now = time.time()
1967 # one day
1968 next_check_at = now + (24 * 60 * 60)
1969 db_ro_task_update = {}
1970
1971 def _update_refresh(new_status):
1972 # compute next_refresh
1973 nonlocal task
1974 nonlocal next_check_at
1975 nonlocal db_ro_task_update
1976 nonlocal ro_task
1977
1978 next_refresh = time.time()
1979
1980 if task["item"] in ("image", "flavor"):
1981 next_refresh += self.REFRESH_IMAGE
1982 elif new_status == "BUILD":
1983 next_refresh += self.REFRESH_BUILD
1984 elif new_status == "DONE":
1985 next_refresh += self.REFRESH_ACTIVE
1986 else:
1987 next_refresh += self.REFRESH_ERROR
1988
1989 next_check_at = min(next_check_at, next_refresh)
1990 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1991 ro_task["vim_info"]["refresh_at"] = next_refresh
1992
1993 try:
1994 """
1995 # Log RO tasks only when loglevel is DEBUG
1996 if self.logger.getEffectiveLevel() == logging.DEBUG:
1997 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1998 """
1999 # 0: get task_status_create
2000 lock_object = None
2001 task_status_create = None
2002 task_create = next(
2003 (
2004 t
2005 for t in ro_task["tasks"]
2006 if t
2007 and t["action"] == "CREATE"
2008 and t["status"] in ("BUILD", "DONE")
2009 ),
2010 None,
2011 )
2012
2013 if task_create:
2014 task_status_create = task_create["status"]
2015
2016 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2017 for task_action in ("DELETE", "CREATE", "EXEC"):
2018 db_vim_update = None
2019 new_status = None
2020
2021 for task_index, task in enumerate(ro_task["tasks"]):
2022 if not task:
2023 continue # task deleted
2024
2025 task_depends = {}
2026 target_update = None
2027
2028 if (
2029 (
2030 task_action in ("DELETE", "EXEC")
2031 and task["status"] not in ("SCHEDULED", "BUILD")
2032 )
2033 or task["action"] != task_action
2034 or (
2035 task_action == "CREATE"
2036 and task["status"] in ("FINISHED", "SUPERSEDED")
2037 )
2038 ):
2039 continue
2040
2041 task_path = "tasks.{}.status".format(task_index)
2042 try:
2043 db_vim_info_update = None
2044
2045 if task["status"] == "SCHEDULED":
2046 # check if tasks that this depends on have been completed
2047 dependency_not_completed = False
2048
2049 for dependency_task_id in task.get("depends_on") or ():
2050 (
2051 dependency_ro_task,
2052 dependency_task_index,
2053 ) = self._get_dependency(
2054 dependency_task_id, target_id=ro_task["target_id"]
2055 )
2056 dependency_task = dependency_ro_task["tasks"][
2057 dependency_task_index
2058 ]
2059 self.logger.warning(
2060 "dependency_ro_task={} dependency_task_index={}".format(
2061 dependency_ro_task, dependency_task_index
2062 )
2063 )
2064
2065 if dependency_task["status"] == "SCHEDULED":
2066 dependency_not_completed = True
2067 next_check_at = min(
2068 next_check_at, dependency_ro_task["to_check_at"]
2069 )
2070 # must allow dependent task to be processed first
2071 # to do this set time after last_task_processed
2072 next_check_at = max(
2073 self.time_last_task_processed, next_check_at
2074 )
2075 break
2076 elif dependency_task["status"] == "FAILED":
2077 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2078 task["action"],
2079 task["item"],
2080 dependency_task["action"],
2081 dependency_task["item"],
2082 dependency_task_id,
2083 dependency_ro_task["vim_info"].get(
2084 "vim_details"
2085 ),
2086 )
2087 self.logger.error(
2088 "task={} {}".format(task["task_id"], error_text)
2089 )
2090 raise NsWorkerException(error_text)
2091
2092 task_depends[dependency_task_id] = dependency_ro_task[
2093 "vim_info"
2094 ]["vim_id"]
2095 task_depends[
2096 "TASK-{}".format(dependency_task_id)
2097 ] = dependency_ro_task["vim_info"]["vim_id"]
2098
2099 if dependency_not_completed:
2100 self.logger.warning(
2101 "DEPENDENCY NOT COMPLETED {}".format(
2102 dependency_ro_task["vim_info"]["vim_id"]
2103 )
2104 )
2105 # TODO set at vim_info.vim_details that it is waiting
2106 continue
2107
2108 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2109 # the task of renew this locking. It will update database locket_at periodically
2110 if not lock_object:
2111 lock_object = LockRenew.add_lock_object(
2112 "ro_tasks", ro_task, self
2113 )
2114
2115 if task["action"] == "DELETE":
2116 (new_status, db_vim_info_update,) = self._delete_task(
2117 ro_task, task_index, task_depends, db_ro_task_update
2118 )
2119 new_status = (
2120 "FINISHED" if new_status == "DONE" else new_status
2121 )
2122 # ^with FINISHED instead of DONE it will not be refreshing
2123
2124 if new_status in ("FINISHED", "SUPERSEDED"):
2125 target_update = "DELETE"
2126 elif task["action"] == "EXEC":
2127 (
2128 new_status,
2129 db_vim_info_update,
2130 db_task_update,
2131 ) = self.item2class[task["item"]].exec(
2132 ro_task, task_index, task_depends
2133 )
2134 new_status = (
2135 "FINISHED" if new_status == "DONE" else new_status
2136 )
2137 # ^with FINISHED instead of DONE it will not be refreshing
2138
2139 if db_task_update:
2140 # load into database the modified db_task_update "retries" and "next_retry"
2141 if db_task_update.get("retries"):
2142 db_ro_task_update[
2143 "tasks.{}.retries".format(task_index)
2144 ] = db_task_update["retries"]
2145
2146 next_check_at = time.time() + db_task_update.get(
2147 "next_retry", 60
2148 )
2149 target_update = None
2150 elif task["action"] == "CREATE":
2151 if task["status"] == "SCHEDULED":
2152 if task_status_create:
2153 new_status = task_status_create
2154 target_update = "COPY_VIM_INFO"
2155 else:
2156 new_status, db_vim_info_update = self.item2class[
2157 task["item"]
2158 ].new(ro_task, task_index, task_depends)
2159 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2160 _update_refresh(new_status)
2161 else:
2162 if (
2163 ro_task["vim_info"]["refresh_at"]
2164 and now > ro_task["vim_info"]["refresh_at"]
2165 ):
2166 new_status, db_vim_info_update = self.item2class[
2167 task["item"]
2168 ].refresh(ro_task)
2169 _update_refresh(new_status)
2170 else:
2171 # The refresh is updated to avoid set the value of "refresh_at" to
2172 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2173 # because it can happen that in this case the task is never processed
2174 _update_refresh(task["status"])
2175
2176 except Exception as e:
2177 new_status = "FAILED"
2178 db_vim_info_update = {
2179 "vim_status": "VIM_ERROR",
2180 "vim_details": str(e),
2181 }
2182
2183 if not isinstance(
2184 e, (NsWorkerException, vimconn.VimConnException)
2185 ):
2186 self.logger.error(
2187 "Unexpected exception at _delete_task task={}: {}".format(
2188 task["task_id"], e
2189 ),
2190 exc_info=True,
2191 )
2192
2193 try:
2194 if db_vim_info_update:
2195 db_vim_update = db_vim_info_update.copy()
2196 db_ro_task_update.update(
2197 {
2198 "vim_info." + k: v
2199 for k, v in db_vim_info_update.items()
2200 }
2201 )
2202 ro_task["vim_info"].update(db_vim_info_update)
2203
2204 if new_status:
2205 if task_action == "CREATE":
2206 task_status_create = new_status
2207 db_ro_task_update[task_path] = new_status
2208
2209 if target_update or db_vim_update:
2210 if target_update == "DELETE":
2211 self._update_target(task, None)
2212 elif target_update == "COPY_VIM_INFO":
2213 self._update_target(task, ro_task["vim_info"])
2214 else:
2215 self._update_target(task, db_vim_update)
2216
2217 except Exception as e:
2218 if (
2219 isinstance(e, DbException)
2220 and e.http_code == HTTPStatus.NOT_FOUND
2221 ):
2222 # if the vnfrs or nsrs has been removed from database, this task must be removed
2223 self.logger.debug(
2224 "marking to delete task={}".format(task["task_id"])
2225 )
2226 self.tasks_to_delete.append(task)
2227 else:
2228 self.logger.error(
2229 "Unexpected exception at _update_target task={}: {}".format(
2230 task["task_id"], e
2231 ),
2232 exc_info=True,
2233 )
2234
2235 locked_at = ro_task["locked_at"]
2236
2237 if lock_object:
2238 locked_at = [
2239 lock_object["locked_at"],
2240 lock_object["locked_at"] + self.task_locked_time,
2241 ]
2242 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2243 # contain exactly locked_at + self.task_locked_time
2244 LockRenew.remove_lock_object(lock_object)
2245
2246 q_filter = {
2247 "_id": ro_task["_id"],
2248 "to_check_at": ro_task["to_check_at"],
2249 "locked_at": locked_at,
2250 }
2251 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2252 # outside this task (by ro_nbi) do not update it
2253 db_ro_task_update["locked_by"] = None
2254 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2255 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2256 db_ro_task_update["modified_at"] = now
2257 db_ro_task_update["to_check_at"] = next_check_at
2258
2259 """
2260 # Log RO tasks only when loglevel is DEBUG
2261 if self.logger.getEffectiveLevel() == logging.DEBUG:
2262 db_ro_task_update_log = db_ro_task_update.copy()
2263 db_ro_task_update_log["_id"] = q_filter["_id"]
2264 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2265 """
2266
2267 if not self.db.set_one(
2268 "ro_tasks",
2269 update_dict=db_ro_task_update,
2270 q_filter=q_filter,
2271 fail_on_empty=False,
2272 ):
2273 del db_ro_task_update["to_check_at"]
2274 del q_filter["to_check_at"]
2275 """
2276 # Log RO tasks only when loglevel is DEBUG
2277 if self.logger.getEffectiveLevel() == logging.DEBUG:
2278 self._log_ro_task(
2279 None,
2280 db_ro_task_update_log,
2281 None,
2282 "TASK_WF",
2283 "SET_TASK " + str(q_filter),
2284 )
2285 """
2286 self.db.set_one(
2287 "ro_tasks",
2288 q_filter=q_filter,
2289 update_dict=db_ro_task_update,
2290 fail_on_empty=True,
2291 )
2292 except DbException as e:
2293 self.logger.error(
2294 "ro_task={} Error updating database {}".format(ro_task_id, e)
2295 )
2296 except Exception as e:
2297 self.logger.error(
2298 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2299 )
2300
2301 def _update_target(self, task, ro_vim_item_update):
2302 table, _, temp = task["target_record"].partition(":")
2303 _id, _, path_vim_status = temp.partition(":")
2304 path_item = path_vim_status[: path_vim_status.rfind(".")]
2305 path_item = path_item[: path_item.rfind(".")]
2306 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2307 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2308
2309 if ro_vim_item_update:
2310 update_dict = {
2311 path_vim_status + "." + k: v
2312 for k, v in ro_vim_item_update.items()
2313 if k
2314 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2315 }
2316
2317 if path_vim_status.startswith("vdur."):
2318 # for backward compatibility, add vdur.name apart from vdur.vim_name
2319 if ro_vim_item_update.get("vim_name"):
2320 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2321
2322 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2323 if ro_vim_item_update.get("vim_id"):
2324 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2325
2326 # update general status
2327 if ro_vim_item_update.get("vim_status"):
2328 update_dict[path_item + ".status"] = ro_vim_item_update[
2329 "vim_status"
2330 ]
2331
2332 if ro_vim_item_update.get("interfaces"):
2333 path_interfaces = path_item + ".interfaces"
2334
2335 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2336 if iface:
2337 update_dict.update(
2338 {
2339 path_interfaces + ".{}.".format(i) + k: v
2340 for k, v in iface.items()
2341 if k in ("vlan", "compute_node", "pci")
2342 }
2343 )
2344
2345 # put ip_address and mac_address with ip-address and mac-address
2346 if iface.get("ip_address"):
2347 update_dict[
2348 path_interfaces + ".{}.".format(i) + "ip-address"
2349 ] = iface["ip_address"]
2350
2351 if iface.get("mac_address"):
2352 update_dict[
2353 path_interfaces + ".{}.".format(i) + "mac-address"
2354 ] = iface["mac_address"]
2355
2356 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2357 update_dict["ip-address"] = iface.get("ip_address").split(
2358 ";"
2359 )[0]
2360
2361 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2362 update_dict[path_item + ".ip-address"] = iface.get(
2363 "ip_address"
2364 ).split(";")[0]
2365
2366 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2367 else:
2368 update_dict = {path_item + ".status": "DELETED"}
2369 self.db.set_one(
2370 table,
2371 q_filter={"_id": _id},
2372 update_dict=update_dict,
2373 unset={path_vim_status: None},
2374 )
2375
2376 def _process_delete_db_tasks(self):
2377 """
2378 Delete task from database because vnfrs or nsrs or both have been deleted
2379 :return: None. Uses and modify self.tasks_to_delete
2380 """
2381 while self.tasks_to_delete:
2382 task = self.tasks_to_delete[0]
2383 vnfrs_deleted = None
2384 nsr_id = task["nsr_id"]
2385
2386 if task["target_record"].startswith("vnfrs:"):
2387 # check if nsrs is present
2388 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2389 vnfrs_deleted = task["target_record"].split(":")[1]
2390
2391 try:
2392 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2393 except Exception as e:
2394 self.logger.error(
2395 "Error deleting task={}: {}".format(task["task_id"], e)
2396 )
2397 self.tasks_to_delete.pop(0)
2398
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from the "ro_tasks" table the tasks that belong to a deleted nsr or vnfr
    Static method because it is called from osm_ng_ro.ns
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, raises NsWorkerException if it still fails after all the retries
    """
    retries = 5
    attempt = 0

    while attempt < retries:
        attempt += 1
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            cleared = {}
            removable = True

            for position, entry in enumerate(ro_task["tasks"]):
                if not entry:
                    # empty slot, nothing to check
                    continue

                if vnfrs_deleted:
                    affected = entry["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = entry["nsr_id"] == nsr_id

                if affected:
                    cleared["tasks.{}".format(position)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    removable = False

            if not removable and not cleared:
                continue

            # delete or update only if nobody has changed the ro_task meanwhile;
            # modified_at is used to know whether it has changed
            unchanged_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }

            if removable:
                applied = db.del_one(
                    "ro_tasks", q_filter=unchanged_filter, fail_on_empty=False
                )
            else:
                cleared["modified_at"] = now
                applied = db.set_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    update_dict=cleared,
                    fail_on_empty=False,
                )

            if not applied:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2457
def run(self):
    """
    Main loop of the thread. It alternates two steps until a "terminate" command is received:
    step 1 serves one command from self.task_queue ("terminate", "load_vim", "unload_vim",
    "reload_vim", "check_vim") and goes back to read the next one; step 2, reached when no
    command could be read, deletes the not needed tasks and processes one pending ro_task
    :return: None
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                # there are vims loaded: do not block, pending tasks must be attended
                command = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                command = self.task_queue.get(block=True)
                self.idle = False

            order = command[0]

            if order == "terminate":
                break

            if order == "load_vim":
                self.logger.info("order to load vim {}".format(command[1]))
                self._load_vim(command[1])
            elif order == "unload_vim":
                self.logger.info("order to unload vim {}".format(command[1]))
                self._unload_vim(command[1])
            elif order == "reload_vim":
                self._reload_vim(command[1])
            elif order == "check_vim":
                self.logger.info("order to check vim {}".format(command[1]))
                self._check_vim(command[1])

            # a command has been served: look for the next one before processing tasks
            continue
        except queue.Empty:
            # no command waiting, go on with step 2
            pass
        except Exception as exc:
            self.logger.critical(
                "Error processing task: {}".format(exc), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            ro_task = self._get_db_task()

            if ro_task:
                self.logger.warning("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before polling again
                time.sleep(5)
        except Exception as exc:
            self.logger.critical(
                "Unexpected exception at run: " + str(exc), exc_info=True
            )

    self.logger.info("Finishing")