5c3bb5d51efb1ed8f2dd3ff1012b86ec70b2d283
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target where the results are stored.
"""
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45 __author__ = "Alfonso Tierno"
46 __date__ = "$28-Sep-2017 12:07:15$"
47
48
def deep_get(target_dict, *args, **kwargs):
    """
    Walk down a nested dictionary following a sequence of keys.
    Example target_dict={a: {b: 5}}; keys (a, b) give 5; keys (a, b, c) and keys (f, h) both give the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may carry default=value, which is returned when the path cannot be followed
    :return: the value found at the end of the path; otherwise the default (None when not supplied)
    """
    fallback = kwargs.get("default")
    current = target_dict

    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # either we ran into a non-dictionary or the key is absent
            return fallback

    return current
63
64
class NsWorkerException(Exception):
    """Generic error raised while the worker processes a task"""
67
68
class FailingConnector:
    """Stand-in connector: every public VIM/SDN connector method is replaced by a Mock that raises an error"""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exception raised by each stubbed method
        """
        self.error_msg = error_msg

        # VIM methods are stubbed first and SDN methods afterwards, so a method name present
        # in both connector classes ends up raising the SDN error
        stub_sources = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in stub_sources:
            for name in dir(connector_class):
                if name[0] == "_":
                    # private and dunder attributes are left alone
                    continue

                setattr(self, name, Mock(side_effect=error_class(error_msg)))
84
85
class NsWorkerExceptionNotFound(NsWorkerException):
    """Worker error used when a wanted element cannot be found"""
88
89
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    The methods implemented here do not contact the VIM and just report a fixed result"""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database object, stored as self.db
        :param my_vims: stored as self.my_vims; subclasses index it by target_id to get the connector
        :param db_vims: stored as self.db_vims; subclasses index it by target_id to get the VIM record
        :param logger: logger object, stored as self.logger
        """
        self.logger = logger
        self.db = db
        self.db_vims = db_vims
        self.my_vims = my_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created; reports BUILD with no fields to update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling VIM to get image, flavor status. Only a stored VIM_ERROR status is reported as failed"""
        stored_status = ro_task["vim_info"]["vim_status"]
        task_status = "FAILED" if stored_status == "VIM_ERROR" else "DONE"

        return task_status, {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete the item. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed; reports DONE with no vim_info nor task fields to update"""
        return "DONE", None, None
116
117
class VimInteractionNet(VimInteractionBase):
    """Handles the network items of a VIM: find or create, refresh status and delete"""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for an existing VIM network matching the task 'find_params' or, when the task has no
        'find_params', create a network from the task 'params'. A management network that is not mapped
        at the VIM configuration and is not found is created with the name given in 'find_params'.
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple with the task status ("BUILD" or "FAILED") and the vim_info fields to update
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt_net = False
        mgmt_mapped_at_vim = False

        try:
            if task.get("find_params"):
                # FIND
                find_params = task["find_params"]

                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration, when it names one, takes precedence
                    is_mgmt_net = True
                    db_vim = self.db_vims[ro_task["target_id"]]

                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_mapped_at_vim = True
                        vim_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_mapped_at_vim = True
                        vim_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    if not is_mgmt_net or mgmt_mapped_at_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                net_params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**net_params)
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

    def refresh(self, ro_task):
        """
        Call VIM to get the network status and compare it against the stored vim_info
        :param ro_task: ro_task content; '_id', 'target_id' and 'vim_info' are read
        :return: tuple with the task status ("DONE", "BUILD" or "FAILED") and the vim_info fields that changed
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            reported_status = vim_info["status"]

            if reported_status in ("ACTIVE", "BUILD"):
                task_status = "DONE" if reported_status == "ACTIVE" else "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            task_status = "FAILED"
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}

        # only the fields that differ from what is stored are reported back
        stored = ro_task["vim_info"]
        ro_vim_item_update = {}

        if stored["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            if ro_vim_item_update.get("vim_status") != "ACTIVE":
                shown_details = ro_vim_item_update.get("vim_details")
            else:
                shown_details = ""

            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    shown_details,
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network at the VIM; a network already missing at the VIM counts as deleted
        :param ro_task: ro_task content; 'tasks', '_id', 'target_id' and 'vim_info' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple with the task status ("DONE" or "FAILED") and the vim_info fields to update
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # the VIM is only contacted when there is something recorded to delete
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
334
335
class VimInteractionVdu(VimInteractionBase):
    """Handles the VM (VDU) items of a VIM: create, refresh status, delete and inject ssh keys"""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at the VIM. References of the form "TASK-..." found in the task params (networks,
        image, flavor and affinity groups) are replaced by the identifiers provided in task_depends.
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary mapping "TASK-..." references to the identifier obtained by that task
        :return: tuple with the task status ("BUILD" or "FAILED") and the vim_info fields to update
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # set before calling the VIM, so a failure after this point is reported with created=True
            created = True
            params = task["params"]
            # work on a copy: the stored task params must keep their "TASK-..." references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            # tolerate params without affinity groups instead of failing with an uncaught KeyError
            affinity_group_list = params_copy.get("affinity_group_list") or ()
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created or "
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # NOTE(review): relies on new_vminstance filling "vim_id" in every net_list entry - confirm
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM at the VIM; a VM already missing at the VIM counts as deleted
        :param ro_task: ro_task content; 'tasks', '_id', 'target_id' and 'vim_info' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple with the task status ("DONE" or "FAILED") and the vim_info fields to update
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # the VIM is only contacted when there is something recorded to delete
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status and interfaces, and compare them against the stored vim_info
        :param ro_task: ro_task content; '_id', 'target_id', 'tasks' and 'vim_info' are read
        :return: (None, None) when there is no vim_id stored; otherwise a tuple with the task status
            ("DONE", "BUILD" or "FAILED") and the vim_info fields that changed
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                # best effort: the name is optional, so unparseable vim_info is ignored on purpose
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order stored at creation; an interface not reported by the VIM is kept as None
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # the pending CREATE task tells which interface positions are the management ones
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # falls back to the VNF management interface and finally to the first interface
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_details")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject an ssh key into the VM, asking for a later retry while the attempts are not exhausted
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task status, vim_info fields to update or None, task fields to update). The status is
            "DONE" on success, "BUILD" when a retry is requested and "FAILED" when retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # the stored private key is encrypted; it is handed decrypted to the connector as 'ro_key'
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_details": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
614
615
class VimInteractionImage(VimInteractionBase):
    """Handles the image items of a VIM. Images are only looked up, this class does not create them"""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for exactly one image at the VIM matching the task 'find_params'
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple with the task status ("DONE" or "FAILED") and the vim_info fields to update.
            It fails when the task has no 'find_params', and when none or several images match
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            find_params = task.get("find_params")

            if not find_params:
                # without search criteria there is no image id to report; fail in a controlled way
                # instead of reaching the result below with the image id undefined
                raise NsWorkerExceptionNotFound(
                    "Image cannot be found because the task has no find_params"
                )

            vim_images = target_vim.get_image_list(**find_params)

            if not vim_images:
                raise NsWorkerExceptionNotFound(
                    "Image not found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )

            if len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(
                        task["find_params"]
                    )
                )

            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
669
670
class VimInteractionFlavor(VimInteractionBase):
    """Handles the flavor items of a VIM: find or create, and delete"""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at the VIM; a flavor already missing at the VIM counts as deleted
        :param ro_task: ro_task content; 'tasks', '_id', 'target_id' and 'vim_info' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple with the task status ("DONE" or "FAILED") and the vim_info fields to update
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to do at the VIM when no flavor id was recorded
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor at the VIM matching the task 'find_params'; when none is obtained and the task
        has 'params', create the flavor
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple with the task status ("DONE" or "FAILED") and the vim_info fields to update
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        created_items = {}
        created = False

        try:
            vim_flavor_id = None

            # FIND
            if task.get("find_params"):
                try:
                    wanted_flavor = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(wanted_flavor)
                except vimconn.VimConnNotFoundException:
                    # not found is not an error here: the flavor may still be created below
                    pass

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
762
763
class VimInteractionAffinityGroup(VimInteractionBase):
    """Handles the affinity or anti-affinity group items of a VIM: create and delete"""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group at the VIM; a group already missing at the VIM counts as deleted
        :param ro_task: ro_task content; 'tasks', '_id', 'target_id' and 'vim_info' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple with the task status ("DONE" or "FAILED") and the vim_info fields to update
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to do at the VIM when no group id was recorded
            if affinity_group_vim_id:
                self.my_vims[ro_task["target_id"]].delete_affinity_group(
                    affinity_group_vim_id
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Create the affinity group at the VIM when the task has 'params'; otherwise nothing is created
        :param ro_task: ro_task content; 'tasks' and 'target_id' are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple with the task status ("DONE" or "FAILED") and the vim_info fields to update
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        created_items = {}
        created = False

        try:
            affinity_group_vim_id = None

            if task.get("params"):
                affinity_group_vim_id = target_vim.new_affinity_group(
                    task["params"]["affinity_group_data"]
                )
                created = True

            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
849
850
851 class VimInteractionSdnNet(VimInteractionBase):
852 @staticmethod
853 def _match_pci(port_pci, mapping):
854 """
855 Check if port_pci matches with mapping
856 mapping can have brackets to indicate that several chars are accepted. e.g
857 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
858 :param port_pci: text
859 :param mapping: text, can contain brackets to indicate several chars are available
860 :return: True if matches, False otherwise
861 """
862 if not port_pci or not mapping:
863 return False
864 if port_pci == mapping:
865 return True
866
867 mapping_index = 0
868 pci_index = 0
869 while True:
870 bracket_start = mapping.find("[", mapping_index)
871
872 if bracket_start == -1:
873 break
874
875 bracket_end = mapping.find("]", bracket_start)
876 if bracket_end == -1:
877 break
878
879 length = bracket_start - mapping_index
880 if (
881 length
882 and port_pci[pci_index : pci_index + length]
883 != mapping[mapping_index:bracket_start]
884 ):
885 return False
886
887 if (
888 port_pci[pci_index + length]
889 not in mapping[bracket_start + 1 : bracket_end]
890 ):
891 return False
892
893 pci_index += length + 1
894 mapping_index = bracket_end + 1
895
896 if port_pci[pci_index:] != mapping[mapping_index:]:
897 return False
898
899 return True
900
901 def _get_interfaces(self, vlds_to_connect, vim_account_id):
902 """
903 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
904 :param vim_account_id:
905 :return:
906 """
907 interfaces = []
908
909 for vld in vlds_to_connect:
910 table, _, db_id = vld.partition(":")
911 db_id, _, vld = db_id.partition(":")
912 _, _, vld_id = vld.partition(".")
913
914 if table == "vnfrs":
915 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
916 iface_key = "vnf-vld-id"
917 else: # table == "nsrs"
918 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
919 iface_key = "ns-vld-id"
920
921 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
922
923 for db_vnfr in db_vnfrs:
924 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
925 for iface_index, interface in enumerate(vdur["interfaces"]):
926 if interface.get(iface_key) == vld_id and interface.get(
927 "type"
928 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
929 # only SR-IOV o PT
930 interface_ = interface.copy()
931 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
932 db_vnfr["_id"], vdu_index, iface_index
933 )
934
935 if vdur.get("status") == "ERROR":
936 interface_["status"] = "ERROR"
937
938 interfaces.append(interface_)
939
940 return interfaces
941
942 def refresh(self, ro_task):
943 # look for task create
944 task_create_index, _ = next(
945 i_t
946 for i_t in enumerate(ro_task["tasks"])
947 if i_t[1]
948 and i_t[1]["action"] == "CREATE"
949 and i_t[1]["status"] != "FINISHED"
950 )
951
952 return self.new(ro_task, task_create_index, None)
953
954 def new(self, ro_task, task_index, task_depends):
955
956 task = ro_task["tasks"][task_index]
957 task_id = task["task_id"]
958 target_vim = self.my_vims[ro_task["target_id"]]
959
960 sdn_net_id = ro_task["vim_info"]["vim_id"]
961
962 created_items = ro_task["vim_info"].get("created_items")
963 connected_ports = ro_task["vim_info"].get("connected_ports", [])
964 new_connected_ports = []
965 last_update = ro_task["vim_info"].get("last_update", 0)
966 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
967 error_list = []
968 created = ro_task["vim_info"].get("created", False)
969
970 try:
971 # CREATE
972 params = task["params"]
973 vlds_to_connect = params["vlds"]
974 associated_vim = params["target_vim"]
975 # external additional ports
976 additional_ports = params.get("sdn-ports") or ()
977 _, _, vim_account_id = associated_vim.partition(":")
978
979 if associated_vim:
980 # get associated VIM
981 if associated_vim not in self.db_vims:
982 self.db_vims[associated_vim] = self.db.get_one(
983 "vim_accounts", {"_id": vim_account_id}
984 )
985
986 db_vim = self.db_vims[associated_vim]
987
988 # look for ports to connect
989 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
990 # print(ports)
991
992 sdn_ports = []
993 pending_ports = error_ports = 0
994 vlan_used = None
995 sdn_need_update = False
996
997 for port in ports:
998 vlan_used = port.get("vlan") or vlan_used
999
1000 # TODO. Do not connect if already done
1001 if not port.get("compute_node") or not port.get("pci"):
1002 if port.get("status") == "ERROR":
1003 error_ports += 1
1004 else:
1005 pending_ports += 1
1006 continue
1007
1008 pmap = None
1009 compute_node_mappings = next(
1010 (
1011 c
1012 for c in db_vim["config"].get("sdn-port-mapping", ())
1013 if c and c["compute_node"] == port["compute_node"]
1014 ),
1015 None,
1016 )
1017
1018 if compute_node_mappings:
1019 # process port_mapping pci of type 0000:af:1[01].[1357]
1020 pmap = next(
1021 (
1022 p
1023 for p in compute_node_mappings["ports"]
1024 if self._match_pci(port["pci"], p.get("pci"))
1025 ),
1026 None,
1027 )
1028
1029 if not pmap:
1030 if not db_vim["config"].get("mapping_not_needed"):
1031 error_list.append(
1032 "Port mapping not found for compute_node={} pci={}".format(
1033 port["compute_node"], port["pci"]
1034 )
1035 )
1036 continue
1037
1038 pmap = {}
1039
1040 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1041 new_port = {
1042 "service_endpoint_id": pmap.get("service_endpoint_id")
1043 or service_endpoint_id,
1044 "service_endpoint_encapsulation_type": "dot1q"
1045 if port["type"] == "SR-IOV"
1046 else None,
1047 "service_endpoint_encapsulation_info": {
1048 "vlan": port.get("vlan"),
1049 "mac": port.get("mac-address"),
1050 "device_id": pmap.get("device_id") or port["compute_node"],
1051 "device_interface_id": pmap.get("device_interface_id")
1052 or port["pci"],
1053 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1054 "switch_port": pmap.get("switch_port"),
1055 "service_mapping_info": pmap.get("service_mapping_info"),
1056 },
1057 }
1058
1059 # TODO
1060 # if port["modified_at"] > last_update:
1061 # sdn_need_update = True
1062 new_connected_ports.append(port["id"]) # TODO
1063 sdn_ports.append(new_port)
1064
1065 if error_ports:
1066 error_list.append(
1067 "{} interfaces have not been created as VDU is on ERROR status".format(
1068 error_ports
1069 )
1070 )
1071
1072 # connect external ports
1073 for index, additional_port in enumerate(additional_ports):
1074 additional_port_id = additional_port.get(
1075 "service_endpoint_id"
1076 ) or "external-{}".format(index)
1077 sdn_ports.append(
1078 {
1079 "service_endpoint_id": additional_port_id,
1080 "service_endpoint_encapsulation_type": additional_port.get(
1081 "service_endpoint_encapsulation_type", "dot1q"
1082 ),
1083 "service_endpoint_encapsulation_info": {
1084 "vlan": additional_port.get("vlan") or vlan_used,
1085 "mac": additional_port.get("mac_address"),
1086 "device_id": additional_port.get("device_id"),
1087 "device_interface_id": additional_port.get(
1088 "device_interface_id"
1089 ),
1090 "switch_dpid": additional_port.get("switch_dpid")
1091 or additional_port.get("switch_id"),
1092 "switch_port": additional_port.get("switch_port"),
1093 "service_mapping_info": additional_port.get(
1094 "service_mapping_info"
1095 ),
1096 },
1097 }
1098 )
1099 new_connected_ports.append(additional_port_id)
1100 sdn_info = ""
1101
1102 # if there are more ports to connect or they have been modified, call create/update
1103 if error_list:
1104 sdn_status = "ERROR"
1105 sdn_info = "; ".join(error_list)
1106 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1107 last_update = time.time()
1108
1109 if not sdn_net_id:
1110 if len(sdn_ports) < 2:
1111 sdn_status = "ACTIVE"
1112
1113 if not pending_ports:
1114 self.logger.debug(
1115 "task={} {} new-sdn-net done, less than 2 ports".format(
1116 task_id, ro_task["target_id"]
1117 )
1118 )
1119 else:
1120 net_type = params.get("type") or "ELAN"
1121 (
1122 sdn_net_id,
1123 created_items,
1124 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1125 created = True
1126 self.logger.debug(
1127 "task={} {} new-sdn-net={} created={}".format(
1128 task_id, ro_task["target_id"], sdn_net_id, created
1129 )
1130 )
1131 else:
1132 created_items = target_vim.edit_connectivity_service(
1133 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1134 )
1135 created = True
1136 self.logger.debug(
1137 "task={} {} update-sdn-net={} created={}".format(
1138 task_id, ro_task["target_id"], sdn_net_id, created
1139 )
1140 )
1141
1142 connected_ports = new_connected_ports
1143 elif sdn_net_id:
1144 wim_status_dict = target_vim.get_connectivity_service_status(
1145 sdn_net_id, conn_info=created_items
1146 )
1147 sdn_status = wim_status_dict["sdn_status"]
1148
1149 if wim_status_dict.get("sdn_info"):
1150 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1151
1152 if wim_status_dict.get("error_msg"):
1153 sdn_info = wim_status_dict.get("error_msg") or ""
1154
1155 if pending_ports:
1156 if sdn_status != "ERROR":
1157 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1158 len(ports) - pending_ports, len(ports)
1159 )
1160
1161 if sdn_status == "ACTIVE":
1162 sdn_status = "BUILD"
1163
1164 ro_vim_item_update = {
1165 "vim_id": sdn_net_id,
1166 "vim_status": sdn_status,
1167 "created": created,
1168 "created_items": created_items,
1169 "connected_ports": connected_ports,
1170 "vim_details": sdn_info,
1171 "last_update": last_update,
1172 }
1173
1174 return sdn_status, ro_vim_item_update
1175 except Exception as e:
1176 self.logger.error(
1177 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1178 exc_info=not isinstance(
1179 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1180 ),
1181 )
1182 ro_vim_item_update = {
1183 "vim_status": "VIM_ERROR",
1184 "created": created,
1185 "vim_details": str(e),
1186 }
1187
1188 return "FAILED", ro_vim_item_update
1189
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service referenced by the ro_task, if any.

    :param ro_task: ro_task dictionary; "vim_info" provides "vim_id" and "created_items"
    :param task_index: index inside ro_task["tasks"] of the task being processed
    :return: tuple (status, vim_info update). ("DONE", ...) when deleted, already deleted or nothing to
        delete; ("FAILED", ...) when the connector raised any other error
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    vim_info = ro_task["vim_info"]
    sdn_vim_id = vim_info.get("vim_id")
    result_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_details": "DELETED",
        "vim_id": None,
    }

    try:
        # nothing to remove at the SDN controller when there is no vim_id
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, vim_info.get("created_items")
            )
    except Exception as e:
        already_deleted = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_deleted:
            # the traceback is logged only for exceptions that are not connector ones
            is_connector_error = isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not is_connector_error,
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        # a NOT_FOUND answer means that there is nothing left to delete
        result_ok["vim_details"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            result_ok.get("vim_details", ""),
        )
    )

    return "DONE", result_ok
1240
1241
1242 class NsWorker(threading.Thread):
# Seconds added to the current time to compute the next refresh of a ro_task
# (see _update_refresh inside _process_pending_tasks)
REFRESH_BUILD = 5  # 5 seconds. Applied when the new status is "BUILD"
REFRESH_ACTIVE = 60  # 1 minute. Applied when the new status is "DONE"
REFRESH_ERROR = 600  # applied for any other status
REFRESH_IMAGE = 3600 * 10  # applied to "image" and "flavor" items whatever the status is
REFRESH_DELETE = 3600 * 10  # NOTE(review): not referenced in the reviewed part of the class
QUEUE_SIZE = 100  # maximum size of the task_queue created at __init__
terminate = False  # NOTE(review): this name is rebound by the terminate() method defined below
1250
def __init__(self, worker_index, config, plugins, db):
    """
    Create a worker thread; no VIM/SDN/WIM target is loaded at this point.

    :param worker_index: thread index, used for the logger name and for the worker identifier
    :param config: general configuration of RO; "process_id" and "global"."task_locked_time" are read
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.worker_index = worker_index
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # every item type is handled by its own interaction class; all of them share the same
    # database, connector dictionaries and logger of this worker
    interaction_classes = {
        "net": VimInteractionNet,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
    }
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1296
1297 def insert_task(self, task):
1298 try:
1299 self.task_queue.put(task, False)
1300 return None
1301 except queue.Full:
1302 raise NsWorkerException("timeout inserting a task")
1303
1304 def terminate(self):
1305 self.insert_task("exit")
1306
1307 def del_task(self, task):
1308 with self.task_lock:
1309 if task["status"] == "SCHEDULED":
1310 task["status"] = "SUPERSEDED"
1311 return True
1312 else: # task["status"] == "processing"
1313 self.task_lock.release()
1314 return False
1315
1316 def _process_vim_config(self, target_id, db_vim):
1317 """
1318 Process vim config, creating vim configuration files as ca_cert
1319 :param target_id: vim/sdn/wim + id
1320 :param db_vim: Vim dictionary obtained from database
1321 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1322 """
1323 if not db_vim.get("config"):
1324 return
1325
1326 file_name = ""
1327
1328 try:
1329 if db_vim["config"].get("ca_cert_content"):
1330 file_name = "{}:{}".format(target_id, self.worker_index)
1331
1332 try:
1333 mkdir(file_name)
1334 except FileExistsError:
1335 pass
1336
1337 file_name = file_name + "/ca_cert"
1338
1339 with open(file_name, "w") as f:
1340 f.write(db_vim["config"]["ca_cert_content"])
1341 del db_vim["config"]["ca_cert_content"]
1342 db_vim["config"]["ca_cert"] = file_name
1343 except Exception as e:
1344 raise NsWorkerException(
1345 "Error writing to file '{}': {}".format(file_name, e)
1346 )
1347
1348 def _load_plugin(self, name, type="vim"):
1349 # type can be vim or sdn
1350 if "rovim_dummy" not in self.plugins:
1351 self.plugins["rovim_dummy"] = VimDummyConnector
1352
1353 if "rosdn_dummy" not in self.plugins:
1354 self.plugins["rosdn_dummy"] = SdnDummyConnector
1355
1356 if name in self.plugins:
1357 return self.plugins[name]
1358
1359 try:
1360 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1361 self.plugins[name] = ep.load()
1362 except Exception as e:
1363 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1364
1365 if name and name not in self.plugins:
1366 raise NsWorkerException(
1367 "Plugin 'osm_{n}' has not been installed".format(n=name)
1368 )
1369
1370 return self.plugins[name]
1371
1372 def _unload_vim(self, target_id):
1373 """
1374 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1375 :param target_id: Contains type:_id; where type can be 'vim', ...
1376 :return: None.
1377 """
1378 try:
1379 self.db_vims.pop(target_id, None)
1380 self.my_vims.pop(target_id, None)
1381
1382 if target_id in self.vim_targets:
1383 self.vim_targets.remove(target_id)
1384
1385 self.logger.info("Unloaded {}".format(target_id))
1386 rmtree("{}:{}".format(target_id, self.worker_index))
1387 except FileNotFoundError:
1388 pass # this is raised by rmtree if folder does not exist
1389 except Exception as e:
1390 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1391
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR.
    Only the first operation of the account found in "PROCESSING" state and not locked by another
    worker is attended. The target is unloaded again at the end if it was not loaded before this call.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    # content to set and to unset at the database record; written at the 'finally' clause
    update_dict = {}
    unset_dict = {}
    # database path prefix of the locked operation: "_admin.operations.<index>."
    op_text = ""
    # last action started; used to compose the error text
    step = ""
    # remember if the target was already loaded, in order to leave it as it was
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            # the filter includes the 'locked_at' value read above, so the update matches nothing
            # if the record changed in the meantime
            op_text = "_admin.operations.{}.".format(op_index)

            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    # NOTE(review): set as "admin.current_operation" but unset below as
                    # "current_operation"; confirm the intended key
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            # _load_vim returns None if ok or a descriptive text on error
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; the 'finally' clause overwrites them when error_text is set
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only one operation is attended at each call
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # error_text is read only when something must be written; unset_dict is filled just before
        # error_text is assigned inside the loop, so it is bound whenever this branch is entered
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1493
1494 def _reload_vim(self, target_id):
1495 if target_id in self.vim_targets:
1496 self._load_vim(target_id)
1497 else:
1498 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1499 # just remove it to force load again next time it is needed
1500 self.db_vims.pop(target_id, None)
1501
1502 def _load_vim(self, target_id):
1503 """
1504 Load or reload a vim_account, sdn_controller or wim_account.
1505 Read content from database, load the plugin if not loaded.
1506 In case of error loading the plugin, it load a failing VIM_connector
1507 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1508 :param target_id: Contains type:_id; where type can be 'vim', ...
1509 :return: None if ok, descriptive text if error
1510 """
1511 target, _, _id = target_id.partition(":")
1512 target_database = (
1513 "vim_accounts"
1514 if target == "vim"
1515 else "wim_accounts"
1516 if target == "wim"
1517 else "sdns"
1518 )
1519 plugin_name = ""
1520 vim = None
1521
1522 try:
1523 step = "Getting {}={} from db".format(target, _id)
1524 # TODO process for wim, sdnc, ...
1525 vim = self.db.get_one(target_database, {"_id": _id})
1526
1527 # if deep_get(vim, "config", "sdn-controller"):
1528 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1529 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1530
1531 step = "Decrypting password"
1532 schema_version = vim.get("schema_version")
1533 self.db.encrypt_decrypt_fields(
1534 vim,
1535 "decrypt",
1536 fields=("password", "secret"),
1537 schema_version=schema_version,
1538 salt=_id,
1539 )
1540 self._process_vim_config(target_id, vim)
1541
1542 if target == "vim":
1543 plugin_name = "rovim_" + vim["vim_type"]
1544 step = "Loading plugin '{}'".format(plugin_name)
1545 vim_module_conn = self._load_plugin(plugin_name)
1546 step = "Loading {}'".format(target_id)
1547 self.my_vims[target_id] = vim_module_conn(
1548 uuid=vim["_id"],
1549 name=vim["name"],
1550 tenant_id=vim.get("vim_tenant_id"),
1551 tenant_name=vim.get("vim_tenant_name"),
1552 url=vim["vim_url"],
1553 url_admin=None,
1554 user=vim["vim_user"],
1555 passwd=vim["vim_password"],
1556 config=vim.get("config") or {},
1557 persistent_info={},
1558 )
1559 else: # sdn
1560 plugin_name = "rosdn_" + vim["type"]
1561 step = "Loading plugin '{}'".format(plugin_name)
1562 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1563 step = "Loading {}'".format(target_id)
1564 wim = deepcopy(vim)
1565 wim_config = wim.pop("config", {}) or {}
1566 wim["uuid"] = wim["_id"]
1567 wim["wim_url"] = wim["url"]
1568
1569 if wim.get("dpid"):
1570 wim_config["dpid"] = wim.pop("dpid")
1571
1572 if wim.get("switch_id"):
1573 wim_config["switch_id"] = wim.pop("switch_id")
1574
1575 # wim, wim_account, config
1576 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1577 self.db_vims[target_id] = vim
1578 self.error_status = None
1579
1580 self.logger.info(
1581 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1582 )
1583 except Exception as e:
1584 self.logger.error(
1585 "Cannot load {} plugin={}: {} {}".format(
1586 target_id, plugin_name, step, e
1587 )
1588 )
1589
1590 self.db_vims[target_id] = vim or {}
1591 self.db_vims[target_id] = FailingConnector(str(e))
1592 error_status = "{} Error: {}".format(step, e)
1593
1594 return error_status
1595 finally:
1596 if target_id not in self.vim_targets:
1597 self.vim_targets.append(target_id)
1598
1599 def _get_db_task(self):
1600 """
1601 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1602 :return: None
1603 """
1604 now = time.time()
1605
1606 if not self.time_last_task_processed:
1607 self.time_last_task_processed = now
1608
1609 try:
1610 while True:
1611 """
1612 # Log RO tasks only when loglevel is DEBUG
1613 if self.logger.getEffectiveLevel() == logging.DEBUG:
1614 self._log_ro_task(
1615 None,
1616 None,
1617 None,
1618 "TASK_WF",
1619 "task_locked_time="
1620 + str(self.task_locked_time)
1621 + " "
1622 + "time_last_task_processed="
1623 + str(self.time_last_task_processed)
1624 + " "
1625 + "now="
1626 + str(now),
1627 )
1628 """
1629 locked = self.db.set_one(
1630 "ro_tasks",
1631 q_filter={
1632 "target_id": self.vim_targets,
1633 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1634 "locked_at.lt": now - self.task_locked_time,
1635 "to_check_at.lt": self.time_last_task_processed,
1636 },
1637 update_dict={"locked_by": self.my_id, "locked_at": now},
1638 fail_on_empty=False,
1639 )
1640
1641 if locked:
1642 # read and return
1643 ro_task = self.db.get_one(
1644 "ro_tasks",
1645 q_filter={
1646 "target_id": self.vim_targets,
1647 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1648 "locked_at": now,
1649 },
1650 )
1651 return ro_task
1652
1653 if self.time_last_task_processed == now:
1654 self.time_last_task_processed = None
1655 return None
1656 else:
1657 self.time_last_task_processed = now
1658 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1659
1660 except DbException as e:
1661 self.logger.error("Database exception at _get_db_task: {}".format(e))
1662 except Exception as e:
1663 self.logger.critical(
1664 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1665 )
1666
1667 return None
1668
1669 def _get_db_all_tasks(self):
1670 """
1671 Read all content of table ro_tasks to log it
1672 :return: None
1673 """
1674 try:
1675 # Checking the content of the BD:
1676
1677 # read and return
1678 ro_task = self.db.get_list("ro_tasks")
1679 for rt in ro_task:
1680 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1681 return ro_task
1682
1683 except DbException as e:
1684 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1685 except Exception as e:
1686 self.logger.critical(
1687 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1688 )
1689
1690 return None
1691
1692 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1693 """
1694 Generate a log with the following format:
1695
1696 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1697 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1698 task_array_index;task_id;task_action;task_item;task_args
1699
1700 Example:
1701
1702 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1703 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1704 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1705 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1706 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1707 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1708 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1709 """
1710 try:
1711 line = []
1712 i = 0
1713 if ro_task is not None and isinstance(ro_task, dict):
1714 for t in ro_task["tasks"]:
1715 line.clear()
1716 line.append(mark)
1717 line.append(event)
1718 line.append(ro_task.get("_id", ""))
1719 line.append(str(ro_task.get("locked_at", "")))
1720 line.append(str(ro_task.get("modified_at", "")))
1721 line.append(str(ro_task.get("created_at", "")))
1722 line.append(str(ro_task.get("to_check_at", "")))
1723 line.append(str(ro_task.get("locked_by", "")))
1724 line.append(str(ro_task.get("target_id", "")))
1725 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1726 line.append(str(ro_task.get("vim_info", "")))
1727 line.append(str(ro_task.get("tasks", "")))
1728 if isinstance(t, dict):
1729 line.append(str(t.get("status", "")))
1730 line.append(str(t.get("action_id", "")))
1731 line.append(str(i))
1732 line.append(str(t.get("task_id", "")))
1733 line.append(str(t.get("action", "")))
1734 line.append(str(t.get("item", "")))
1735 line.append(str(t.get("find_params", "")))
1736 line.append(str(t.get("params", "")))
1737 else:
1738 line.extend([""] * 2)
1739 line.append(str(i))
1740 line.extend([""] * 5)
1741
1742 i += 1
1743 self.logger.debug(";".join(line))
1744 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1745 i = 0
1746 while True:
1747 st = "tasks.{}.status".format(i)
1748 if st not in db_ro_task_update:
1749 break
1750 line.clear()
1751 line.append(mark)
1752 line.append(event)
1753 line.append(db_ro_task_update.get("_id", ""))
1754 line.append(str(db_ro_task_update.get("locked_at", "")))
1755 line.append(str(db_ro_task_update.get("modified_at", "")))
1756 line.append("")
1757 line.append(str(db_ro_task_update.get("to_check_at", "")))
1758 line.append(str(db_ro_task_update.get("locked_by", "")))
1759 line.append("")
1760 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1761 line.append("")
1762 line.append(str(db_ro_task_update.get("vim_info", "")))
1763 line.append(str(str(db_ro_task_update).count(".status")))
1764 line.append(db_ro_task_update.get(st, ""))
1765 line.append("")
1766 line.append(str(i))
1767 line.extend([""] * 3)
1768 i += 1
1769 self.logger.debug(";".join(line))
1770
1771 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1772 line.clear()
1773 line.append(mark)
1774 line.append(event)
1775 line.append(db_ro_task_delete.get("_id", ""))
1776 line.append("")
1777 line.append(db_ro_task_delete.get("modified_at", ""))
1778 line.extend([""] * 13)
1779 self.logger.debug(";".join(line))
1780
1781 else:
1782 line.clear()
1783 line.append(mark)
1784 line.append(event)
1785 line.extend([""] * 16)
1786 self.logger.debug(";".join(line))
1787
1788 except Exception as e:
1789 self.logger.error("Error logging ro_task: {}".format(e))
1790
1791 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1792 """
1793 Determine if this task need to be done or superseded
1794 :return: None
1795 """
1796 my_task = ro_task["tasks"][task_index]
1797 task_id = my_task["task_id"]
1798 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1799 "created_items", False
1800 )
1801
1802 if my_task["status"] == "FAILED":
1803 return None, None # TODO need to be retry??
1804
1805 try:
1806 for index, task in enumerate(ro_task["tasks"]):
1807 if index == task_index or not task:
1808 continue # own task
1809
1810 if (
1811 my_task["target_record"] == task["target_record"]
1812 and task["action"] == "CREATE"
1813 ):
1814 # set to finished
1815 db_update["tasks.{}.status".format(index)] = task[
1816 "status"
1817 ] = "FINISHED"
1818 elif task["action"] == "CREATE" and task["status"] not in (
1819 "FINISHED",
1820 "SUPERSEDED",
1821 ):
1822 needed_delete = False
1823
1824 if needed_delete:
1825 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1826 else:
1827 return "SUPERSEDED", None
1828 except Exception as e:
1829 if not isinstance(e, NsWorkerException):
1830 self.logger.critical(
1831 "Unexpected exception at _delete_task task={}: {}".format(
1832 task_id, e
1833 ),
1834 exc_info=True,
1835 )
1836
1837 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1838
1839 def _create_task(self, ro_task, task_index, task_depends, db_update):
1840 """
1841 Determine if this task need to create something at VIM
1842 :return: None
1843 """
1844 my_task = ro_task["tasks"][task_index]
1845 task_id = my_task["task_id"]
1846 task_status = None
1847
1848 if my_task["status"] == "FAILED":
1849 return None, None # TODO need to be retry??
1850 elif my_task["status"] == "SCHEDULED":
1851 # check if already created by another task
1852 for index, task in enumerate(ro_task["tasks"]):
1853 if index == task_index or not task:
1854 continue # own task
1855
1856 if task["action"] == "CREATE" and task["status"] not in (
1857 "SCHEDULED",
1858 "FINISHED",
1859 "SUPERSEDED",
1860 ):
1861 return task["status"], "COPY_VIM_INFO"
1862
1863 try:
1864 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1865 ro_task, task_index, task_depends
1866 )
1867 # TODO update other CREATE tasks
1868 except Exception as e:
1869 if not isinstance(e, NsWorkerException):
1870 self.logger.error(
1871 "Error executing task={}: {}".format(task_id, e), exc_info=True
1872 )
1873
1874 task_status = "FAILED"
1875 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1876 # TODO update ro_vim_item_update
1877
1878 return task_status, ro_vim_item_update
1879 else:
1880 return None, None
1881
1882 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1883 """
1884 Look for dependency task
1885 :param task_id: Can be one of
1886 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1887 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1888 3. task.task_id: "<action_id>:number"
1889 :param ro_task:
1890 :param target_id:
1891 :return: database ro_task plus index of task
1892 """
1893 if (
1894 task_id.startswith("vim:")
1895 or task_id.startswith("sdn:")
1896 or task_id.startswith("wim:")
1897 ):
1898 target_id, _, task_id = task_id.partition(" ")
1899
1900 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1901 ro_task_dependency = self.db.get_one(
1902 "ro_tasks",
1903 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1904 fail_on_empty=False,
1905 )
1906
1907 if ro_task_dependency:
1908 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1909 if task["target_record_id"] == task_id:
1910 return ro_task_dependency, task_index
1911
1912 else:
1913 if ro_task:
1914 for task_index, task in enumerate(ro_task["tasks"]):
1915 if task and task["task_id"] == task_id:
1916 return ro_task, task_index
1917
1918 ro_task_dependency = self.db.get_one(
1919 "ro_tasks",
1920 q_filter={
1921 "tasks.ANYINDEX.task_id": task_id,
1922 "tasks.ANYINDEX.target_record.ne": None,
1923 },
1924 fail_on_empty=False,
1925 )
1926
1927 if ro_task_dependency:
1928 for task_index, task in ro_task_dependency["tasks"]:
1929 if task["task_id"] == task_id:
1930 return ro_task_dependency, task_index
1931 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1932
1933 def _process_pending_tasks(self, ro_task):
1934 ro_task_id = ro_task["_id"]
1935 now = time.time()
1936 # one day
1937 next_check_at = now + (24 * 60 * 60)
1938 db_ro_task_update = {}
1939
1940 def _update_refresh(new_status):
1941 # compute next_refresh
1942 nonlocal task
1943 nonlocal next_check_at
1944 nonlocal db_ro_task_update
1945 nonlocal ro_task
1946
1947 next_refresh = time.time()
1948
1949 if task["item"] in ("image", "flavor"):
1950 next_refresh += self.REFRESH_IMAGE
1951 elif new_status == "BUILD":
1952 next_refresh += self.REFRESH_BUILD
1953 elif new_status == "DONE":
1954 next_refresh += self.REFRESH_ACTIVE
1955 else:
1956 next_refresh += self.REFRESH_ERROR
1957
1958 next_check_at = min(next_check_at, next_refresh)
1959 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1960 ro_task["vim_info"]["refresh_at"] = next_refresh
1961
1962 try:
1963 """
1964 # Log RO tasks only when loglevel is DEBUG
1965 if self.logger.getEffectiveLevel() == logging.DEBUG:
1966 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1967 """
1968 # 0: get task_status_create
1969 lock_object = None
1970 task_status_create = None
1971 task_create = next(
1972 (
1973 t
1974 for t in ro_task["tasks"]
1975 if t
1976 and t["action"] == "CREATE"
1977 and t["status"] in ("BUILD", "DONE")
1978 ),
1979 None,
1980 )
1981
1982 if task_create:
1983 task_status_create = task_create["status"]
1984
1985 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1986 for task_action in ("DELETE", "CREATE", "EXEC"):
1987 db_vim_update = None
1988 new_status = None
1989
1990 for task_index, task in enumerate(ro_task["tasks"]):
1991 if not task:
1992 continue # task deleted
1993
1994 task_depends = {}
1995 target_update = None
1996
1997 if (
1998 (
1999 task_action in ("DELETE", "EXEC")
2000 and task["status"] not in ("SCHEDULED", "BUILD")
2001 )
2002 or task["action"] != task_action
2003 or (
2004 task_action == "CREATE"
2005 and task["status"] in ("FINISHED", "SUPERSEDED")
2006 )
2007 ):
2008 continue
2009
2010 task_path = "tasks.{}.status".format(task_index)
2011 try:
2012 db_vim_info_update = None
2013
2014 if task["status"] == "SCHEDULED":
2015 # check if tasks that this depends on have been completed
2016 dependency_not_completed = False
2017
2018 for dependency_task_id in task.get("depends_on") or ():
2019 (
2020 dependency_ro_task,
2021 dependency_task_index,
2022 ) = self._get_dependency(
2023 dependency_task_id, target_id=ro_task["target_id"]
2024 )
2025 dependency_task = dependency_ro_task["tasks"][
2026 dependency_task_index
2027 ]
2028
2029 if dependency_task["status"] == "SCHEDULED":
2030 dependency_not_completed = True
2031 next_check_at = min(
2032 next_check_at, dependency_ro_task["to_check_at"]
2033 )
2034 # must allow dependent task to be processed first
2035 # to do this set time after last_task_processed
2036 next_check_at = max(
2037 self.time_last_task_processed, next_check_at
2038 )
2039 break
2040 elif dependency_task["status"] == "FAILED":
2041 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2042 task["action"],
2043 task["item"],
2044 dependency_task["action"],
2045 dependency_task["item"],
2046 dependency_task_id,
2047 dependency_ro_task["vim_info"].get(
2048 "vim_details"
2049 ),
2050 )
2051 self.logger.error(
2052 "task={} {}".format(task["task_id"], error_text)
2053 )
2054 raise NsWorkerException(error_text)
2055
2056 task_depends[dependency_task_id] = dependency_ro_task[
2057 "vim_info"
2058 ]["vim_id"]
2059 task_depends[
2060 "TASK-{}".format(dependency_task_id)
2061 ] = dependency_ro_task["vim_info"]["vim_id"]
2062
2063 if dependency_not_completed:
2064 # TODO set at vim_info.vim_details that it is waiting
2065 continue
2066
2067 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2068 # the task of renew this locking. It will update database locket_at periodically
2069 if not lock_object:
2070 lock_object = LockRenew.add_lock_object(
2071 "ro_tasks", ro_task, self
2072 )
2073
2074 if task["action"] == "DELETE":
2075 (new_status, db_vim_info_update,) = self._delete_task(
2076 ro_task, task_index, task_depends, db_ro_task_update
2077 )
2078 new_status = (
2079 "FINISHED" if new_status == "DONE" else new_status
2080 )
2081 # ^with FINISHED instead of DONE it will not be refreshing
2082
2083 if new_status in ("FINISHED", "SUPERSEDED"):
2084 target_update = "DELETE"
2085 elif task["action"] == "EXEC":
2086 (
2087 new_status,
2088 db_vim_info_update,
2089 db_task_update,
2090 ) = self.item2class[task["item"]].exec(
2091 ro_task, task_index, task_depends
2092 )
2093 new_status = (
2094 "FINISHED" if new_status == "DONE" else new_status
2095 )
2096 # ^with FINISHED instead of DONE it will not be refreshing
2097
2098 if db_task_update:
2099 # load into database the modified db_task_update "retries" and "next_retry"
2100 if db_task_update.get("retries"):
2101 db_ro_task_update[
2102 "tasks.{}.retries".format(task_index)
2103 ] = db_task_update["retries"]
2104
2105 next_check_at = time.time() + db_task_update.get(
2106 "next_retry", 60
2107 )
2108 target_update = None
2109 elif task["action"] == "CREATE":
2110 if task["status"] == "SCHEDULED":
2111 if task_status_create:
2112 new_status = task_status_create
2113 target_update = "COPY_VIM_INFO"
2114 else:
2115 new_status, db_vim_info_update = self.item2class[
2116 task["item"]
2117 ].new(ro_task, task_index, task_depends)
2118 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2119 _update_refresh(new_status)
2120 else:
2121 if (
2122 ro_task["vim_info"]["refresh_at"]
2123 and now > ro_task["vim_info"]["refresh_at"]
2124 ):
2125 new_status, db_vim_info_update = self.item2class[
2126 task["item"]
2127 ].refresh(ro_task)
2128 _update_refresh(new_status)
2129 else:
2130 # The refresh is updated to avoid set the value of "refresh_at" to
2131 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2132 # because it can happen that in this case the task is never processed
2133 _update_refresh(task["status"])
2134
2135 except Exception as e:
2136 new_status = "FAILED"
2137 db_vim_info_update = {
2138 "vim_status": "VIM_ERROR",
2139 "vim_details": str(e),
2140 }
2141
2142 if not isinstance(
2143 e, (NsWorkerException, vimconn.VimConnException)
2144 ):
2145 self.logger.error(
2146 "Unexpected exception at _delete_task task={}: {}".format(
2147 task["task_id"], e
2148 ),
2149 exc_info=True,
2150 )
2151
2152 try:
2153 if db_vim_info_update:
2154 db_vim_update = db_vim_info_update.copy()
2155 db_ro_task_update.update(
2156 {
2157 "vim_info." + k: v
2158 for k, v in db_vim_info_update.items()
2159 }
2160 )
2161 ro_task["vim_info"].update(db_vim_info_update)
2162
2163 if new_status:
2164 if task_action == "CREATE":
2165 task_status_create = new_status
2166 db_ro_task_update[task_path] = new_status
2167
2168 if target_update or db_vim_update:
2169 if target_update == "DELETE":
2170 self._update_target(task, None)
2171 elif target_update == "COPY_VIM_INFO":
2172 self._update_target(task, ro_task["vim_info"])
2173 else:
2174 self._update_target(task, db_vim_update)
2175
2176 except Exception as e:
2177 if (
2178 isinstance(e, DbException)
2179 and e.http_code == HTTPStatus.NOT_FOUND
2180 ):
2181 # if the vnfrs or nsrs has been removed from database, this task must be removed
2182 self.logger.debug(
2183 "marking to delete task={}".format(task["task_id"])
2184 )
2185 self.tasks_to_delete.append(task)
2186 else:
2187 self.logger.error(
2188 "Unexpected exception at _update_target task={}: {}".format(
2189 task["task_id"], e
2190 ),
2191 exc_info=True,
2192 )
2193
2194 locked_at = ro_task["locked_at"]
2195
2196 if lock_object:
2197 locked_at = [
2198 lock_object["locked_at"],
2199 lock_object["locked_at"] + self.task_locked_time,
2200 ]
2201 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2202 # contain exactly locked_at + self.task_locked_time
2203 LockRenew.remove_lock_object(lock_object)
2204
2205 q_filter = {
2206 "_id": ro_task["_id"],
2207 "to_check_at": ro_task["to_check_at"],
2208 "locked_at": locked_at,
2209 }
2210 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2211 # outside this task (by ro_nbi) do not update it
2212 db_ro_task_update["locked_by"] = None
2213 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2214 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2215 db_ro_task_update["modified_at"] = now
2216 db_ro_task_update["to_check_at"] = next_check_at
2217
2218 """
2219 # Log RO tasks only when loglevel is DEBUG
2220 if self.logger.getEffectiveLevel() == logging.DEBUG:
2221 db_ro_task_update_log = db_ro_task_update.copy()
2222 db_ro_task_update_log["_id"] = q_filter["_id"]
2223 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2224 """
2225
2226 if not self.db.set_one(
2227 "ro_tasks",
2228 update_dict=db_ro_task_update,
2229 q_filter=q_filter,
2230 fail_on_empty=False,
2231 ):
2232 del db_ro_task_update["to_check_at"]
2233 del q_filter["to_check_at"]
2234 """
2235 # Log RO tasks only when loglevel is DEBUG
2236 if self.logger.getEffectiveLevel() == logging.DEBUG:
2237 self._log_ro_task(
2238 None,
2239 db_ro_task_update_log,
2240 None,
2241 "TASK_WF",
2242 "SET_TASK " + str(q_filter),
2243 )
2244 """
2245 self.db.set_one(
2246 "ro_tasks",
2247 q_filter=q_filter,
2248 update_dict=db_ro_task_update,
2249 fail_on_empty=True,
2250 )
2251 except DbException as e:
2252 self.logger.error(
2253 "ro_task={} Error updating database {}".format(ro_task_id, e)
2254 )
2255 except Exception as e:
2256 self.logger.error(
2257 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2258 )
2259
def _update_target(self, task, ro_vim_item_update):
    """
    Write the VIM information of an item into the database record referenced by the task.

    task["target_record"] is read as "<table>:<_id>:<path_vim_status>", where:
        path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
        path_item: dot separated list targeting record information, e.g. "vdur.10"
    :param task: task dictionary. Only its "target_record" entry is used here
    :param ro_vim_item_update: dictionary with the vim information to store. If it is empty or None the
        item status is set to "DELETED" and the content at path_vim_status is unset
    :return: None. The record is modified by means of self.db.set_one
    """
    table, _, remainder = task["target_record"].partition(":")
    _id, _, path_vim_status = remainder.partition(":")

    # path_item is path_vim_status without its two last dot separated components
    path_item = path_vim_status
    for _ in range(2):
        path_item = path_item[: path_item.rfind(".")]

    if not ro_vim_item_update:
        # nothing to copy: mark the item as deleted and remove its vim information
        self.db.set_one(
            table,
            q_filter={"_id": _id},
            update_dict={path_item + ".status": "DELETED"},
            unset={path_vim_status: None},
        )
        return

    # only these entries of the vim information are copied under path_vim_status
    copied_keys = ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
    update_dict = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            update_dict[path_vim_status + "." + key] = value

    if path_vim_status.startswith("vdur."):
        # for backward compatibility, add vdur.name apart from vdur.vim_name and vdur.vim-id apart from
        # vdur.vim_id; the general vdur.status is updated as well
        legacy_fields = (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        )
        for vim_key, item_suffix in legacy_fields:
            if ro_vim_item_update.get(vim_key):
                update_dict[path_item + item_suffix] = ro_vim_item_update[vim_key]

    interfaces = ro_vim_item_update.get("interfaces")
    if interfaces:
        for position, iface in enumerate(interfaces):
            if not iface:
                continue

            iface_prefix = path_item + ".interfaces" + ".{}.".format(position)
            for key, value in iface.items():
                if key in ("vlan", "compute_node", "pci"):
                    update_dict[iface_prefix + key] = value

            # put ip_address and mac_address with ip-address and mac-address
            ip_address = iface.get("ip_address")
            if ip_address:
                update_dict[iface_prefix + "ip-address"] = ip_address

            mac_address = iface.get("mac_address")
            if mac_address:
                update_dict[iface_prefix + "mac-address"] = mac_address

            # management interfaces also publish the first of the ';' separated addresses, at the root of
            # the record (vnf management) and/or at the item (vdu management)
            if iface.get("mgmt_vnf_interface") and ip_address:
                update_dict["ip-address"] = ip_address.split(";")[0]

            if iface.get("mgmt_vdu_interface") and ip_address:
                update_dict[path_item + ".ip-address"] = ip_address.split(";")[0]

    self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2334
def _process_delete_db_tasks(self):
    """
    Remove from database the tasks queued at self.tasks_to_delete, that is, those whose vnfrs or nsrs
    or both have been deleted
    :return: None. Uses and modify self.tasks_to_delete, which is consumed from its first element
    """
    while self.tasks_to_delete:
        pending = self.tasks_to_delete[0]
        nsr_id = pending["nsr_id"]
        target_record = pending["target_record"]

        # when the target is a vnfrs and the nsrs is still present, only the tasks of that vnfrs are
        # removed; in any other case all the tasks of the nsr are affected
        if target_record.startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": nsr_id}, fail_on_empty=False
        ):
            vnfrs_deleted = target_record.split(":")[1]
        else:
            vnfrs_deleted = None

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            # a failure is only logged; the entry is discarded anyway
            self.logger.error(
                "Error deleting task={}: {}".format(pending["task_id"], e)
            )

        self.tasks_to_delete.pop(0)
2357
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from the "ro_tasks" table the tasks of a nsr, or only the ones targeting one of its vnfrs.
    A ro_task whose non empty tasks are all affected is deleted; otherwise only the affected entries
    are set to None. Both operations are filtered by "modified_at", so that they have no effect if
    somebody has changed the ro_task meanwhile; in that case the whole procedure is retried.
    Static method because it is called from osm_ng_ro.ns
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None
    :raises NsWorkerException: if after 5 attempts some ro_task could not be deleted or updated
    """

    def _is_affected(task):
        # with a vnfr id the target record decides; without it the owner nsr decides
        if vnfrs_deleted:
            return task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
        return task["nsr_id"] == nsr_id

    retries = 5
    for _ in range(retries):
        candidates = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in candidates:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue

                if _is_affected(task):
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update only if nobody has changed ro_task meanwhile; modified_at is used to
            # know whether it has changed
            if to_delete_ro_task:
                applied = db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                )
            elif db_update:
                db_update["modified_at"] = now
                applied = db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=db_update,
                    fail_on_empty=False,
                )
            else:
                # ro_task without affected tasks: nothing to do
                continue

            if not applied:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2416
def run(self):
    """
    Main loop of the thread. Each iteration has two steps:
        1) get one command from self.task_queue ("terminate", "load_vim", "unload_vim", "reload_vim",
           "check_vim") and execute it. While self.vim_targets is empty the queue is read in blocking
           mode and the thread is flagged as idle; otherwise it is read without blocking
        2) only when no command has been read, or its processing has failed: delete the tasks marked at
           self.tasks_to_delete and process one ro_task obtained from database, sleeping 5 seconds if
           there is none
    The loop finishes when the "terminate" command is received.
    :return: None
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            if order[0] == "terminate":
                break

            if order[0] == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif order[0] == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif order[0] == "reload_vim":
                self._reload_vim(order[1])
            elif order[0] == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])

            # a command has been attended: look for the next one before processing ro_tasks
            continue
        except queue.Empty:
            # no command pending: go on with step 2
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # (disabled) Log RO tasks only when loglevel is DEBUG:
            #     if self.logger.getEffectiveLevel() == logging.DEBUG:
            #         _ = self._get_db_all_tasks()
            ro_task = self._get_db_task()

            if ro_task:
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do: wait before polling again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")